Skip to main content

rusoto_core/
event_stream.rs

1//! Event Stream protocol support utilities
2
3use std::convert::TryInto;
4use std::fmt::{Display, Formatter};
5use std::marker::PhantomData;
6use std::pin::Pin;
7
8use crc32fast::Hasher;
9use futures::task::{Context, Poll};
10use futures::Stream;
11
12use crate::error::RusotoError;
13use crate::request::HttpResponse;
14use crate::stream::ByteStream;
15
16#[doc(hidden)]
17pub trait DeserializeEvent: Sized {
18    fn deserialize_event(event_type: &str, data: &[u8]) -> Result<Self, RusotoError<()>>;
19}
20
21#[derive(Debug, Eq, PartialEq)]
22enum EventStreamParseError {
23    UnexpectedEof,
24    InvalidCrc,
25    InvalidData(&'static str),
26}
27
28fn check_crc32(data: &[u8], ref_value: u32) -> Result<(), EventStreamParseError> {
29    let mut hasher = Hasher::new();
30    hasher.update(data);
31
32    if hasher.finalize() != ref_value {
33        Err(EventStreamParseError::InvalidCrc)
34    } else {
35        Ok(())
36    }
37}
38
39fn read_slice<'a>(reader: &mut &'a [u8], size: usize) -> Result<&'a [u8], EventStreamParseError> {
40    if reader.len() < size {
41        return Err(EventStreamParseError::UnexpectedEof);
42    }
43
44    let slice = &reader[..size];
45    *reader = &reader[size..];
46    Ok(slice)
47}
48
49fn read_u8(reader: &mut &[u8]) -> Result<u8, EventStreamParseError> {
50    let buf = read_slice(reader, std::mem::size_of::<u8>())?
51        .try_into()
52        .unwrap();
53    Ok(u8::from_be_bytes(buf))
54}
55
56fn read_u16(reader: &mut &[u8]) -> Result<u16, EventStreamParseError> {
57    let buf = read_slice(reader, std::mem::size_of::<u16>())?
58        .try_into()
59        .unwrap();
60    Ok(u16::from_be_bytes(buf))
61}
62
63fn read_u32(reader: &mut &[u8]) -> Result<u32, EventStreamParseError> {
64    let buf = read_slice(reader, std::mem::size_of::<u32>())?
65        .try_into()
66        .unwrap();
67    Ok(u32::from_be_bytes(buf))
68}
69
70fn read_u64(reader: &mut &[u8]) -> Result<u64, EventStreamParseError> {
71    let buf = read_slice(reader, std::mem::size_of::<u64>())?
72        .try_into()
73        .unwrap();
74    Ok(u64::from_be_bytes(buf))
75}
76
77impl EventStreamParseError {
78    fn eof_as_invalid(self) -> Self {
79        match self {
80            EventStreamParseError::UnexpectedEof => {
81                EventStreamParseError::InvalidData("Malformed event: ended unexpectedly")
82            }
83            other => other,
84        }
85    }
86}
87
88impl std::error::Error for EventStreamParseError {}
89
90impl Display for EventStreamParseError {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        match self {
93            EventStreamParseError::UnexpectedEof => write!(f, "Expected additional data"),
94            EventStreamParseError::InvalidCrc => write!(f, "CRC check failed"),
95            EventStreamParseError::InvalidData(msg) => write!(f, "{}", msg),
96        }
97    }
98}
99
100impl<T> Into<RusotoError<T>> for EventStreamParseError {
101    fn into(self) -> RusotoError<T> {
102        RusotoError::ParseError(self.to_string())
103    }
104}
105
106#[allow(missing_docs)]
107#[derive(Clone, Copy, Debug, Eq, PartialEq)]
108enum EventStreamHeaderValue<'a> {
109    Bool(bool),
110    UInt8(u8),
111    UInt16(u16),
112    UInt32(u32),
113    UInt64(u64),
114    ByteArray(&'a [u8]),
115    String(&'a str),
116    Timestamp(u64),
117    Uuid(&'a [u8; 16]), // don't want to pull the uuid dependency for this, so just u8
118}
119
120impl<'a> EventStreamHeaderValue<'a> {
121    pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
122        let value_type = read_u8(reader)?;
123        let value = match value_type {
124            0 => EventStreamHeaderValue::Bool(true),
125            1 => EventStreamHeaderValue::Bool(false),
126            2 => EventStreamHeaderValue::UInt8(read_u8(reader)?),
127            3 => EventStreamHeaderValue::UInt16(read_u16(reader)?),
128            4 => EventStreamHeaderValue::UInt32(read_u32(reader)?),
129            5 => EventStreamHeaderValue::UInt64(read_u64(reader)?),
130            6 => {
131                let size = read_u16(reader)? as usize;
132                let byte_array = read_slice(reader, size)?;
133                EventStreamHeaderValue::ByteArray(byte_array)
134            }
135            7 => {
136                let size = read_u16(reader)? as usize;
137                let string_bytes = read_slice(reader, size)?;
138                let string = std::str::from_utf8(string_bytes).or(Err(
139                    EventStreamParseError::InvalidData("Header string data is not valid utf-8"),
140                ))?;
141                EventStreamHeaderValue::String(string)
142            }
143            8 => EventStreamHeaderValue::Timestamp(read_u64(reader)?),
144            9 => EventStreamHeaderValue::Uuid(read_slice(reader, 16)?.try_into().unwrap()),
145            _ => Err(EventStreamParseError::InvalidData(
146                "Invalid header value type",
147            ))?,
148        };
149        Ok(value)
150    }
151}
152
153#[derive(Clone, Copy, Debug, Eq, PartialEq)]
154struct EventStreamHeader<'a> {
155    name: &'a str,
156    value: EventStreamHeaderValue<'a>,
157}
158
159impl<'a> EventStreamHeader<'a> {
160    pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
161        let name_size = read_u8(reader)? as usize;
162        let name_bytes = read_slice(reader, name_size)?;
163        let name = std::str::from_utf8(name_bytes).or(Err(EventStreamParseError::InvalidData(
164            "Header name is not valid utf-8",
165        )))?;
166
167        let value = EventStreamHeaderValue::parse(reader)?;
168
169        Ok(EventStreamHeader { name, value })
170    }
171}
172
173#[derive(Clone, Debug, Eq, PartialEq)]
174struct EventStreamMessage<'a> {
175    headers: Vec<EventStreamHeader<'a>>,
176    payload: &'a [u8],
177}
178
179impl<'a> EventStreamMessage<'a> {
180    const MIN_LENGTH: usize = 16;
181
182    pub fn parse(reader: &mut &'a [u8]) -> Result<Self, EventStreamParseError> {
183        // Get a copy of the entire slice before it gets advanced
184        let mut event_buf: &[u8] = *reader;
185
186        let total_length = read_u32(reader)? as usize;
187        // Ensure later subtractions don't underflow
188        if total_length < Self::MIN_LENGTH {
189            return Err(EventStreamParseError::InvalidData(
190                "Invalid event total length value",
191            ));
192        }
193        let remaining_length = total_length - 4;
194
195        // Ensure we have the entire event data for event_buf
196        if event_buf.len() < total_length {
197            return Err(EventStreamParseError::UnexpectedEof);
198        }
199        event_buf = &event_buf[..total_length];
200
201        let mut remainder_reader = read_slice(reader, remaining_length)?;
202        Self::parse_complete_event(event_buf, &mut remainder_reader)
203            // The entire event is available, EOF is no longer possible with well-formed packets
204            .map_err(EventStreamParseError::eof_as_invalid)
205    }
206
207    fn parse_complete_event(
208        event_buf: &'a [u8],
209        remainder_reader: &mut &'a [u8],
210    ) -> Result<Self, EventStreamParseError> {
211        let headers_length = read_u32(remainder_reader)? as usize;
212        let prelude_crc = read_u32(remainder_reader)?;
213        check_crc32(&event_buf[..8], prelude_crc)?;
214
215        let mut headers_reader = read_slice(remainder_reader, headers_length)?;
216        let mut headers = Vec::with_capacity(3);
217        while !headers_reader.is_empty() {
218            let header = EventStreamHeader::parse(&mut headers_reader)?;
219            headers.push(header);
220        }
221
222        if remainder_reader.len() < 4 {
223            return Err(EventStreamParseError::InvalidData(
224                "Malformed event: unexpected EOF",
225            ));
226        }
227        let payload = read_slice(remainder_reader, remainder_reader.len() - 4)?;
228        let payload_crc = read_u32(remainder_reader)?;
229        check_crc32(&event_buf[..(event_buf.len() - 4)], payload_crc)?;
230
231        Ok(EventStreamMessage { headers, payload })
232    }
233
234    pub fn get_header(&self, name: &str) -> Option<&EventStreamHeader<'a>> {
235        self.headers.iter().find(|h| h.name == name)
236    }
237}
238
239/// Event Stream decoder
240///
241/// This struct implements `futures::Stream` and decodes events of type `T` from a streaming HTTP body.
242#[derive(Debug)]
243pub struct EventStream<T> {
244    response_body: Option<Pin<Box<ByteStream>>>,
245    buf: Vec<u8>,
246    _phantom: std::marker::PhantomData<T>,
247}
248
249impl<T: DeserializeEvent + Unpin> EventStream<T> {
250    #[doc(hidden)]
251    pub fn new(response: HttpResponse) -> EventStream<T> {
252        EventStream {
253            response_body: Some(Box::pin(response.body)),
254            buf: Vec::with_capacity(512),
255            _phantom: PhantomData {},
256        }
257    }
258
259    fn pop_event(buf: &mut Vec<u8>) -> Result<Option<T>, RusotoError<()>> {
260        loop {
261            let mut reader: &[u8] = &buf;
262            let initial_size = reader.len();
263            let event_msg = match EventStreamMessage::parse(&mut reader) {
264                Ok(msg) => msg,
265                Err(EventStreamParseError::UnexpectedEof) => return Ok(None),
266                Err(err) => return Err(err.into()),
267            };
268            log::trace!("Parsed event stream event: {:?}", event_msg);
269
270            let event_type_header = event_msg
271                .get_header(":event-type")
272                .or_else(|| event_msg.get_header(":exception-type"))
273                .ok_or_else(|| {
274                    RusotoError::ParseError(
275                        "Expected event-type or exception-type header".to_string(),
276                    )
277                })?;
278            let event_type: &str = match event_type_header.value {
279                EventStreamHeaderValue::String(s) => s,
280                _ => {
281                    return Err(EventStreamParseError::InvalidData(
282                        "Invalid event-type header type",
283                    )
284                    .into())
285                }
286            };
287
288            let event = if event_type == "initial-response" {
289                None
290            } else {
291                Some(T::deserialize_event(event_type, event_msg.payload)?)
292            };
293
294            let bytes_consumed = initial_size - reader.len();
295            buf.drain(..bytes_consumed);
296
297            if event.is_none() {
298                continue;
299            }
300
301            break Ok(event);
302        }
303    }
304
305    fn drop_response_body(&mut self) {
306        self.response_body = None;
307    }
308}
309
310impl<T: DeserializeEvent + Unpin> futures::Stream for EventStream<T> {
311    type Item = Result<T, RusotoError<()>>;
312
313    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314        let projection = self.get_mut();
315
316        // First try to use the buffer
317        match Self::pop_event(&mut projection.buf) {
318            Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
319            Ok(None) => {}
320            Err(err) => {
321                projection.drop_response_body();
322                return Poll::Ready(Some(Err(err)));
323            }
324        };
325
326        // Otherwise, see if new data has arrived from the network
327        let chunk_option = match &mut projection.response_body {
328            // We still maintain the stream, poll it and return if nothing is available
329            Some(body) => futures::ready!(Stream::poll_next(body.as_mut(), cx)),
330
331            // We dropped the stream because we encountered an error.
332            // This means that the stream is potentially broken and so we end it.
333            None => return Poll::Ready(None),
334        };
335        match chunk_option {
336            // We received an http body chunk
337            Some(Ok(byte_chunk)) => {
338                log::trace!("Got event stream bytes: {:?}", byte_chunk);
339
340                projection.buf.extend(byte_chunk);
341
342                let parsed_event = match Self::pop_event(&mut projection.buf) {
343                    Ok(None) => {
344                        cx.waker().wake_by_ref();
345                        return Poll::Pending;
346                    }
347                    Ok(Some(item)) => Ok(item),
348                    Err(err) => {
349                        projection.drop_response_body();
350                        Err(err)
351                    }
352                };
353                Poll::Ready(Some(parsed_event))
354            }
355
356            // Something went wrong with the network connection
357            Some(Err(e)) => {
358                projection.drop_response_body();
359                Poll::Ready(Some(Err(RusotoError::from(e))))
360            }
361
362            // The underlying stream is closed
363            None => {
364                projection.drop_response_body();
365
366                if !projection.buf.is_empty() {
367                    Poll::Ready(Some(Err(RusotoError::ParseError(
368                        "Event stream closed with incomplete data remaining".to_string(),
369                    ))))
370                } else {
371                    Poll::Ready(None)
372                }
373            }
374        }
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381
382    #[test]
383    fn parse_initial_response() {
384        let data = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
385            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
386            \r:message-type\x07\0\x05event{}\xac\xaek}";
387
388        let event_msg = EventStreamMessage::parse(&mut &data[..]);
389        assert_eq!(
390            event_msg,
391            Ok(EventStreamMessage {
392                headers: vec![
393                    EventStreamHeader {
394                        name: ":event-type",
395                        value: EventStreamHeaderValue::String("initial-response"),
396                    },
397                    EventStreamHeader {
398                        name: ":content-type",
399                        value: EventStreamHeaderValue::String("application/x-amz-json-1.1"),
400                    },
401                    EventStreamHeader {
402                        name: ":message-type",
403                        value: EventStreamHeaderValue::String("event"),
404                    },
405                ],
406                payload: b"{}",
407            }),
408        );
409    }
410
411    #[test]
412    fn parse_error_event() {
413        let data = b"\0\0\x01\x06\0\0\0pq;\x88P\x0f:exception-type\x07\0\x18\
414            KMSAccessDeniedException\r:content-type\x07\0\x1aapplication/x-amz-json-1.1\r\
415            :message-type\x07\0\texception{\"message\":\"User AIDAAAAAAAAAAAAAAAAAA is not \
416            authorized to decrypt records in stream 666666666666:rusoto-test-tud2Vz6q1V\
417            :1590674508\"}\xfc\xd1\x99T";
418
419        let event_msg = EventStreamMessage::parse(&mut &data[..]);
420        assert_eq!(
421            event_msg,
422            Ok(EventStreamMessage {
423                headers: vec![
424                    EventStreamHeader {
425                        name: ":exception-type",
426                        value: EventStreamHeaderValue::String("KMSAccessDeniedException"),
427                    },
428                    EventStreamHeader {
429                        name: ":content-type",
430                        value: EventStreamHeaderValue::String("application/x-amz-json-1.1"),
431                    },
432                    EventStreamHeader {
433                        name: ":message-type",
434                        value: EventStreamHeaderValue::String("exception"),
435                    },
436                ],
437                payload: b"{\"message\":\"User AIDAAAAAAAAAAAAAAAAAA is not \
438                    authorized to decrypt records in stream 666666666666:rusoto-test-tud2Vz6q1V\
439                    :1590674508\"}",
440            }),
441        );
442    }
443
444    #[test]
445    fn invalid_prelude_crc() {
446        let data = b"\0\0\0r\0\0\0`\xab\x82\r\x9f\x0b:event-type\x07\0\x10initial-response\r\
447            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
448            \r:message-type\x07\0\x05event{}\xac\xaek}";
449
450        let event_msg = EventStreamMessage::parse(&mut &data[..]);
451        assert_eq!(event_msg, Err(EventStreamParseError::InvalidCrc));
452    }
453
454    #[test]
455    fn invalid_message_crc() {
456        let data = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
457            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
458            \r:message-type\x07\0\x05event{}\xad\xaek}";
459
460        let event_msg = EventStreamMessage::parse(&mut &data[..]);
461        assert_eq!(event_msg, Err(EventStreamParseError::InvalidCrc));
462    }
463
464    #[test]
465    fn incomplete_event() {
466        let data = b"\0\0\0r\0\0\0`\xab\x82\r\x9e\x0b:event-type\x07\0\x10initial-response\r\
467            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
468            \r:message-type\x07\0\x05event{}\xac";
469
470        let event_msg = EventStreamMessage::parse(&mut &data[..]);
471        assert_eq!(event_msg, Err(EventStreamParseError::UnexpectedEof));
472    }
473
474    #[test]
475    fn empty_reader() {
476        let data = b"";
477
478        let event_msg = EventStreamMessage::parse(&mut &data[..]);
479        assert_eq!(event_msg, Err(EventStreamParseError::UnexpectedEof));
480    }
481
482    #[test]
483    fn invalid_header_size() {
484        let data = b"\0\0\0r\0\0\0c2\x8b\\$\x0b:event-type\x07\0\x10initial-response\r\
485            :content-type\x07\0\x1aapplication/x-amz-json-1.1\
486            \r:message-type\x07\0\x05event{}m\x858\x01";
487
488        let event_msg = EventStreamMessage::parse(&mut &data[..]);
489        assert!(matches!(
490            event_msg,
491            Err(EventStreamParseError::InvalidData(_))
492        ));
493    }
494}