aws_lambda_log_proxy/processor/sink.rs

1use std::{
2  env, io,
3  num::ParseIntError,
4  pin::Pin,
5  task::{Context, Poll},
6};
7
8use super::Timestamp;
9use tokio::{
10  io::{AsyncWrite, AsyncWriteExt, Stderr, Stdout},
11  sync::{mpsc, oneshot},
12};
13
/// How a [`Sink`] encodes each log line before handing it to its writer.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum OutputFormat {
  /// Raw line bytes followed by a newline (`'\n'`). This is the default.
  #[default]
  Standard,
  /// The line plus a trailing newline (`'\n'`), framed with the telemetry log header
  /// (magic/type word, payload length, timestamp) used by the Lambda telemetry log fd.
  /// See https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/2ce88619fd176a5823bc5f38c5484d1cbdf95717/src/LogPatch.js#L90-L101.
  TelemetryLogFd,
}
23
24/// A sink to write log lines to.
25/// # Caveats
26/// To prevent interleaved output, you should clone a [`SinkHandle`]
27/// instead of creating a new one if you want to write to the same sink.
28/// # Examples
29/// ```
30/// use aws_lambda_log_proxy::{Sink, SinkHandle};
31///
32/// #[tokio::main]
33/// async fn main() {
34///   let sink: SinkHandle = Sink::stdout().spawn();
35///   let sink2 = sink.clone();
36/// }
37/// ```
38pub struct Sink<T: AsyncWrite + Send + Unpin + 'static> {
39  writer: T,
40  format: OutputFormat,
41  buffer_size: usize,
42}
43
44impl<T: AsyncWrite + Send + Unpin + 'static> Sink<T> {
45  /// Create a new [`Standard`](OutputFormat::Standard) sink from an [`AsyncWrite`] implementor.
46  pub fn new(writer: T) -> Self {
47    Sink {
48      writer,
49      format: OutputFormat::Standard,
50      buffer_size: 16,
51    }
52  }
53
54  /// Set the output format of the sink.
55  pub fn format(mut self, kind: OutputFormat) -> Self {
56    self.format = kind;
57    self
58  }
59
60  /// Set the buffer size of the sink.
61  /// The default buffer size is `16` lines.
62  pub fn buffer_size(mut self, size: usize) -> Self {
63    self.buffer_size = size;
64    self
65  }
66
67  /// Spawn the sink and return a [`SinkHandle`] to write to it.
68  pub fn spawn(self) -> SinkHandle {
69    let (action_tx, mut action_rx) = mpsc::channel(self.buffer_size);
70
71    let mut writer = self.writer;
72    tokio::spawn(async move {
73      while let Some(action) = action_rx.recv().await {
74        match action {
75          Action::Write(bytes) => writer.write_all(&bytes).await.unwrap(),
76          Action::Flush(ack_tx) => {
77            writer.flush().await.unwrap();
78            ack_tx.send(()).unwrap();
79          }
80        }
81      }
82    });
83
84    SinkHandle {
85      action_tx,
86      format: self.format,
87    }
88  }
89}
90
91impl Sink<Stdout> {
92  /// Create a new [`stdout`](tokio::io::stdout) sink.
93  pub fn stdout() -> Self {
94    Sink::new(tokio::io::stdout())
95  }
96}
97
98impl Sink<Stderr> {
99  /// Create a new [`stderr`](tokio::io::stderr) sink.
100  pub fn stderr() -> Self {
101    Sink::new(tokio::io::stderr())
102  }
103}
104
/// A stand-in writer for tests; see its [`AsyncWrite`] implementation for how writes behave.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MockSink;
107
108impl AsyncWrite for MockSink {
109  fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<io::Result<usize>> {
110    Poll::Ready(Ok(0))
111  }
112
113  fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
114    Poll::Ready(Ok(()))
115  }
116
117  fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
118    Poll::Ready(Ok(()))
119  }
120}
121
122impl Sink<MockSink> {
123  /// Create a new [`MockSink`] for testing.
124  pub fn mock() -> Self {
125    Sink::new(MockSink)
126  }
127}
128
129#[cfg(target_os = "linux")]
130impl Sink<tokio::fs::File> {
131  /// Create a new sink from the `_LAMBDA_TELEMETRY_LOG_FD` environment variable.
132  pub fn lambda_telemetry_log_fd() -> Result<Self, Error> {
133    env::var("_LAMBDA_TELEMETRY_LOG_FD")
134      .map_err(Error::VarError)
135      .and_then(|fd| {
136        let fd = fd.parse().map_err(Error::ParseIntError)?;
137        Ok(
138          Sink::new(unsafe { <tokio::fs::File as std::os::fd::FromRawFd>::from_raw_fd(fd) })
139            .format(OutputFormat::TelemetryLogFd),
140        )
141      })
142  }
143}
144
145enum Action {
146  Write(Vec<u8>),
147  Flush(oneshot::Sender<()>),
148}
149
150/// See [`Sink`].
151///
152/// This is cheap to clone.
153#[derive(Clone)]
154pub struct SinkHandle {
155  action_tx: mpsc::Sender<Action>,
156  format: OutputFormat,
157}
158
159impl SinkHandle {
160  /// Write a string to the sink with a newline(`'\n'`) appended.
161  /// The `timestamp` will be used if the [`Sink::format`] is [`OutputFormat::TelemetryLogFd`].
162  /// Bytes are pushed into a queue and might not be written immediately.
163  /// You can call [`Self::flush`] to ensure all buffered data is written to the underlying writer.
164  pub async fn write_line(&self, s: String, timestamp: Timestamp) {
165    let mut line = s.into_bytes();
166    line.push(b'\n');
167
168    let bytes = match self.format {
169      OutputFormat::Standard => line,
170      OutputFormat::TelemetryLogFd => {
171        let mut content = build_telemetry_log_fd_format_header(&line, timestamp.timestamp_micros());
172        content.append(&mut line);
173        content
174      }
175    };
176
177    self.action_tx.send(Action::Write(bytes)).await.unwrap();
178  }
179
180  /// Flush the sink. Wait until all buffered data is written to the underlying writer.
181  pub async fn flush(&self) {
182    let (ack_tx, ack_rx) = oneshot::channel();
183    self.action_tx.send(Action::Flush(ack_tx)).await.unwrap();
184    ack_rx.await.unwrap();
185  }
186}
187
/// Build the 16-byte telemetry log frame header that precedes `line`.
///
/// Layout, all big-endian:
/// - bytes 0..4: the frame type `0xa55a0003`;
/// - bytes 4..8: `line.len()` as a `u32`;
/// - bytes 8..16: `timestamp` (UNIX time in microseconds).
///
/// # Panics
/// Panics if `line` is longer than `u32::MAX` bytes. The length field cannot represent it,
/// and truncating would desynchronise every frame that follows.
fn build_telemetry_log_fd_format_header(line: &[u8], timestamp: i64) -> Vec<u8> {
  let len = u32::try_from(line.len())
    .expect("log line is too long for the telemetry log frame length field");

  let mut header = Vec::with_capacity(16);
  // TODO: what about the level mask? See https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/2ce88619fd176a5823bc5f38c5484d1cbdf95717/src/LogPatch.js#L113
  header.extend_from_slice(&0xa55a0003u32.to_be_bytes());
  header.extend_from_slice(&len.to_be_bytes());
  header.extend_from_slice(&timestamp.to_be_bytes());
  header
}
200
/// Errors returned when creating a sink from the `_LAMBDA_TELEMETRY_LOG_FD` environment variable.
#[derive(Debug)]
pub enum Error {
  /// `_LAMBDA_TELEMETRY_LOG_FD` is not set or is not valid Unicode.
  VarError(env::VarError),
  /// `_LAMBDA_TELEMETRY_LOG_FD` is not a valid integer file descriptor.
  ParseIntError(ParseIntError),
}

impl std::fmt::Display for Error {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      Error::VarError(e) => write!(f, "failed to read _LAMBDA_TELEMETRY_LOG_FD: {e}"),
      Error::ParseIntError(e) => write!(f, "failed to parse _LAMBDA_TELEMETRY_LOG_FD: {e}"),
    }
  }
}

impl std::error::Error for Error {
  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
    match self {
      Error::VarError(e) => Some(e),
      Error::ParseIntError(e) => Some(e),
    }
  }
}
206
#[cfg(test)]
mod tests {
  use super::*;
  use chrono::DateTime;
  use serial_test::serial;

  #[test]
  fn default_sink_format() {
    let builder = Sink::new(Vec::new());
    assert_eq!(builder.format, OutputFormat::Standard);
  }

  #[test]
  #[serial]
  fn sink_stdout_format() {
    let sb = Sink::stdout();
    assert_eq!(sb.format, OutputFormat::Standard);
  }

  #[test]
  #[serial]
  fn sink_stderr_format() {
    let sb = Sink::stderr();
    assert_eq!(sb.format, OutputFormat::Standard);
  }

  #[test]
  #[serial]
  fn sink_format() {
    let sink = Sink::stdout().format(OutputFormat::TelemetryLogFd);
    assert_eq!(sink.format, OutputFormat::TelemetryLogFd);
  }

  #[test]
  #[serial]
  fn sink_buffer_size() {
    let sink = Sink::stdout().buffer_size(32);
    assert_eq!(sink.buffer_size, 32);
  }

  #[cfg(target_os = "linux")]
  #[test]
  #[serial]
  fn sink_lambda_telemetry_log_fd() {
    use std::{fs::File, os::fd::IntoRawFd};

    let file = File::create("/dev/null").unwrap();
    let fd = file.into_raw_fd();
    env::set_var("_LAMBDA_TELEMETRY_LOG_FD", fd.to_string());
    let sink = Sink::lambda_telemetry_log_fd().unwrap();
    assert_eq!(sink.format, OutputFormat::TelemetryLogFd);
    env::remove_var("_LAMBDA_TELEMETRY_LOG_FD");

    // missing env var
    let result = Sink::lambda_telemetry_log_fd();
    assert!(matches!(result, Err(Error::VarError(_))));

    // invalid fd
    env::set_var("_LAMBDA_TELEMETRY_LOG_FD", "invalid");
    let result = Sink::lambda_telemetry_log_fd();
    assert!(matches!(result, Err(Error::ParseIntError(_))));
    env::remove_var("_LAMBDA_TELEMETRY_LOG_FD");
  }

  // The async tests below flush before returning. `write_line` only enqueues bytes, so
  // without a flush the test can finish before the writer task has run at all, and the
  // mock writer's expectations would never actually be exercised.

  #[tokio::test]
  async fn sink_write_line() {
    // standard format
    let sink = Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build()).spawn();
    sink.write_line("hello".to_string(), mock_timestamp()).await;
    sink.flush().await;

    // telemetry log format header
    assert_eq!(
      build_telemetry_log_fd_format_header("hello\n".as_bytes(), 0),
      &[0xa5, 0x5a, 0x00, 0x03, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0]
    );

    // telemetry log format
    let sink = Sink::new(
      tokio_test::io::Builder::new()
        .write(b"\xa5\x5a\x00\x03\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00hello\n")
        .build(),
    )
    .format(OutputFormat::TelemetryLogFd)
    .spawn();
    sink.write_line("hello".to_string(), mock_timestamp()).await;
    sink.flush().await;
  }

  #[tokio::test]
  async fn sink_flush() {
    let sink = Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build()).spawn();
    sink.write_line("hello".to_string(), mock_timestamp()).await;
    sink.flush().await;
  }

  #[tokio::test]
  async fn sink_handle_clone_able() {
    let sink = Sink::new(
      tokio_test::io::Builder::new()
        .write(b"hello\nworld\n")
        .build(),
    )
    .spawn();
    let sink2 = sink.clone();
    sink.write_line("hello".to_string(), mock_timestamp()).await;
    sink2
      .write_line("world".to_string(), mock_timestamp())
      .await;
    // Both handles share one queue, so a flush through either one covers both writes.
    sink2.flush().await;
  }

  fn mock_timestamp() -> Timestamp {
    DateTime::from_timestamp(0, 0).unwrap()
  }
}