1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::sync::Arc;
use tokio::{
  io::{AsyncWrite, AsyncWriteExt},
  sync::Mutex,
};

/// The wire format a sink uses when writing a log line.
///
/// This is a plain tag without any payload, so it is `Copy` and can be passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputFormat {
  /// For this variant, log lines are written as bytes and a newline (`'\n'`) will be appended.
  Standard,
  /// For this variant, log lines will be appended with a newline (`'\n'`) and written using the telemetry log format.
  /// See <https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/2ce88619fd176a5823bc5f38c5484d1cbdf95717/src/LogPatch.js#L90-L101>.
  TelemetryLogFd,
}

/// A sink to write log lines to.
///
/// The first field is the boxed writer, shared behind an [`Arc`] and guarded by an
/// async [`Mutex`]; the second field is the [`OutputFormat`] used to encode each line.
///
/// # Caveats
/// To prevent interleaved output, you should clone a sink
/// instead of creating a new one if you want to write to the same sink.
/// Clones share the same writer and the same lock, whereas two independently
/// created sinks each get their own lock and are not synchronized with each other.
pub struct Sink(Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>, OutputFormat);

impl Clone for Sink {
  fn clone(&self) -> Self {
    Sink(Arc::clone(&self.0), self.1.clone())
  }
}

impl Sink {
  /// Create a new [`Standard`](OutputFormat::Standard) sink from an [`AsyncWrite`] implementor.
  pub fn new(s: impl AsyncWrite + Send + Unpin + 'static) -> Self {
    Sink(Arc::new(Mutex::new(Box::new(s))), OutputFormat::Standard)
  }

  /// Create a new [`stdout`](tokio::io::stdout) sink.
  pub fn stdout() -> Self {
    Sink::new(tokio::io::stdout())
  }

  /// Create a new [`stderr`](tokio::io::stderr) sink.
  pub fn stderr() -> Self {
    Sink::new(tokio::io::stderr())
  }

  /// Set the output format of the sink.
  pub fn format(mut self, kind: OutputFormat) -> Self {
    self.1 = kind;
    self
  }

  #[cfg(target_os = "linux")]
  /// Create a new sink from the `_LAMBDA_TELEMETRY_LOG_FD` environment variable.
  pub fn lambda_telemetry_log_fd() -> Result<Self, Error> {
    std::env::var("_LAMBDA_TELEMETRY_LOG_FD")
      .map_err(|e| Error::VarError(e))
      .and_then(|fd| {
        let fd = fd.parse().map_err(|e| Error::ParseIntError(e))?;
        Ok(
          Sink::new(unsafe { <tokio::fs::File as std::os::fd::FromRawFd>::from_raw_fd(fd) })
            .format(OutputFormat::TelemetryLogFd),
        )
      })
  }

  /// Write a string to the sink then write a newline(`'\n'`).
  pub async fn write_line(&self, s: String) {
    let mut f = self.0.lock().await;
    match self.1 {
      OutputFormat::Standard => {
        f.write_all(s.as_bytes()).await.unwrap();
        f.write_all(b"\n").await.unwrap();
      }
      OutputFormat::TelemetryLogFd => {
        // create a 16 bytes buffer to store type and length
        let mut buf = [0; 16];
        // the first 4 bytes are 0xa55a0003
        // TODO: what about the level mask?
        buf[0..4].copy_from_slice(&0xa55a0003u32.to_be_bytes());
        // the second 4 bytes are the length of the message
        let len = s.len() as u32 + 1; // 1 for the last newline
        buf[4..8].copy_from_slice(&len.to_be_bytes());
        // the next 8 bytes are the UNIX timestamp of the message with microseconds precision.
        let timestamp = chrono::Utc::now().timestamp_micros();
        buf[8..16].copy_from_slice(&timestamp.to_be_bytes());
        // write the buffer
        f.write_all(&buf).await.unwrap();
        f.write_all(s.as_bytes()).await.unwrap();
        f.write_all(b"\n").await.unwrap();
      }
    }
  }

  /// Flush the sink.
  pub async fn flush(&self) {
    self.0.lock().await.flush().await.unwrap()
  }
}

/// Errors produced while creating a sink from the `_LAMBDA_TELEMETRY_LOG_FD`
/// environment variable.
///
/// The underlying cause is available through [`std::error::Error::source`].
#[derive(Debug)]
pub enum Error {
  /// The environment variable could not be read.
  VarError(std::env::VarError),
  /// The environment variable's value could not be parsed as an integer.
  ParseIntError(std::num::ParseIntError),
}

impl std::fmt::Display for Error {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // The inner error is exposed via `source()`, so it is not repeated here.
    let message = match self {
      Error::VarError(_) => "failed to read the telemetry log fd environment variable",
      Error::ParseIntError(_) => "failed to parse the telemetry log fd as an integer",
    };
    f.write_str(message)
  }
}

impl std::error::Error for Error {
  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
    match self {
      Error::VarError(e) => Some(e),
      Error::ParseIntError(e) => Some(e),
    }
  }
}