// fakecloud_s3/eventstream.rs

/// Header value type tag identifying a string value in the AWS event-stream
/// header encoding.
///
/// `encode_headers` writes this byte between a header's name and its
/// length-prefixed value; every header emitted by this module is a string.
const HEADER_TYPE_STRING: u8 = 7;
7
8pub fn encode_frame(headers: &[(&str, &str)], payload: &[u8]) -> Vec<u8> {
9 let headers_bytes = encode_headers(headers);
10 let headers_len = headers_bytes.len() as u32;
11 let total_len = 12u32 + headers_len + payload.len() as u32 + 4;
12
13 let mut out = Vec::with_capacity(total_len as usize);
14 out.extend_from_slice(&total_len.to_be_bytes());
15 out.extend_from_slice(&headers_len.to_be_bytes());
16
17 let prelude_crc = crc32fast::hash(&out[..8]);
18 out.extend_from_slice(&prelude_crc.to_be_bytes());
19
20 out.extend_from_slice(&headers_bytes);
21 out.extend_from_slice(payload);
22
23 let msg_crc = crc32fast::hash(&out);
24 out.extend_from_slice(&msg_crc.to_be_bytes());
25
26 out
27}
28
29fn encode_headers(headers: &[(&str, &str)]) -> Vec<u8> {
30 let mut buf = Vec::new();
31 for (name, value) in headers {
32 let name_bytes = name.as_bytes();
33 let value_bytes = value.as_bytes();
34 debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
35 debug_assert!(
36 value_bytes.len() <= u16::MAX as usize,
37 "header value too long"
38 );
39 buf.push(name_bytes.len() as u8);
40 buf.extend_from_slice(name_bytes);
41 buf.push(HEADER_TYPE_STRING);
42 buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
43 buf.extend_from_slice(value_bytes);
44 }
45 buf
46}
47
48pub fn records_event_frame(payload: &[u8]) -> Vec<u8> {
49 encode_frame(
50 &[
51 (":event-type", "Records"),
52 (":content-type", "application/octet-stream"),
53 (":message-type", "event"),
54 ],
55 payload,
56 )
57}
58
59pub fn stats_event_frame(bytes_scanned: u64, bytes_processed: u64, bytes_returned: u64) -> Vec<u8> {
60 let payload = format!(
61 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Stats>\n <BytesScanned>{}</BytesScanned>\n <BytesProcessed>{}</BytesProcessed>\n <BytesReturned>{}</BytesReturned>\n</Stats>\n",
62 bytes_scanned, bytes_processed, bytes_returned
63 );
64 encode_frame(
65 &[
66 (":event-type", "Stats"),
67 (":content-type", "application/xml"),
68 (":message-type", "event"),
69 ],
70 payload.as_bytes(),
71 )
72}
73
74pub fn end_event_frame() -> Vec<u8> {
75 encode_frame(
76 &[
77 (":event-type", "End"),
78 (":content-type", "application/xml"),
79 (":message-type", "event"),
80 ],
81 b"",
82 )
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_eventstream::frame::read_message_from;
    use bytes::Bytes;

    /// Decodes one frame with the SDK reader and returns its payload along
    /// with its headers as owned `(name, value)` pairs.
    ///
    /// Panics if the frame does not decode or a header value is not a string.
    fn decode_with_headers(frame: Vec<u8>) -> (Bytes, Vec<(String, String)>) {
        let mut input = Bytes::from(frame);
        let msg = read_message_from(&mut input).unwrap();
        let mut pairs = Vec::new();
        for header in msg.headers().iter() {
            let name = header.name().as_str().to_string();
            let value = header.value().as_string().unwrap().as_str().to_string();
            pairs.push((name, value));
        }
        (msg.payload().clone(), pairs)
    }

    /// Returns true when `pairs` contains a header with exactly this name and value.
    fn has_header(pairs: &[(String, String)], name: &str, value: &str) -> bool {
        pairs.iter().any(|(k, v)| k == name && v == value)
    }

    #[test]
    fn records_frame_decodes_with_sdk() {
        let (payload, headers) = decode_with_headers(records_event_frame(b"hello"));
        assert_eq!(payload.as_ref(), b"hello");
        assert!(has_header(&headers, ":event-type", "Records"));
        assert!(has_header(&headers, ":message-type", "event"));
    }

    #[test]
    fn stats_frame_decodes_with_sdk() {
        let mut input = Bytes::from(stats_event_frame(100, 50, 25));
        let msg = read_message_from(&mut input).unwrap();
        let xml = std::str::from_utf8(msg.payload().as_ref()).unwrap();
        assert!(xml.contains("BytesScanned"));
        assert!(xml.contains("100"));
    }

    #[test]
    fn end_frame_decodes_with_sdk() {
        let (payload, headers) = decode_with_headers(end_event_frame());
        assert!(payload.is_empty());
        assert!(has_header(&headers, ":event-type", "End"));
    }

    #[test]
    fn concatenated_frames_decode_with_sdk_decoder() {
        use aws_smithy_eventstream::frame::{DecodedFrame, MessageFrameDecoder};

        // Records, Stats and End frames back to back, as a single byte stream.
        let combined = [
            records_event_frame(b"hello"),
            stats_event_frame(100, 50, 25),
            end_event_frame(),
        ]
        .concat();

        let mut decoder = MessageFrameDecoder::new();
        let mut buf = bytes_utils::SegmentedBuf::new();
        buf.push(Bytes::from(combined));

        // Keep pulling frames until the decoder stops reporting a complete one.
        let mut count = 0;
        while matches!(
            decoder.decode_frame(&mut buf).unwrap(),
            DecodedFrame::Complete(_)
        ) {
            count += 1;
        }
        assert_eq!(count, 3, "Expected 3 decoded frames");
    }
}