Skip to main content

fakecloud_s3/
eventstream.rs

1//! Minimal `application/vnd.amazon.eventstream` frame encoder for S3 Select.
2//!
3//! Reuses the same wire format as Lambda's eventstream (prelude + headers +
4//! payload + CRCs) but defines S3-specific event types.
5
/// Event-stream header value-type tag for a string value (the value is
/// written as a big-endian `u16` length followed by the raw bytes).
const HEADER_TYPE_STRING: u8 = 0x07;
7
8pub fn encode_frame(headers: &[(&str, &str)], payload: &[u8]) -> Vec<u8> {
9    let headers_bytes = encode_headers(headers);
10    let headers_len = headers_bytes.len() as u32;
11    let total_len = 12u32 + headers_len + payload.len() as u32 + 4;
12
13    let mut out = Vec::with_capacity(total_len as usize);
14    out.extend_from_slice(&total_len.to_be_bytes());
15    out.extend_from_slice(&headers_len.to_be_bytes());
16
17    let prelude_crc = crc32fast::hash(&out[..8]);
18    out.extend_from_slice(&prelude_crc.to_be_bytes());
19
20    out.extend_from_slice(&headers_bytes);
21    out.extend_from_slice(payload);
22
23    let msg_crc = crc32fast::hash(&out);
24    out.extend_from_slice(&msg_crc.to_be_bytes());
25
26    out
27}
28
29fn encode_headers(headers: &[(&str, &str)]) -> Vec<u8> {
30    let mut buf = Vec::new();
31    for (name, value) in headers {
32        let name_bytes = name.as_bytes();
33        let value_bytes = value.as_bytes();
34        debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
35        debug_assert!(
36            value_bytes.len() <= u16::MAX as usize,
37            "header value too long"
38        );
39        buf.push(name_bytes.len() as u8);
40        buf.extend_from_slice(name_bytes);
41        buf.push(HEADER_TYPE_STRING);
42        buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
43        buf.extend_from_slice(value_bytes);
44    }
45    buf
46}
47
48pub fn records_event_frame(payload: &[u8]) -> Vec<u8> {
49    encode_frame(
50        &[
51            (":event-type", "Records"),
52            (":content-type", "application/octet-stream"),
53            (":message-type", "event"),
54        ],
55        payload,
56    )
57}
58
59pub fn stats_event_frame(bytes_scanned: u64, bytes_processed: u64, bytes_returned: u64) -> Vec<u8> {
60    let payload = format!(
61        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Stats>\n  <BytesScanned>{}</BytesScanned>\n  <BytesProcessed>{}</BytesProcessed>\n  <BytesReturned>{}</BytesReturned>\n</Stats>\n",
62        bytes_scanned, bytes_processed, bytes_returned
63    );
64    encode_frame(
65        &[
66            (":event-type", "Stats"),
67            (":content-type", "application/xml"),
68            (":message-type", "event"),
69        ],
70        payload.as_bytes(),
71    )
72}
73
74pub fn end_event_frame() -> Vec<u8> {
75    encode_frame(
76        &[
77            (":event-type", "End"),
78            (":content-type", "application/xml"),
79            (":message-type", "event"),
80        ],
81        b"",
82    )
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_eventstream::frame::{read_message_from, DecodedFrame, MessageFrameDecoder};
    use bytes::Bytes;

    /// Returns true when `pairs` contains a header `name` with value `value`.
    fn has_header(pairs: &[(String, String)], name: &str, value: &str) -> bool {
        pairs.iter().any(|(k, v)| k == name && v == value)
    }

    #[test]
    fn records_frame_decodes_with_sdk() {
        let msg = read_message_from(&mut Bytes::from(records_event_frame(b"hello"))).unwrap();
        assert_eq!(msg.payload().as_ref(), b"hello");

        let mut pairs = Vec::new();
        for header in msg.headers() {
            pairs.push((
                header.name().as_str().to_string(),
                header.value().as_string().unwrap().as_str().to_string(),
            ));
        }
        assert!(has_header(&pairs, ":event-type", "Records"));
        assert!(has_header(&pairs, ":message-type", "event"));
    }

    #[test]
    fn stats_frame_decodes_with_sdk() {
        let msg = read_message_from(&mut Bytes::from(stats_event_frame(100, 50, 25))).unwrap();
        let body = std::str::from_utf8(msg.payload().as_ref()).unwrap();
        assert!(body.contains("BytesScanned"));
        assert!(body.contains("100"));
    }

    #[test]
    fn end_frame_decodes_with_sdk() {
        let msg = read_message_from(&mut Bytes::from(end_event_frame())).unwrap();
        assert!(msg.payload().is_empty());

        let mut pairs = Vec::new();
        for header in msg.headers() {
            pairs.push((
                header.name().as_str().to_string(),
                header.value().as_string().unwrap().as_str().to_string(),
            ));
        }
        assert!(has_header(&pairs, ":event-type", "End"));
    }

    #[test]
    fn concatenated_frames_decode_with_sdk_decoder() {
        let combined: Vec<u8> = [
            records_event_frame(b"hello"),
            stats_event_frame(100, 50, 25),
            end_event_frame(),
        ]
        .concat();

        let mut decoder = MessageFrameDecoder::new();
        let mut buf = bytes_utils::SegmentedBuf::new();
        buf.push(Bytes::from(combined));

        let mut count = 0;
        loop {
            match decoder.decode_frame(&mut buf).unwrap() {
                DecodedFrame::Complete(_) => count += 1,
                _ => break,
            }
        }
        assert_eq!(count, 3, "Expected 3 decoded frames");
    }
}