Skip to main content

fakecloud_lambda/
eventstream.rs

1//! Minimal `application/vnd.amazon.eventstream` frame encoder.
2//!
3//! This is the wire format AWS uses for streaming responses on
4//! `InvokeWithResponseStream`, S3 SelectObjectContent, Kinesis
5//! SubscribeToShard, Transcribe, etc. Each frame is:
6//!
7//! ```text
8//! +-----------------------------------+
9//! | total length        (u32 BE)      |
10//! | headers length      (u32 BE)      |
11//! | prelude CRC32       (u32 BE)      |  CRC of the two preceding u32s
12//! | headers bytes       (raw)         |
13//! | payload bytes       (raw)         |
14//! | message CRC32       (u32 BE)      |  CRC of the whole frame so far
15//! +-----------------------------------+
16//! ```
17//!
18//! Each header is encoded as:
19//!
20//! ```text
21//! +---------------------------------------+
22//! | name length     (u8)                  |
23//! | name bytes                            |
24//! | value type      (u8)                  |  7 = string
25//! | value length    (u16 BE) [for strings]|
26//! | value bytes                           |
27//! +---------------------------------------+
28//! ```
29//!
30//! Only the string header type (7) is needed for Lambda's response
31//! stream — `:event-type`, `:content-type`, `:message-type` are all
32//! short ASCII strings. Other value types (bool, int, byte_array,
33//! timestamp, uuid) aren't required here and are intentionally omitted
34//! to keep the surface minimal.
35
/// Value-type tag written before every header value this encoder emits;
/// `7` marks the value as a string prefixed by a big-endian `u16` length.
const HEADER_TYPE_STRING: u8 = 0x07;
37
38/// Encode a single eventstream frame from a list of `(name, value)`
39/// string headers and a payload byte slice. Returns the bytes ready to
40/// be written to the response body.
41pub fn encode_frame(headers: &[(&str, &str)], payload: &[u8]) -> Vec<u8> {
42    let headers_bytes = encode_headers(headers);
43    let headers_len = headers_bytes.len() as u32;
44    // total = 4 (total) + 4 (headers_len) + 4 (prelude CRC) + headers + payload + 4 (msg CRC)
45    let total_len = 12u32 + headers_len + payload.len() as u32 + 4;
46
47    let mut out = Vec::with_capacity(total_len as usize);
48    out.extend_from_slice(&total_len.to_be_bytes());
49    out.extend_from_slice(&headers_len.to_be_bytes());
50
51    let prelude_crc = crc32fast::hash(&out[..8]);
52    out.extend_from_slice(&prelude_crc.to_be_bytes());
53
54    out.extend_from_slice(&headers_bytes);
55    out.extend_from_slice(payload);
56
57    let msg_crc = crc32fast::hash(&out);
58    out.extend_from_slice(&msg_crc.to_be_bytes());
59
60    out
61}
62
63fn encode_headers(headers: &[(&str, &str)]) -> Vec<u8> {
64    let mut buf = Vec::new();
65    for (name, value) in headers {
66        let name_bytes = name.as_bytes();
67        let value_bytes = value.as_bytes();
68        debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
69        debug_assert!(
70            value_bytes.len() <= u16::MAX as usize,
71            "header value too long"
72        );
73        buf.push(name_bytes.len() as u8);
74        buf.extend_from_slice(name_bytes);
75        buf.push(HEADER_TYPE_STRING);
76        buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
77        buf.extend_from_slice(value_bytes);
78    }
79    buf
80}
81
82/// Build a `PayloadChunk` event frame carrying a slice of the function's
83/// streamed response. AWS sends one of these per logical chunk emitted
84/// by the function (e.g. each `responseStream.write(...)` call in a
85/// Node.js streaming handler). The body is the raw chunk bytes — AWS
86/// does **not** wrap them in JSON; clients reconstruct the response by
87/// concatenating the payloads of every `PayloadChunk` event.
88pub fn payload_chunk_frame(chunk: &[u8]) -> Vec<u8> {
89    encode_frame(
90        &[
91            (":event-type", "PayloadChunk"),
92            (":content-type", "application/octet-stream"),
93            (":message-type", "event"),
94        ],
95        chunk,
96    )
97}
98
99/// Build the terminal `InvokeComplete` event frame. `error_code` /
100/// `error_details` are `None` on success; `log_result_b64` is the
101/// base64-encoded last 4 KiB of the function's tail log (empty string
102/// when no log was captured).
103pub fn invoke_complete_frame(
104    error_code: Option<&str>,
105    error_details: Option<&str>,
106    log_result_b64: &str,
107) -> Vec<u8> {
108    let payload = serde_json::json!({
109        "ErrorCode": error_code,
110        "ErrorDetails": error_details,
111        "LogResult": log_result_b64,
112    });
113    let body = serde_json::to_vec(&payload).expect("static JSON shape never fails to serialize");
114    encode_frame(
115        &[
116            (":event-type", "InvokeComplete"),
117            (":content-type", "application/json"),
118            (":message-type", "event"),
119        ],
120        &body,
121    )
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Interpret the first four bytes of `bytes` as a big-endian `u32`.
    fn be_u32(bytes: &[u8]) -> u32 {
        u32::from_be_bytes(bytes[..4].try_into().unwrap())
    }

    /// True when `headers` contains a pair matching `name` and `value` exactly.
    fn has_header(headers: &[(String, String)], name: &str, value: &str) -> bool {
        headers.iter().any(|(k, v)| k == name && v == value)
    }

    /// Parse one eventstream frame into `(headers, payload)`, checking the
    /// declared total length and both CRCs along the way. The tests use it
    /// to round-trip the encoder's output; only string-typed headers are
    /// understood.
    fn decode_frame(buf: &[u8]) -> (Vec<(String, String)>, Vec<u8>) {
        assert!(buf.len() >= 16, "frame too short");

        // Prelude: total length, headers length, CRC over those eight bytes.
        let frame_len = be_u32(&buf[0..4]) as usize;
        assert_eq!(frame_len, buf.len(), "total length mismatch");
        let header_section_len = be_u32(&buf[4..8]) as usize;
        assert_eq!(be_u32(&buf[8..12]), crc32fast::hash(&buf[0..8]), "prelude CRC bad");

        // The trailing four bytes hold the CRC of everything before them.
        let headers_end = 12 + header_section_len;
        let (checked, trailer) = buf.split_at(frame_len - 4);
        assert_eq!(be_u32(trailer), crc32fast::hash(checked), "msg CRC bad");

        // Walk the header section, peeling one header off the front each pass.
        let mut headers = Vec::new();
        let mut rest = &buf[12..headers_end];
        while let Some((&name_len, after_len)) = rest.split_first() {
            let (name, after_name) = after_len.split_at(usize::from(name_len));
            let name = std::str::from_utf8(name).unwrap().to_string();

            assert_eq!(after_name[0], HEADER_TYPE_STRING, "only string headers supported");
            let value_len = usize::from(u16::from_be_bytes([after_name[1], after_name[2]]));
            let (value, tail) = after_name[3..].split_at(value_len);
            let value = std::str::from_utf8(value).unwrap().to_string();

            headers.push((name, value));
            rest = tail;
        }

        // Whatever sits between the headers and the message CRC is the payload.
        (headers, checked[headers_end..].to_vec())
    }

    #[test]
    fn round_trip_payload_chunk() {
        let (headers, payload) = decode_frame(&payload_chunk_frame(b"hello world"));
        assert_eq!(payload, b"hello world");
        assert!(has_header(&headers, ":event-type", "PayloadChunk"));
        assert!(has_header(&headers, ":message-type", "event"));
    }

    #[test]
    fn round_trip_invoke_complete_success() {
        let (headers, payload) = decode_frame(&invoke_complete_frame(None, None, ""));
        assert!(has_header(&headers, ":event-type", "InvokeComplete"));

        let body: serde_json::Value = serde_json::from_slice(&payload).unwrap();
        assert!(body["ErrorCode"].is_null());
        assert!(body["ErrorDetails"].is_null());
        assert_eq!(body["LogResult"], "");
    }

    #[test]
    fn round_trip_invoke_complete_error() {
        let encoded = invoke_complete_frame(Some("Runtime.UserError"), Some("boom"), "bG9n");
        let (headers, payload) = decode_frame(&encoded);
        assert!(has_header(&headers, ":event-type", "InvokeComplete"));

        let body: serde_json::Value = serde_json::from_slice(&payload).unwrap();
        assert_eq!(body["ErrorCode"], "Runtime.UserError");
        assert_eq!(body["ErrorDetails"], "boom");
        assert_eq!(body["LogResult"], "bG9n");
    }

    #[test]
    fn empty_chunk_still_well_formed() {
        let (_, payload) = decode_frame(&payload_chunk_frame(b""));
        assert!(payload.is_empty());
    }

    #[test]
    fn multiple_frames_concatenate_cleanly() {
        let stream = [
            payload_chunk_frame(b"chunk-1"),
            payload_chunk_frame(b"chunk-2"),
            invoke_complete_frame(None, None, ""),
        ]
        .concat();

        // Split the stream on each frame's own total-length prefix.
        let mut frames = Vec::new();
        let mut remaining = stream.as_slice();
        while !remaining.is_empty() {
            let frame_len = be_u32(remaining) as usize;
            let (frame, tail) = remaining.split_at(frame_len);
            frames.push(decode_frame(frame));
            remaining = tail;
        }

        assert_eq!(frames.len(), 3);
        assert_eq!(frames[0].1, b"chunk-1");
        assert_eq!(frames[1].1, b"chunk-2");
        // The final frame is InvokeComplete, whose body is JSON.
        let body: serde_json::Value = serde_json::from_slice(&frames[2].1).unwrap();
        assert!(body["ErrorCode"].is_null());
    }
}
235}