1use std::convert::TryInto;
4use std::fmt::{Display, Formatter};
5use std::marker::PhantomData;
6use std::pin::Pin;
7
8use crc32fast::Hasher;
9use futures::task::{Context, Poll};
10use futures::Stream;
11
12use crate::error::RusotoError;
13use crate::request::HttpResponse;
14use crate::stream::ByteStream;
15
16#[doc(hidden)]
17pub trait DeserializeEvent: Sized {
18 fn deserialize_event(event_type: &str, data: &[u8]) -> Result<Self, RusotoError<()>>;
19}
20
/// Failure modes of the binary event-stream frame parser.
#[derive(Debug, PartialEq, Eq)]
enum EventStreamParseError {
    /// The input ended before the requested number of bytes was available.
    UnexpectedEof,
    /// A CRC32 stored in the frame does not match the checksum of the data.
    InvalidCrc,
    /// The frame is structurally invalid; the message says what was wrong.
    InvalidData(&'static str),
}
27
28fn check_crc32(data: &[u8], ref_value: u32) -> Result<(), EventStreamParseError> {
29 let mut hasher = Hasher::new();
30 hasher.update(data);
31
32 if hasher.finalize() != ref_value {
33 Err(EventStreamParseError::InvalidCrc)
34 } else {
35 Ok(())
36 }
37}
38
39fn read_slice<'a>(reader: &mut &'a [u8], size: usize) -> Result<&'a [u8], EventStreamParseError> {
40 if reader.len() < size {
41 return Err(EventStreamParseError::UnexpectedEof);
42 }
43
44 let slice = &reader[..size];
45 *reader = &reader[size..];
46 Ok(slice)
47}
48
49fn read_u8(reader: &mut &[u8]) -> Result<u8, EventStreamParseError> {
50 let buf = read_slice(reader, std::mem::size_of::<u8>())?
51 .try_into()
52 .unwrap();
53 Ok(u8::from_be_bytes(buf))
54}
55
56fn read_u16(reader: &mut &[u8]) -> Result<u16, EventStreamParseError> {
57 let buf = read_slice(reader, std::mem::size_of::<u16>())?
58 .try_into()
59 .unwrap();
60 Ok(u16::from_be_bytes(buf))
61}
62
63fn read_u32(reader: &mut &[u8]) -> Result<u32, EventStreamParseError> {
64 let buf = read_slice(reader, std::mem::size_of::<u32>())?
65 .try_into()
66 .unwrap();
67 Ok(u32::from_be_bytes(buf))
68}
69
70fn read_u64(reader: &mut &[u8]) -> Result<u64, EventStreamParseError> {
71 let buf = read_slice(reader, std::mem::size_of::<u64>())?
72 .try_into()
73 .unwrap();
74 Ok(u64::from_be_bytes(buf))
75}
76
77impl EventStreamParseError {
78 fn eof_as_invalid(self) -> Self {
79 match self {
80 EventStreamParseError::UnexpectedEof => {
81 EventStreamParseError::InvalidData("Malformed event: ended unexpectedly")
82 }
83 other => other,
84 }
85 }
86}
87
88impl std::error::Error for EventStreamParseError {}
89
90impl Display for EventStreamParseError {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 match self {
93 EventStreamParseError::UnexpectedEof => write!(f, "Expected additional data"),
94 EventStreamParseError::InvalidCrc => write!(f, "CRC check failed"),
95 EventStreamParseError::InvalidData(msg) => write!(f, "{}", msg),
96 }
97 }
98}
99
100impl<T> Into<RusotoError<T>> for EventStreamParseError {
101 fn into(self) -> RusotoError<T> {
102 RusotoError::ParseError(self.to_string())
103 }
104}
105
/// Typed value of an event-stream header, borrowing from the frame bytes.
///
/// The variants are listed in the order of the wire type tags `0..=9`
/// recognised by `EventStreamHeaderValue::parse` (tags `0` and `1` both map
/// to `Bool`).
#[allow(missing_docs)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EventStreamHeaderValue<'a> {
    Bool(bool),
    UInt8(u8),
    UInt16(u16),
    UInt32(u32),
    UInt64(u64),
    ByteArray(&'a [u8]),
    String(&'a str),
    Timestamp(u64),
    Uuid(&'a [u8; 16]),
}
119
120impl<'a> EventStreamHeaderValue<'a> {
121 pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
122 let value_type = read_u8(reader)?;
123 let value = match value_type {
124 0 => EventStreamHeaderValue::Bool(true),
125 1 => EventStreamHeaderValue::Bool(false),
126 2 => EventStreamHeaderValue::UInt8(read_u8(reader)?),
127 3 => EventStreamHeaderValue::UInt16(read_u16(reader)?),
128 4 => EventStreamHeaderValue::UInt32(read_u32(reader)?),
129 5 => EventStreamHeaderValue::UInt64(read_u64(reader)?),
130 6 => {
131 let size = read_u16(reader)? as usize;
132 let byte_array = read_slice(reader, size)?;
133 EventStreamHeaderValue::ByteArray(byte_array)
134 }
135 7 => {
136 let size = read_u16(reader)? as usize;
137 let string_bytes = read_slice(reader, size)?;
138 let string = std::str::from_utf8(string_bytes).or(Err(
139 EventStreamParseError::InvalidData("Header string data is not valid utf-8"),
140 ))?;
141 EventStreamHeaderValue::String(string)
142 }
143 8 => EventStreamHeaderValue::Timestamp(read_u64(reader)?),
144 9 => EventStreamHeaderValue::Uuid(read_slice(reader, 16)?.try_into().unwrap()),
145 _ => Err(EventStreamParseError::InvalidData(
146 "Invalid header value type",
147 ))?,
148 };
149 Ok(value)
150 }
151}
152
153#[derive(Clone, Copy, Debug, Eq, PartialEq)]
154struct EventStreamHeader<'a> {
155 name: &'a str,
156 value: EventStreamHeaderValue<'a>,
157}
158
159impl<'a> EventStreamHeader<'a> {
160 pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
161 let name_size = read_u8(reader)? as usize;
162 let name_bytes = read_slice(reader, name_size)?;
163 let name = std::str::from_utf8(name_bytes).or(Err(EventStreamParseError::InvalidData(
164 "Header name is not valid utf-8",
165 )))?;
166
167 let value = EventStreamHeaderValue::parse(reader)?;
168
169 Ok(EventStreamHeader { name, value })
170 }
171}
172
173#[derive(Clone, Debug, Eq, PartialEq)]
174struct EventStreamMessage<'a> {
175 headers: Vec<EventStreamHeader<'a>>,
176 payload: &'a [u8],
177}
178
179impl<'a> EventStreamMessage<'a> {
180 const MIN_LENGTH: usize = 16;
181
182 pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
183 let mut event_buf: &[u8] = *reader;
185
186 let total_length = read_u32(reader)? as usize;
187 if total_length < Self::MIN_LENGTH {
189 return Err(EventStreamParseError::InvalidData(
190 "Invalid event total length value",
191 ));
192 }
193 let remaining_length = total_length - 4;
194
195 if event_buf.len() < total_length {
197 return Err(EventStreamParseError::UnexpectedEof);
198 }
199 event_buf = &event_buf[..total_length];
200
201 let mut remainder_reader = read_slice(reader, remaining_length)?;
202 Self::parse_complete_event(event_buf, &mut remainder_reader)
203 .map_err(EventStreamParseError::eof_as_invalid)
205 }
206
207 fn parse_complete_event(
208 event_buf: &'a [u8],
209 remainder_reader: &mut &'a [u8],
210 ) -> Result<Self, EventStreamParseError> {
211 let headers_length = read_u32(remainder_reader)? as usize;
212 let prelude_crc = read_u32(remainder_reader)?;
213 check_crc32(&event_buf[..8], prelude_crc)?;
214
215 let mut headers_reader = read_slice(remainder_reader, headers_length)?;
216 let mut headers = Vec::with_capacity(3);
217 while !headers_reader.is_empty() {
218 let header = EventStreamHeader::parse(&mut headers_reader)?;
219 headers.push(header);
220 }
221
222 if remainder_reader.len() < 4 {
223 return Err(EventStreamParseError::InvalidData(
224 "Malformed event: unexpected EOF",
225 ));
226 }
227 let payload = read_slice(remainder_reader, remainder_reader.len() - 4)?;
228 let payload_crc = read_u32(remainder_reader)?;
229 check_crc32(&event_buf[..(event_buf.len() - 4)], payload_crc)?;
230
231 Ok(EventStreamMessage { headers, payload })
232 }
233
234 pub fn get_header(&self, name: &str) -> Option<&EventStreamHeader<'a>> {
235 self.headers.iter().find(|h| h.name == name)
236 }
237}
238
239#[derive(Debug)]
243pub struct EventStream<T> {
244 response_body: Option<Pin<Box<ByteStream>>>,
245 buf: Vec<u8>,
246 _phantom: std::marker::PhantomData<T>,
247}
248
249impl<T: DeserializeEvent + Unpin> EventStream<T> {
250 #[doc(hidden)]
251 pub fn new(response: HttpResponse) -> EventStream<T> {
252 EventStream {
253 response_body: Some(Box::pin(response.body)),
254 buf: Vec::with_capacity(512),
255 _phantom: PhantomData {},
256 }
257 }
258
259 fn pop_event(buf: &mut Vec<u8>) -> Result<Option<T>, RusotoError<()>> {
260 loop {
261 let mut reader: &[u8] = &buf;
262 let initial_size = reader.len();
263 let event_msg = match EventStreamMessage::parse(&mut reader) {
264 Ok(msg) => msg,
265 Err(EventStreamParseError::UnexpectedEof) => return Ok(None),
266 Err(err) => return Err(err.into()),
267 };
268 log::trace!("Parsed event stream event: {:?}", event_msg);
269
270 let event_type_header = event_msg
271 .get_header(":event-type")
272 .or_else(|| event_msg.get_header(":exception-type"))
273 .ok_or_else(|| {
274 RusotoError::ParseError(
275 "Expected event-type or exception-type header".to_string(),
276 )
277 })?;
278 let event_type: &str = match event_type_header.value {
279 EventStreamHeaderValue::String(s) => s,
280 _ => {
281 return Err(EventStreamParseError::InvalidData(
282 "Invalid event-type header type",
283 )
284 .into())
285 }
286 };
287
288 let event = if event_type == "initial-response" {
289 None
290 } else {
291 Some(T::deserialize_event(event_type, event_msg.payload)?)
292 };
293
294 let bytes_consumed = initial_size - reader.len();
295 buf.drain(..bytes_consumed);
296
297 if event.is_none() {
298 continue;
299 }
300
301 break Ok(event);
302 }
303 }
304
305 fn drop_response_body(&mut self) {
306 self.response_body = None;
307 }
308}
309
310impl<T: DeserializeEvent + Unpin> futures::Stream for EventStream<T> {
311 type Item = Result<T, RusotoError<()>>;
312
313 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314 let projection = self.get_mut();
315
316 match Self::pop_event(&mut projection.buf) {
318 Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
319 Ok(None) => {}
320 Err(err) => {
321 projection.drop_response_body();
322 return Poll::Ready(Some(Err(err)));
323 }
324 };
325
326 let chunk_option = match &mut projection.response_body {
328 Some(body) => futures::ready!(Stream::poll_next(body.as_mut(), cx)),
330
331 None => return Poll::Ready(None),
334 };
335 match chunk_option {
336 Some(Ok(byte_chunk)) => {
338 log::trace!("Got event stream bytes: {:?}", byte_chunk);
339
340 projection.buf.extend(byte_chunk);
341
342 let parsed_event = match Self::pop_event(&mut projection.buf) {
343 Ok(None) => {
344 cx.waker().wake_by_ref();
345 return Poll::Pending;
346 }
347 Ok(Some(item)) => Ok(item),
348 Err(err) => {
349 projection.drop_response_body();
350 Err(err)
351 }
352 };
353 Poll::Ready(Some(parsed_event))
354 }
355
356 Some(Err(e)) => {
358 projection.drop_response_body();
359 Poll::Ready(Some(Err(RusotoError::from(e))))
360 }
361
362 None => {
364 projection.drop_response_body();
365
366 if !projection.buf.is_empty() {
367 Poll::Ready(Some(Err(RusotoError::ParseError(
368 "Event stream closed with incomplete data remaining".to_string(),
369 ))))
370 } else {
371 Poll::Ready(None)
372 }
373 }
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a header whose value is a string.
    fn string_header<'a>(name: &'a str, value: &'a str) -> EventStreamHeader<'a> {
        EventStreamHeader {
            name,
            value: EventStreamHeaderValue::String(value),
        }
    }

    // A well-formed `initial-response` frame decodes into three string
    // headers and a `{}` payload.
    #[test]
    fn parse_initial_response() {
        let raw = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
            \r:message-type\x07\0\x05event{}\xac\xaek}";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);

        let expected = EventStreamMessage {
            headers: vec![
                string_header(":event-type", "initial-response"),
                string_header(":content-type", "application/x-amz-json-1.1"),
                string_header(":message-type", "event"),
            ],
            payload: b"{}",
        };
        assert_eq!(parsed, Ok(expected));
    }

    // An exception frame decodes like any other frame; only the header
    // names and values differ.
    #[test]
    fn parse_error_event() {
        let raw = b"\0\0\x01\x06\0\0\0pq;\x88P\x0f:exception-type\x07\0\x18\
            KMSAccessDeniedException\r:content-type\x07\0\x1aapplication/x-amz-json-1.1\r\
            :message-type\x07\0\texception{\"message\":\"User AIDAAAAAAAAAAAAAAAAAA is not \
            authorized to decrypt records in stream 666666666666:rusoto-test-tud2Vz6q1V\
            :1590674508\"}\xfc\xd1\x99T";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);

        let expected = EventStreamMessage {
            headers: vec![
                string_header(":exception-type", "KMSAccessDeniedException"),
                string_header(":content-type", "application/x-amz-json-1.1"),
                string_header(":message-type", "exception"),
            ],
            payload: b"{\"message\":\"User AIDAAAAAAAAAAAAAAAAAA is not \
                authorized to decrypt records in stream 666666666666:rusoto-test-tud2Vz6q1V\
                :1590674508\"}",
        };
        assert_eq!(parsed, Ok(expected));
    }

    // The last byte of the prelude CRC is altered.
    #[test]
    fn invalid_prelude_crc() {
        let raw = b"\0\0\0r\0\0\0`\xab\x82\r\x9f\x0b:event-type\x07\0\x10initial-response\r\
            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
            \r:message-type\x07\0\x05event{}\xac\xaek}";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);
        assert_eq!(parsed, Err(EventStreamParseError::InvalidCrc));
    }

    // The first byte of the trailing message CRC is altered.
    #[test]
    fn invalid_message_crc() {
        let raw = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
            \r:message-type\x07\0\x05event{}\xad\xaek}";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);
        assert_eq!(parsed, Err(EventStreamParseError::InvalidCrc));
    }

    // The frame is cut off inside the message CRC.
    #[test]
    fn incomplete_event() {
        let raw = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
            \r:message-type\x07\0\x05event{}\xac";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);
        assert_eq!(parsed, Err(EventStreamParseError::UnexpectedEof));
    }

    // No bytes at all is reported as "need more data".
    #[test]
    fn empty_reader() {
        let raw = b"";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);
        assert_eq!(parsed, Err(EventStreamParseError::UnexpectedEof));
    }

    // The headers-length field is larger than the real header section.
    #[test]
    fn invalid_header_size() {
        let raw = b"\0\0\0r\0\0\0c2\x8b\\$\x0b:event-type\x07\0\x10initial-response\r\
            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
            \r:message-type\x07\0\x05event{}m\x858\x01";

        let parsed = EventStreamMessage::parse(&mut &raw[..]);
        assert!(matches!(
            parsed,
            Err(EventStreamParseError::InvalidData(_))
        ));
    }
}