fakecloud_lambda/
eventstream.rs1const HEADER_TYPE_STRING: u8 = 7;
37
38pub fn encode_frame(headers: &[(&str, &str)], payload: &[u8]) -> Vec<u8> {
42 let headers_bytes = encode_headers(headers);
43 let headers_len = headers_bytes.len() as u32;
44 let total_len = 12u32 + headers_len + payload.len() as u32 + 4;
46
47 let mut out = Vec::with_capacity(total_len as usize);
48 out.extend_from_slice(&total_len.to_be_bytes());
49 out.extend_from_slice(&headers_len.to_be_bytes());
50
51 let prelude_crc = crc32fast::hash(&out[..8]);
52 out.extend_from_slice(&prelude_crc.to_be_bytes());
53
54 out.extend_from_slice(&headers_bytes);
55 out.extend_from_slice(payload);
56
57 let msg_crc = crc32fast::hash(&out);
58 out.extend_from_slice(&msg_crc.to_be_bytes());
59
60 out
61}
62
63fn encode_headers(headers: &[(&str, &str)]) -> Vec<u8> {
64 let mut buf = Vec::new();
65 for (name, value) in headers {
66 let name_bytes = name.as_bytes();
67 let value_bytes = value.as_bytes();
68 debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
69 debug_assert!(
70 value_bytes.len() <= u16::MAX as usize,
71 "header value too long"
72 );
73 buf.push(name_bytes.len() as u8);
74 buf.extend_from_slice(name_bytes);
75 buf.push(HEADER_TYPE_STRING);
76 buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
77 buf.extend_from_slice(value_bytes);
78 }
79 buf
80}
81
82pub fn payload_chunk_frame(chunk: &[u8]) -> Vec<u8> {
89 encode_frame(
90 &[
91 (":event-type", "PayloadChunk"),
92 (":content-type", "application/octet-stream"),
93 (":message-type", "event"),
94 ],
95 chunk,
96 )
97}
98
99pub fn invoke_complete_frame(
104 error_code: Option<&str>,
105 error_details: Option<&str>,
106 log_result_b64: &str,
107) -> Vec<u8> {
108 let payload = serde_json::json!({
109 "ErrorCode": error_code,
110 "ErrorDetails": error_details,
111 "LogResult": log_result_b64,
112 });
113 let body = serde_json::to_vec(&payload).expect("static JSON shape never fails to serialize");
114 encode_frame(
115 &[
116 (":event-type", "InvokeComplete"),
117 (":content-type", "application/json"),
118 (":message-type", "event"),
119 ],
120 &body,
121 )
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127
128 fn decode_frame(buf: &[u8]) -> (Vec<(String, String)>, Vec<u8>) {
132 assert!(buf.len() >= 16, "frame too short");
133 let total_len = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize;
134 assert_eq!(total_len, buf.len(), "total length mismatch");
135 let headers_len = u32::from_be_bytes(buf[4..8].try_into().unwrap()) as usize;
136 let prelude_crc = u32::from_be_bytes(buf[8..12].try_into().unwrap());
137 assert_eq!(prelude_crc, crc32fast::hash(&buf[0..8]), "prelude CRC bad");
138
139 let headers_start = 12;
140 let headers_end = headers_start + headers_len;
141 let payload_end = total_len - 4;
142 let msg_crc = u32::from_be_bytes(buf[payload_end..total_len].try_into().unwrap());
143 assert_eq!(msg_crc, crc32fast::hash(&buf[..payload_end]), "msg CRC bad");
144
145 let mut headers = Vec::new();
146 let hbuf = &buf[headers_start..headers_end];
147 let mut i = 0;
148 while i < hbuf.len() {
149 let nl = hbuf[i] as usize;
150 i += 1;
151 let name = std::str::from_utf8(&hbuf[i..i + nl]).unwrap().to_string();
152 i += nl;
153 let vt = hbuf[i];
154 i += 1;
155 assert_eq!(vt, HEADER_TYPE_STRING, "only string headers supported");
156 let vl = u16::from_be_bytes(hbuf[i..i + 2].try_into().unwrap()) as usize;
157 i += 2;
158 let value = std::str::from_utf8(&hbuf[i..i + vl]).unwrap().to_string();
159 i += vl;
160 headers.push((name, value));
161 }
162
163 let payload = buf[headers_end..payload_end].to_vec();
164 (headers, payload)
165 }
166
167 #[test]
168 fn round_trip_payload_chunk() {
169 let frame = payload_chunk_frame(b"hello world");
170 let (headers, payload) = decode_frame(&frame);
171 assert_eq!(payload, b"hello world");
172 assert!(headers
173 .iter()
174 .any(|(k, v)| k == ":event-type" && v == "PayloadChunk"));
175 assert!(headers
176 .iter()
177 .any(|(k, v)| k == ":message-type" && v == "event"));
178 }
179
180 #[test]
181 fn round_trip_invoke_complete_success() {
182 let frame = invoke_complete_frame(None, None, "");
183 let (headers, payload) = decode_frame(&frame);
184 assert!(headers
185 .iter()
186 .any(|(k, v)| k == ":event-type" && v == "InvokeComplete"));
187 let v: serde_json::Value = serde_json::from_slice(&payload).unwrap();
188 assert!(v["ErrorCode"].is_null());
189 assert!(v["ErrorDetails"].is_null());
190 assert_eq!(v["LogResult"], "");
191 }
192
193 #[test]
194 fn round_trip_invoke_complete_error() {
195 let frame = invoke_complete_frame(Some("Runtime.UserError"), Some("boom"), "bG9n");
196 let (headers, payload) = decode_frame(&frame);
197 assert!(headers
198 .iter()
199 .any(|(k, v)| k == ":event-type" && v == "InvokeComplete"));
200 let v: serde_json::Value = serde_json::from_slice(&payload).unwrap();
201 assert_eq!(v["ErrorCode"], "Runtime.UserError");
202 assert_eq!(v["ErrorDetails"], "boom");
203 assert_eq!(v["LogResult"], "bG9n");
204 }
205
206 #[test]
207 fn empty_chunk_still_well_formed() {
208 let frame = payload_chunk_frame(b"");
209 let (_headers, payload) = decode_frame(&frame);
210 assert!(payload.is_empty());
211 }
212
213 #[test]
214 fn multiple_frames_concatenate_cleanly() {
215 let mut out = Vec::new();
216 out.extend(payload_chunk_frame(b"chunk-1"));
217 out.extend(payload_chunk_frame(b"chunk-2"));
218 out.extend(invoke_complete_frame(None, None, ""));
219
220 let mut frames = Vec::new();
222 let mut cursor = 0;
223 while cursor < out.len() {
224 let total = u32::from_be_bytes(out[cursor..cursor + 4].try_into().unwrap()) as usize;
225 frames.push(decode_frame(&out[cursor..cursor + total]));
226 cursor += total;
227 }
228 assert_eq!(frames.len(), 3);
229 assert_eq!(frames[0].1, b"chunk-1");
230 assert_eq!(frames[1].1, b"chunk-2");
231 let v: serde_json::Value = serde_json::from_slice(&frames[2].1).unwrap();
233 assert!(v["ErrorCode"].is_null());
234 }
235}