Skip to main content

sse_core/
decode.rs

1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{fmt, num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
8/// Represents a single Server-Sent Event message.
9#[derive(Debug, Clone, PartialEq, Eq)]
10#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
11pub struct MessageEvent {
12    /// The event name (defaults to `"message"`).
13    pub event: Cow<'static, str>,
14    /// The payload data.
15    pub data: String,
16    /// The `Last-Event-ID` sent by the server, if any.
17    pub last_event_id: Option<Arc<str>>,
18}
19
20/// Commands and payloads yielded by the SSE stream.
21#[derive(Debug, Clone, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
23pub enum SseEvent {
24    /// A standard data message.
25    Message(MessageEvent),
26    /// A server request to change the client's reconnect time (in milliseconds).
27    Retry(u32),
28}
29
30/// Error indicating that a parsed field exceeded the maximum allowed buffer size.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
32#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
33#[error("payload exceeded the allotted buffer size limit")]
34pub struct PayloadTooLargeError;
35
36const MAX_DEBUG_SIZE: usize = 200;
37
38struct ShowBigStr<'a>(&'a str);
39
40impl fmt::Debug for ShowBigStr<'_> {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        let mut end = self.0.len().min(MAX_DEBUG_SIZE);
43        while !self.0.is_char_boundary(end) {
44            end -= 1;
45        }
46        let s = &self.0[..end];
47
48        fmt::Debug::fmt(s, f)?;
49        if end < self.0.len() {
50            write!(f, "... ({} bytes total)", self.0.len())?;
51        }
52
53        Ok(())
54    }
55}
56
57struct ShowBigBuf<'a>(&'a [u8]);
58
59impl fmt::Debug for ShowBigBuf<'_> {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        let (buf, truncated) = match self.0.len() {
62            ..=MAX_DEBUG_SIZE => (self.0, false),
63            _ => (&self.0[..MAX_DEBUG_SIZE], true),
64        };
65
66        let mut chunks = buf.utf8_chunks().peekable();
67
68        f.write_str("\"")?;
69        while let Some(chunk) = chunks.next() {
70            fmt::Display::fmt(&chunk.valid().escape_debug(), f)?;
71
72            let invalid = chunk.invalid();
73            if invalid.is_empty() {
74                continue;
75            }
76
77            // If we truncated and this is the very last chunk, the invalid bytes
78            // are almost certainly just a sliced multi-byte UTF-8 character.
79            if truncated && chunks.peek().is_none() {
80                break;
81            }
82
83            for &byte in invalid {
84                write!(f, "\\x{byte:02X}")?;
85            }
86        }
87
88        if truncated {
89            write!(f, "\"... ({} bytes total)", self.0.len())?;
90        } else {
91            f.write_str("\"")?;
92        }
93
94        Ok(())
95    }
96}
97
98#[derive(Clone, Copy, Default)]
99struct FieldMode {
100    len: u8,
101    buf: [u8; 5],
102}
103
104impl FieldMode {
105    #[inline]
106    const fn new() -> Self {
107        Self {
108            len: 0,
109            buf: [0; 5],
110        }
111    }
112
113    #[inline]
114    fn try_extend(&mut self, src: &[u8]) -> bool {
115        let Some(dst) = (self.buf).get_mut(self.len as usize..self.len as usize + src.len()) else {
116            return false;
117        };
118        dst.copy_from_slice(src);
119        self.len += src.len() as u8;
120        true
121    }
122
123    #[inline]
124    fn as_slice(&self) -> &[u8] {
125        &self.buf[..self.len as usize]
126    }
127}
128
129impl fmt::Debug for FieldMode {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.debug_tuple("FieldMode")
132            .field(&ShowBigBuf(self.as_slice()))
133            .finish()
134    }
135}
136
137#[derive(Debug, Clone, Copy)]
138enum ValueMode {
139    Data,
140    Event,
141    Retry,
142    Id,
143}
144
145#[derive(Debug, Clone, Copy)]
146enum Mode {
147    Bom { bytes_read: u8 },
148    Field(FieldMode),
149    Value(ValueMode),
150    Ignore,
151    PostCr,
152    PostColon(ValueMode),
153}
154
155/// The core state-machine parser for SSE.
156///
157/// This decoder does not perform any I/O. It consumes bytes from a given buffer
158/// and yields parsed [`SseEvent`]s. It is suitable for `no_std` environments.
159#[derive(Clone)]
160pub struct SseDecoder {
161    mode: Mode,
162    last_event_id: Option<Arc<str>>,
163    staged_last_event_id: Option<Arc<str>>,
164    last_event_id_buf: Vec<u8>,
165    event_buf: Vec<u8>,
166    data_buf: Vec<u8>,
167    retry_buf: Option<u32>,
168    max_payload_size: NonZeroUsize,
169    corrupted: bool,
170}
171
172impl SseDecoder {
173    /// Creates a new decoder with the default payload size limit of 512KiB.
174    ///
175    /// # Example
176    /// ```rust
177    /// # use bytes::{Buf, Bytes};
178    /// # use sse_core::{SseDecoder, SseEvent};
179    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
180    /// let mut decoder = SseDecoder::new();
181    /// let mut buf = Bytes::from("data: standard stream\n\n");
182    ///
183    /// let event = decoder.next(&mut buf).transpose()?;
184    /// assert!(event.is_some());
185    /// # Ok(())
186    /// # }
187    /// ```
188    #[inline]
189    #[must_use]
190    pub fn new() -> Self {
191        Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
192    }
193
194    /// Creates a new decoder with a custom maximum payload size limit.
195    ///
196    /// This is useful in memory-constrained environments or when connecting to
197    /// untrusted servers to prevent memory exhaustion from infinitely long lines.
198    ///
199    /// # Example
200    /// ```rust
201    /// # use core::num::NonZeroUsize;
202    /// # use bytes::Bytes;
203    /// # use sse_core::{SseDecoder, SseEvent};
204    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
205    /// // Create a strict decoder that rejects payloads over 1024 bytes
206    /// let limit = NonZeroUsize::new(1024).unwrap();
207    /// let mut decoder = SseDecoder::with_limit(limit);
208    ///
209    /// let mut buf = Bytes::from("data: small payload\n\n");
210    /// let Some(event) = decoder.next(&mut buf) else {
211    ///     panic!();
212    /// };
213    /// assert!(event.is_ok());
214    /// # Ok(())
215    /// # }
216    /// ```
217    #[inline]
218    #[must_use]
219    pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
220        Self {
221            mode: Mode::Bom { bytes_read: 0 },
222            last_event_id: None,
223            staged_last_event_id: None,
224            last_event_id_buf: vec![],
225            event_buf: vec![],
226            data_buf: vec![],
227            retry_buf: None,
228            max_payload_size,
229            corrupted: false,
230        }
231    }
232
233    /// Returns the current `Last-Event-ID` known to the decoder, if any.
234    #[inline]
235    #[must_use]
236    pub fn last_event_id(&self) -> Option<&Arc<str>> {
237        self.last_event_id.as_ref()
238    }
239
240    /// Resets the decoder state for a new connection, explicitly overriding
241    /// the currently tracked `Last-Event-ID`.
242    ///
243    /// This method clears all internal byte buffers and resets the parser, but
244    /// instead of keeping the previous ID (like [`reconnect()`](Self::reconnect))
245    /// or dropping it (like [`clear()`](Self::clear)), it injects the provided ID.
246    ///
247    /// It is typically used to prime the state machine with a known ID
248    /// (e.g., from a local database) right before feeding the decoder bytes
249    /// from a newly established connection.
250    #[inline]
251    pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
252        self.last_event_id = id;
253        self.reconnect();
254    }
255
256    /// Resets the decoder state completely, dropping the current `Last-Event-ID`.
257    ///
258    /// This clears all internal byte buffers and purges the parser's state,
259    /// effectively starting fresh. Because it drops the `Last-Event-ID`, the
260    /// next connection will start from the present moment rather than resuming.
261    ///
262    /// * To reset the state but **keep** the current ID, use [`reconnect()`](Self::reconnect).
263    /// * To reset the state and **inject** a specific ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
264    #[inline]
265    pub fn clear(&mut self) {
266        self.reconnect_with_id(None);
267    }
268
269    /// Resets the buffer state for a new connection while retaining the `Last-Event-ID`.
270    ///
271    /// This clears the internal byte buffers to prepare for a fresh stream of data,
272    /// but safely preserves the most recently parsed `Last-Event-ID`. This ensures
273    /// that when you reconnect to the server, you can resume exactly where you left off.
274    ///
275    /// * To reset the state and **drop** the ID, use [`clear()`](Self::clear).
276    /// * To reset the state and **override** the ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
277    #[inline]
278    pub fn reconnect(&mut self) {
279        self.mode = Mode::Bom { bytes_read: 0 };
280        self.clear_bufs();
281        self.corrupted = false;
282    }
283
284    fn mark_corrupted(&mut self) {
285        self.clear_bufs();
286        self.corrupted = true
287    }
288
289    #[inline]
290    fn clear_bufs(&mut self) {
291        self.data_buf.clear();
292        self.event_buf.clear();
293        self.last_event_id_buf.clear();
294        self.staged_last_event_id = self.last_event_id.clone();
295    }
296
297    fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
298        self.mode = match cr {
299            true => Mode::PostCr,
300            false => Mode::Field(FieldMode::new()),
301        };
302
303        if self.corrupted {
304            self.corrupted = false;
305            // bufs should be clear already
306            return None;
307        }
308
309        self.last_event_id = self.staged_last_event_id.clone();
310
311        match self.data_buf.last() {
312            Some(b'\n') => {
313                self.data_buf.pop();
314            }
315            Some(_) => {}
316            None => {
317                self.event_buf.clear();
318                return None;
319            }
320        }
321
322        let data = String::from_utf8_lossy(&self.data_buf).into_owned();
323        self.data_buf.clear();
324
325        let event = match &*self.event_buf {
326            b"" => Cow::Borrowed("message"),
327            event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
328        };
329        self.event_buf.clear();
330
331        Some(SseEvent::Message(MessageEvent {
332            data,
333            event,
334            last_event_id: self.last_event_id.clone(),
335        }))
336    }
337
338    /// Consumes bytes from the provided buffer and attempts to yield an event.
339    ///
340    /// The decoder does not store unparsed bytes internally. It reads directly
341    /// from the provided buffer, advancing the buffer's cursor only for the bytes
342    /// it successfully parses.
343    ///
344    /// If `Ok(None)` is returned, the provided buffer has been exhausted and
345    /// more bytes are needed to complete the current event. You should fetch more
346    /// data, append it to your buffer, and call `next()` again.
347    ///
348    /// # Example
349    /// ```
350    /// use bytes::{Buf, Bytes};
351    /// # use sse_core::{SseDecoder, SseEvent};
352    ///
353    /// let mut decoder = SseDecoder::new();
354    /// let mut buffer = Bytes::from("data: hello\n\n");
355    ///
356    /// // Call next() in a loop to drain all available events
357    /// while let Some(event) = decoder.next(&mut buffer) {
358    ///     println!("Received: {event:?}");
359    /// }
360    ///
361    /// // When next() returns None, the decoder is waiting for more data.
362    /// assert!(buffer.is_empty());
363    /// ```
364    ///
365    /// # Errors
366    ///
367    /// Returns a [`PayloadTooLargeError`] if a single field (like data or event name)
368    /// exceeds the maximum payload size limit configured for this decoder.
369    pub fn next(&mut self, buf: &mut impl Buf) -> Option<Result<SseEvent, PayloadTooLargeError>> {
370        // # 9.2.5 Parsing an event stream
371        //
372        // stream        = [ bom ] *event
373        // event         = *( comment / field ) end-of-line
374        // comment       = colon *any-char end-of-line
375        // field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
376        // end-of-line   = ( cr lf / cr / lf )
377        //
378        // ; characters
379        // lf            = %x000A ; U+000A LINE FEED (LF)
380        // cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
381        // space         = %x0020 ; U+0020 SPACE
382        // colon         = %x003A ; U+003A COLON (:)
383        // bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
384        // name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
385        //                 ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
386        // any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
387        //                 ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
388
389        loop {
390            let chunk = buf.chunk();
391            if chunk.is_empty() {
392                return None;
393            }
394
395            match &mut self.mode {
396                Mode::Bom { bytes_read } => {
397                    let b0 = chunk[0];
398
399                    const BOM: &[u8; 3] = b"\xef\xbb\xbf";
400
401                    if b0 != BOM[*bytes_read as usize] {
402                        self.mode = match *bytes_read {
403                            0 => Mode::Field(FieldMode::new()),
404                            _ => Mode::Ignore,
405                        };
406                        continue;
407                    }
408
409                    buf.advance(1);
410                    *bytes_read += 1;
411
412                    if BOM.len() <= *bytes_read as usize {
413                        self.mode = Mode::Field(FieldMode::new());
414                    }
415                }
416                Mode::Field(field) => {
417                    let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
418                        if !field.try_extend(chunk) {
419                            self.mode = Mode::Ignore;
420                        }
421                        buf.advance(chunk.len());
422                        continue;
423                    };
424
425                    let subchunk = &chunk[..field_end];
426                    let b0 = chunk[field_end];
427
428                    if !field.try_extend(subchunk) {
429                        self.mode = Mode::Ignore;
430                        buf.advance(subchunk.len());
431                        continue;
432                    }
433
434                    buf.advance(subchunk.len() + 1);
435
436                    let value = match field.as_slice() {
437                        b"data" if !self.corrupted => ValueMode::Data,
438                        b"event" if !self.corrupted => {
439                            self.event_buf.clear();
440                            ValueMode::Event
441                        }
442                        b"id" if !self.corrupted => {
443                            self.last_event_id_buf.clear();
444                            ValueMode::Id
445                        }
446                        b"retry" => {
447                            self.retry_buf = None;
448                            ValueMode::Retry
449                        }
450                        b"" => match b0 {
451                            b':' => {
452                                self.mode = Mode::Ignore;
453                                continue;
454                            }
455                            b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
456                                Some(ev) => return Some(Ok(ev)),
457                                None => continue,
458                            },
459                            _ => unreachable!(),
460                        },
461                        _ => {
462                            self.mode = Mode::Ignore;
463                            continue;
464                        }
465                    };
466
467                    match b0 {
468                        b'\n' => self.mode = Mode::Field(FieldMode::new()),
469                        b'\r' => self.mode = Mode::PostCr,
470                        b':' => {
471                            self.mode = Mode::PostColon(value);
472                            continue;
473                        }
474                        _ => unreachable!(),
475                    }
476
477                    match value {
478                        ValueMode::Data => self.data_buf.push(b'\n'),
479                        ValueMode::Id => self.last_event_id_buf.clear(),
480                        ValueMode::Event | ValueMode::Retry => {}
481                    }
482                }
483                Mode::Value(ValueMode::Retry) => {
484                    let mut advanced = 0;
485                    let mut return_event = false;
486
487                    for &b in chunk {
488                        advanced += 1;
489                        match b {
490                            b'0'..=b'9' => {
491                                let digit = (b & 0xf) as _;
492
493                                let retry_buf = self.retry_buf.unwrap_or(0);
494                                let Some(retry_buf) = retry_buf.checked_mul(10) else {
495                                    self.mode = Mode::Ignore;
496                                    break;
497                                };
498                                let Some(retry_buf) = retry_buf.checked_add(digit) else {
499                                    self.mode = Mode::Ignore;
500                                    break;
501                                };
502                                self.retry_buf = Some(retry_buf);
503                            }
504                            b'\r' => {
505                                self.mode = Mode::PostCr;
506                                return_event = true;
507                                break;
508                            }
509                            b'\n' => {
510                                self.mode = Mode::Field(FieldMode::new());
511                                return_event = true;
512                                break;
513                            }
514                            _ => {
515                                self.mode = Mode::Ignore;
516                                break;
517                            }
518                        }
519                    }
520
521                    buf.advance(advanced);
522
523                    if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
524                        return Some(Ok(SseEvent::Retry(retry_buf)));
525                    }
526                }
527                Mode::Value(ValueMode::Data) => {
528                    match consume_until_newline(
529                        &mut self.mode,
530                        Some(&mut self.data_buf),
531                        self.max_payload_size,
532                        buf,
533                    ) {
534                        Ok(true) => self.data_buf.push(b'\n'),
535                        Ok(false) => {}
536                        Err(err) => {
537                            self.mark_corrupted();
538                            return Some(Err(err));
539                        }
540                    }
541                }
542                Mode::Value(ValueMode::Event) => {
543                    if let Err(err) = consume_until_newline(
544                        &mut self.mode,
545                        Some(&mut self.event_buf),
546                        self.max_payload_size,
547                        buf,
548                    ) {
549                        self.mark_corrupted();
550                        return Some(Err(err));
551                    }
552                }
553                Mode::Value(ValueMode::Id) => {
554                    match consume_until_newline(
555                        &mut self.mode,
556                        Some(&mut self.last_event_id_buf),
557                        self.max_payload_size,
558                        buf,
559                    ) {
560                        Ok(true) => {
561                            if memchr(0, &self.last_event_id_buf).is_none() {
562                                self.staged_last_event_id = match &*self.last_event_id_buf {
563                                    [] => None,
564                                    buf => Some(String::from_utf8_lossy(buf).into()),
565                                };
566                            }
567                            self.last_event_id_buf.clear();
568                        }
569                        Ok(false) => {}
570                        Err(err) => {
571                            self.mark_corrupted();
572                            return Some(Err(err));
573                        }
574                    }
575                }
576                Mode::Ignore => {
577                    consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
578                        .expect("there should be no payload to grow too large");
579                }
580                Mode::PostCr => {
581                    if chunk[0] == b'\n' {
582                        buf.advance(1);
583                    }
584                    self.mode = Mode::Field(FieldMode::new());
585                }
586                Mode::PostColon(value) => {
587                    if chunk[0] == b' ' {
588                        buf.advance(1);
589                    }
590                    self.mode = Mode::Value(*value);
591                }
592            }
593        }
594    }
595}
596
597impl Default for SseDecoder {
598    fn default() -> Self {
599        Self::new()
600    }
601}
602
603impl fmt::Debug for SseDecoder {
604    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
605        f.debug_struct("SseDecoder")
606            .field("mode", &self.mode)
607            .field(
608                "last_event_id",
609                &self.last_event_id.as_deref().map(ShowBigStr),
610            )
611            .field(
612                "staged_last_event_id",
613                &self.staged_last_event_id.as_deref().map(ShowBigStr),
614            )
615            .field("last_event_id_buf", &ShowBigBuf(&self.last_event_id_buf))
616            .field("event_buf", &ShowBigBuf(&self.event_buf))
617            .field("data_buf", &ShowBigBuf(&self.data_buf))
618            .field("retry_buf", &self.retry_buf)
619            .field("max_payload_size", &self.max_payload_size)
620            .finish()
621    }
622}
623
624fn consume_until_newline(
625    mode: &mut Mode,
626    mut out: Option<&mut Vec<u8>>,
627    max_size: NonZeroUsize,
628    buf: &mut impl Buf,
629) -> Result<bool, PayloadTooLargeError> {
630    loop {
631        let chunk = buf.chunk();
632        if chunk.is_empty() {
633            return Ok(false);
634        };
635
636        let Some(i) = memchr2(b'\r', b'\n', chunk) else {
637            if let Some(out) = out.as_deref_mut() {
638                if max_size.get() < out.len() + chunk.len() {
639                    out.clear();
640                    *mode = Mode::Ignore;
641                    return Err(PayloadTooLargeError);
642                }
643                out.extend_from_slice(chunk);
644            }
645            buf.advance(chunk.len());
646            continue;
647        };
648
649        if let Some(out) = out {
650            if max_size.get() < out.len() + i {
651                out.clear();
652                *mode = Mode::Ignore;
653                return Err(PayloadTooLargeError);
654            }
655            out.extend_from_slice(&chunk[..i]);
656        }
657
658        *mode = match chunk[i] {
659            b'\r' => Mode::PostCr,
660            b'\n' => Mode::Field(FieldMode::new()),
661            _ => unreachable!(),
662        };
663
664        buf.advance(i + 1);
665
666        return Ok(true);
667    }
668}
669
670#[test]
671fn hard_parse() -> Result<(), PayloadTooLargeError> {
672    use core::slice;
673
674    // Based on https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
675    let bytes = "\u{FEFF}data: x
676
677:
678
679event: my-event\r
680data:line1
681data: line2
682:
683id: my-id
684:should be ignored too\rretry:42
685retry:
686
687data:second
688
689data:ignored
690";
691
692    let mut decoder = SseDecoder::new();
693
694    let events = bytes
695        .bytes()
696        .filter_map(|b| decoder.next(&mut slice::from_ref(&b)))
697        .collect::<Result<Vec<_>, PayloadTooLargeError>>()?;
698
699    let id = Some("my-id".into());
700
701    assert_eq!(
702        events,
703        &[
704            SseEvent::Message(MessageEvent {
705                event: "message".into(),
706                data: "x".into(),
707                last_event_id: None
708            }),
709            SseEvent::Retry(42),
710            SseEvent::Message(MessageEvent {
711                event: "my-event".into(),
712                data: "line1\nline2".into(),
713                last_event_id: id.clone()
714            }),
715            SseEvent::Message(MessageEvent {
716                event: "message".into(),
717                data: "second".into(),
718                last_event_id: id.clone()
719            })
720        ]
721    );
722    Ok(())
723}
724
725#[test]
726fn test_reconnect() {
727    let mut stream1: &[u8] = b"
728id: my-id
729
730event: my-event
731data:line1
732:
733data: line2
734id: ignored1
735";
736
737    let mut stream2: &[u8] = b"
738
739data: data
740
741id: final
742
743id: ignored2
744";
745
746    let my_id = Some("my-id".into());
747
748    let mut decoder = SseDecoder::new();
749
750    assert_eq!(decoder.next(&mut stream1), None);
751    assert_eq!(decoder.last_event_id(), my_id.as_ref());
752    assert!(stream1.is_empty());
753
754    decoder.reconnect();
755
756    // Check that the buffer was cleared
757    assert_eq!(
758        decoder.next(&mut stream2),
759        Some(Ok(SseEvent::Message(MessageEvent {
760            event: "message".into(),
761            data: "data".into(),
762            last_event_id: my_id,
763        }))),
764    );
765    assert_eq!(decoder.next(&mut stream2), None);
766    assert_eq!(decoder.last_event_id().map(|id| &**id), Some("final"));
767    assert!(stream2.is_empty());
768}
769
770#[test]
771fn test_limits() {
772    let mut stream: &[u8] = b"
773data: 0123456789
774id: my-id
775
776id: 01234567890
777data: thing
778event: ev
779
780data: mid
781
782event: jojo
783id: ignored
784data: 01234567890
785retry: 10
786
787event: final
788data:
789
790";
791
792    let my_id = Some("my-id".into());
793
794    let mut decoder = SseDecoder::with_limit(NonZeroUsize::new(10).unwrap());
795
796    assert_eq!(
797        decoder.next(&mut stream),
798        Some(Ok(SseEvent::Message(MessageEvent {
799            event: "message".into(),
800            data: "0123456789".into(),
801            last_event_id: my_id.clone(),
802        })))
803    );
804    assert_eq!(decoder.next(&mut stream), Some(Err(PayloadTooLargeError)));
805    assert_eq!(
806        decoder.next(&mut stream),
807        Some(Ok(SseEvent::Message(MessageEvent {
808            event: "message".into(),
809            data: "mid".into(),
810            last_event_id: my_id.clone()
811        })))
812    );
813    assert_eq!(decoder.next(&mut stream), Some(Err(PayloadTooLargeError)));
814    assert_eq!(decoder.next(&mut stream), Some(Ok(SseEvent::Retry(10))));
815    assert_eq!(
816        decoder.next(&mut stream),
817        Some(Ok(SseEvent::Message(MessageEvent {
818            event: "final".into(),
819            data: "".into(),
820            last_event_id: my_id.clone()
821        })))
822    );
823    assert!(stream.is_empty());
824}