aws_lambda_log_proxy/processor/
sink.rs1use std::{
2 env, io,
3 num::ParseIntError,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use super::Timestamp;
9use tokio::{
10 io::{AsyncWrite, AsyncWriteExt, Stderr, Stdout},
11 sync::{mpsc, oneshot},
12};
13
14#[derive(Debug, Clone, PartialEq, Eq, Default)]
15pub enum OutputFormat {
16 #[default]
18 Standard,
19 TelemetryLogFd,
22}
23
24pub struct Sink<T: AsyncWrite + Send + Unpin + 'static> {
39 writer: T,
40 format: OutputFormat,
41 buffer_size: usize,
42}
43
44impl<T: AsyncWrite + Send + Unpin + 'static> Sink<T> {
45 pub fn new(writer: T) -> Self {
47 Sink {
48 writer,
49 format: OutputFormat::Standard,
50 buffer_size: 16,
51 }
52 }
53
54 pub fn format(mut self, kind: OutputFormat) -> Self {
56 self.format = kind;
57 self
58 }
59
60 pub fn buffer_size(mut self, size: usize) -> Self {
63 self.buffer_size = size;
64 self
65 }
66
67 pub fn spawn(self) -> SinkHandle {
69 let (action_tx, mut action_rx) = mpsc::channel(self.buffer_size);
70
71 let mut writer = self.writer;
72 tokio::spawn(async move {
73 while let Some(action) = action_rx.recv().await {
74 match action {
75 Action::Write(bytes) => writer.write_all(&bytes).await.unwrap(),
76 Action::Flush(ack_tx) => {
77 writer.flush().await.unwrap();
78 ack_tx.send(()).unwrap();
79 }
80 }
81 }
82 });
83
84 SinkHandle {
85 action_tx,
86 format: self.format,
87 }
88 }
89}
90
91impl Sink<Stdout> {
92 pub fn stdout() -> Self {
94 Sink::new(tokio::io::stdout())
95 }
96}
97
98impl Sink<Stderr> {
99 pub fn stderr() -> Self {
101 Sink::new(tokio::io::stderr())
102 }
103}
104
105#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
106pub struct MockSink;
107
108impl AsyncWrite for MockSink {
109 fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<io::Result<usize>> {
110 Poll::Ready(Ok(0))
111 }
112
113 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
114 Poll::Ready(Ok(()))
115 }
116
117 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
118 Poll::Ready(Ok(()))
119 }
120}
121
122impl Sink<MockSink> {
123 pub fn mock() -> Self {
125 Sink::new(MockSink)
126 }
127}
128
129#[cfg(target_os = "linux")]
130impl Sink<tokio::fs::File> {
131 pub fn lambda_telemetry_log_fd() -> Result<Self, Error> {
133 env::var("_LAMBDA_TELEMETRY_LOG_FD")
134 .map_err(Error::VarError)
135 .and_then(|fd| {
136 let fd = fd.parse().map_err(Error::ParseIntError)?;
137 Ok(
138 Sink::new(unsafe { <tokio::fs::File as std::os::fd::FromRawFd>::from_raw_fd(fd) })
139 .format(OutputFormat::TelemetryLogFd),
140 )
141 })
142 }
143}
144
145enum Action {
146 Write(Vec<u8>),
147 Flush(oneshot::Sender<()>),
148}
149
150#[derive(Clone)]
154pub struct SinkHandle {
155 action_tx: mpsc::Sender<Action>,
156 format: OutputFormat,
157}
158
159impl SinkHandle {
160 pub async fn write_line(&self, s: String, timestamp: Timestamp) {
165 let mut line = s.into_bytes();
166 line.push(b'\n');
167
168 let bytes = match self.format {
169 OutputFormat::Standard => line,
170 OutputFormat::TelemetryLogFd => {
171 let mut content = build_telemetry_log_fd_format_header(&line, timestamp.timestamp_micros());
172 content.append(&mut line);
173 content
174 }
175 };
176
177 self.action_tx.send(Action::Write(bytes)).await.unwrap();
178 }
179
180 pub async fn flush(&self) {
182 let (ack_tx, ack_rx) = oneshot::channel();
183 self.action_tx.send(Action::Flush(ack_tx)).await.unwrap();
184 ack_rx.await.unwrap();
185 }
186}
187
188fn build_telemetry_log_fd_format_header(line: &[u8], timestamp: i64) -> Vec<u8> {
189 let mut buf = vec![0; 16];
191 buf[0..4].copy_from_slice(&0xa55a0003u32.to_be_bytes());
194 buf[4..8].copy_from_slice(&(line.len() as u32).to_be_bytes());
196 buf[8..16].copy_from_slice(×tamp.to_be_bytes());
198 buf
199}
200
201#[derive(Debug)]
202pub enum Error {
203 VarError(env::VarError),
204 ParseIntError(ParseIntError),
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use chrono::DateTime;
211 use serial_test::serial;
212
213 #[test]
214 fn default_sink_format() {
215 let builder = Sink::new(Vec::new());
216 assert_eq!(builder.format, OutputFormat::Standard);
217 }
218
219 #[test]
220 #[serial]
221 fn sink_stdout_format() {
222 let sb = Sink::stdout();
223 assert_eq!(sb.format, OutputFormat::Standard);
224 }
225
226 #[test]
227 #[serial]
228 fn sink_stderr_format() {
229 let sb = Sink::stderr();
230 assert_eq!(sb.format, OutputFormat::Standard);
231 }
232
233 #[test]
234 #[serial]
235 fn sink_format() {
236 let sink = Sink::stdout().format(OutputFormat::TelemetryLogFd);
237 assert_eq!(sink.format, OutputFormat::TelemetryLogFd);
238 }
239
240 #[test]
241 #[serial]
242 fn sink_buffer_size() {
243 let sink = Sink::stdout().buffer_size(32);
244 assert_eq!(sink.buffer_size, 32);
245 }
246
247 #[cfg(target_os = "linux")]
248 #[test]
249 #[serial]
250 fn sink_lambda_telemetry_log_fd() {
251 use std::{fs::File, os::fd::IntoRawFd};
252
253 let file = File::create("/dev/null").unwrap();
254 let fd = file.into_raw_fd();
255 env::set_var("_LAMBDA_TELEMETRY_LOG_FD", fd.to_string());
256 let sink = Sink::lambda_telemetry_log_fd().unwrap();
257 assert_eq!(sink.format, OutputFormat::TelemetryLogFd);
258 env::remove_var("_LAMBDA_TELEMETRY_LOG_FD");
259
260 let result = Sink::lambda_telemetry_log_fd();
262 assert!(matches!(result, Err(Error::VarError(_))));
263
264 env::set_var("_LAMBDA_TELEMETRY_LOG_FD", "invalid");
266 let result = Sink::lambda_telemetry_log_fd();
267 assert!(matches!(result, Err(Error::ParseIntError(_))));
268 env::remove_var("_LAMBDA_TELEMETRY_LOG_FD");
269 }
270
271 #[tokio::test]
272 async fn sink_write_line() {
273 Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build())
275 .spawn()
276 .write_line("hello".to_string(), mock_timestamp())
277 .await;
278
279 assert_eq!(
281 build_telemetry_log_fd_format_header("hello\n".as_bytes(), 0),
282 &[0xa5, 0x5a, 0x00, 0x03, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0]
283 );
284
285 Sink::new(
287 tokio_test::io::Builder::new()
288 .write(b"\xa5\x5a\x00\x03\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00hello\n")
289 .build(),
290 )
291 .format(OutputFormat::TelemetryLogFd)
292 .spawn()
293 .write_line("hello".to_string(), mock_timestamp())
294 .await;
295 }
296
297 #[tokio::test]
298 async fn sink_flush() {
299 let sink = Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build()).spawn();
300 sink.write_line("hello".to_string(), mock_timestamp()).await;
301 sink.flush().await;
302 }
303
304 #[tokio::test]
305 async fn sink_handle_clone_able() {
306 let sink = Sink::new(
307 tokio_test::io::Builder::new()
308 .write(b"hello\nworld\n")
309 .build(),
310 )
311 .spawn();
312 let sink2 = sink.clone();
313 sink.write_line("hello".to_string(), mock_timestamp()).await;
314 sink2
315 .write_line("world".to_string(), mock_timestamp())
316 .await;
317 }
318
319 fn mock_timestamp() -> Timestamp {
320 DateTime::from_timestamp(0, 0).unwrap()
321 }
322}