Skip to main content

sse_core/
decode.rs

1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{fmt, num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
/// One dispatched Server-Sent Events message.
///
/// Produced by the decoder when a blank line terminates an event that carried
/// at least one `data` field.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct MessageEvent {
    /// Name given by the `event` field, or `"message"` when none was sent.
    pub event: Cow<'static, str>,
    /// All `data` lines of the event joined with `\n`.
    pub data: String,
    /// The last event ID in effect when this event was dispatched, if any.
    pub last_event_id: Option<Arc<str>>,
}
19
20/// Commands and payloads yielded by the SSE stream.
21#[derive(Debug, Clone, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
23pub enum SseEvent {
24    /// A standard data message.
25    Message(MessageEvent),
26    /// A server request to change the client's reconnect time (in milliseconds).
27    Retry(u32),
28}
29
30/// Error indicating that a parsed field exceeded the maximum allowed buffer size.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
32#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
33#[error("payload exceeded the allotted buffer size limit")]
34pub struct PayloadTooLargeError;
35
/// Maximum number of bytes of a buffer or string rendered by the Debug helpers.
const MAX_DEBUG_SIZE: usize = 200;

/// Debug adapter that prints at most [`MAX_DEBUG_SIZE`] bytes of a string.
struct ShowBigStr<'a>(&'a str);

impl fmt::Debug for ShowBigStr<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = self.0;
        // Back off to the nearest char boundary so the prefix slice is valid.
        let cut = (0..=text.len().min(MAX_DEBUG_SIZE))
            .rev()
            .find(|&idx| text.is_char_boundary(idx))
            .unwrap_or(0);

        fmt::Debug::fmt(&text[..cut], f)?;
        if cut != text.len() {
            write!(f, "... ({} bytes total)", text.len())?;
        }
        Ok(())
    }
}

/// Debug adapter that prints at most [`MAX_DEBUG_SIZE`] bytes of a byte buffer
/// as a quoted, escaped string, with invalid UTF-8 bytes shown as `\xNN`.
struct ShowBigBuf<'a>(&'a [u8]);

impl fmt::Debug for ShowBigBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = self.0.len();
        let truncated = total > MAX_DEBUG_SIZE;
        let visible = if truncated {
            &self.0[..MAX_DEBUG_SIZE]
        } else {
            self.0
        };

        f.write_str("\"")?;
        let mut pieces = visible.utf8_chunks().peekable();
        while let Some(piece) = pieces.next() {
            fmt::Display::fmt(&piece.valid().escape_debug(), f)?;

            // When the view was cut short, invalid bytes at the very end are
            // most likely the head of a split multi-byte character: hide them.
            let cut_tail = truncated && pieces.peek().is_none();
            if cut_tail {
                continue;
            }
            for byte in piece.invalid() {
                write!(f, "\\x{byte:02X}")?;
            }
        }

        if truncated {
            write!(f, "\"... ({total} bytes total)")
        } else {
            f.write_str("\"")
        }
    }
}
97
/// Fixed-capacity buffer holding the name of the field currently being parsed.
///
/// Five bytes fit the longest recognised names (`event`, `retry`). A longer
/// name can never match, so callers switch to ignoring the line instead of
/// growing this buffer.
#[derive(Clone, Copy, Default)]
struct FieldMode {
    /// Number of bytes of `buf` in use.
    len: u8,
    /// Storage for the field-name bytes.
    buf: [u8; 5],
}

impl FieldMode {
    /// Returns an empty field-name buffer.
    #[inline]
    const fn new() -> Self {
        FieldMode {
            buf: [0u8; 5],
            len: 0,
        }
    }

    /// Appends `src` if it fits in the remaining capacity.
    ///
    /// Returns `false`, leaving the buffer untouched, when it does not fit.
    #[inline]
    fn try_extend(&mut self, src: &[u8]) -> bool {
        let start = usize::from(self.len);
        match self.buf.get_mut(start..start + src.len()) {
            Some(slot) => {
                slot.copy_from_slice(src);
                // `slot` exists only if the whole of `src` fits in 5 bytes.
                self.len += src.len() as u8;
                true
            }
            None => false,
        }
    }

    /// Returns the bytes accumulated so far.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        &self.buf[..usize::from(self.len)]
    }
}
128
129impl fmt::Debug for FieldMode {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.debug_tuple("FieldMode")
132            .field(&ShowBigBuf(self.as_slice()))
133            .finish()
134    }
135}
136
/// Identifies which recognised field's value is being read.
#[derive(Debug, Clone, Copy)]
enum ValueMode {
    /// A `data` field: appended to the event's data buffer.
    Data,
    /// An `event` field: replaces the event name.
    Event,
    /// A `retry` field: parsed as decimal digits.
    Retry,
    /// An `id` field: replaces the staged last event ID.
    Id,
}
144
145#[derive(Debug, Clone, Copy)]
146enum Mode {
147    Bom { bytes_read: u8 },
148    Field(FieldMode),
149    Value(ValueMode),
150    Ignore,
151    PostCr,
152    PostColon(ValueMode),
153}
154
155/// The core state-machine parser for SSE.
156///
157/// This decoder does not perform any I/O. It consumes bytes from a given buffer
158/// and yields parsed [`SseEvent`]s. It is suitable for `no_std` environments.
159#[derive(Clone)]
160pub struct SseDecoder {
161    mode: Mode,
162    last_event_id: Option<Arc<str>>,
163    staged_last_event_id: Option<Arc<str>>,
164    last_event_id_buf: Vec<u8>,
165    event_buf: Vec<u8>,
166    data_buf: Vec<u8>,
167    retry_buf: Option<u32>,
168    max_payload_size: NonZeroUsize,
169}
170
171impl SseDecoder {
172    /// Creates a new decoder with the default payload size limit of 512KiB.
173    ///
174    /// # Example
175    /// ```rust
176    /// # use bytes::{Buf, Bytes};
177    /// # use sse_core::{SseDecoder, SseEvent};
178    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
179    /// let mut decoder = SseDecoder::new();
180    /// let mut buf = Bytes::from("data: standard stream\n\n");
181    ///
182    /// let event = decoder.next(&mut buf)?;
183    /// assert!(event.is_some());
184    /// # Ok(())
185    /// # }
186    /// ```
187    #[inline]
188    #[must_use]
189    pub fn new() -> Self {
190        Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
191    }
192
193    /// Creates a new decoder with a custom maximum payload size limit.
194    ///
195    /// This is useful in memory-constrained environments or when connecting to
196    /// untrusted servers to prevent memory exhaustion from infinitely long lines.
197    ///
198    /// # Example
199    /// ```rust
200    /// # use core::num::NonZeroUsize;
201    /// # use bytes::Bytes;
202    /// # use sse_core::{SseDecoder, SseEvent};
203    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
204    /// // Create a strict decoder that rejects payloads over 1024 bytes
205    /// let limit = NonZeroUsize::new(1024).unwrap();
206    /// let mut decoder = SseDecoder::with_limit(limit);
207    ///
208    /// let mut buf = Bytes::from("data: small payload\n\n");
209    /// let _event = decoder.next(&mut buf)?;
210    /// # Ok(())
211    /// # }
212    /// ```
213    #[inline]
214    #[must_use]
215    pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
216        Self {
217            mode: Mode::Bom { bytes_read: 0 },
218            last_event_id: None,
219            staged_last_event_id: None,
220            last_event_id_buf: vec![],
221            event_buf: vec![],
222            data_buf: vec![],
223            retry_buf: None,
224            max_payload_size,
225        }
226    }
227
228    /// Returns the current `Last-Event-ID` known to the decoder, if any.
229    #[inline]
230    #[must_use]
231    pub fn last_event_id(&self) -> Option<&Arc<str>> {
232        self.last_event_id.as_ref()
233    }
234
235    /// Resets the decoder state for a new connection, explicitly overriding
236    /// the currently tracked `Last-Event-ID`.
237    ///
238    /// This method clears all internal byte buffers and resets the parser, but
239    /// instead of keeping the previous ID (like [`reconnect()`](Self::reconnect))
240    /// or dropping it (like [`clear()`](Self::clear)), it injects the provided ID.
241    ///
242    /// It is typically used to prime the state machine with a known ID
243    /// (e.g., from a local database) right before feeding the decoder bytes
244    /// from a newly established connection.
245    #[inline]
246    pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
247        self.last_event_id = id;
248        self.reconnect();
249    }
250
251    /// Resets the decoder state completely, dropping the current `Last-Event-ID`.
252    ///
253    /// This clears all internal byte buffers and purges the parser's state,
254    /// effectively starting fresh. Because it drops the `Last-Event-ID`, the
255    /// next connection will start from the present moment rather than resuming.
256    ///
257    /// * To reset the state but **keep** the current ID, use [`reconnect()`](Self::reconnect).
258    /// * To reset the state and **inject** a specific ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
259    #[inline]
260    pub fn clear(&mut self) {
261        self.reconnect_with_id(None);
262    }
263
264    /// Resets the buffer state for a new connection while retaining the `Last-Event-ID`.
265    ///
266    /// This clears the internal byte buffers to prepare for a fresh stream of data,
267    /// but safely preserves the most recently parsed `Last-Event-ID`. This ensures
268    /// that when you reconnect to the server, you can resume exactly where you left off.
269    ///
270    /// * To reset the state and **drop** the ID, use [`clear()`](Self::clear).
271    /// * To reset the state and **override** the ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
272    #[inline]
273    pub fn reconnect(&mut self) {
274        self.mode = Mode::Bom { bytes_read: 0 };
275        self.data_buf.clear();
276    }
277
278    fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
279        self.last_event_id = self.staged_last_event_id.clone();
280
281        self.mode = match cr {
282            true => Mode::PostCr,
283            false => Mode::Field(FieldMode::new()),
284        };
285
286        match self.data_buf.last() {
287            Some(b'\n') => {
288                self.data_buf.pop();
289            }
290            Some(_) => {}
291            None => {
292                self.event_buf.clear();
293                return None;
294            }
295        }
296
297        let data = String::from_utf8_lossy(&self.data_buf).into_owned();
298        self.data_buf.clear();
299
300        let event = match &*self.event_buf {
301            b"" => Cow::Borrowed("message"),
302            event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
303        };
304        self.event_buf.clear();
305
306        Some(SseEvent::Message(MessageEvent {
307            data,
308            event,
309            last_event_id: self.last_event_id.clone(),
310        }))
311    }
312
313    /// Consumes bytes from the provided buffer and attempts to yield an event.
314    ///
315    /// The decoder does not store unparsed bytes internally. It reads directly
316    /// from the provided buffer, advancing the buffer's cursor only for the bytes
317    /// it successfully parses.
318    ///
319    /// If `Ok(None)` is returned, the provided buffer has been exhausted and
320    /// more bytes are needed to complete the current event. You should fetch more
321    /// data, append it to your buffer, and call `next()` again.
322    ///
323    /// # Example
324    /// ```
325    /// use bytes::{Buf, Bytes};
326    /// # use sse_core::{SseDecoder, SseEvent};
327    ///
328    /// let mut decoder = SseDecoder::new();
329    /// let mut buffer = Bytes::from("data: hello\n\n");
330    ///
331    /// // Call next() in a loop to drain all available events
332    /// while let Some(event) = decoder.next(&mut buffer).unwrap() {
333    ///     println!("Received: {:?}", event);
334    /// }
335    ///
336    /// // When next() returns None, the decoder is waiting for more data.
337    /// assert!(buffer.is_empty());
338    /// ```
339    ///
340    /// # Errors
341    ///
342    /// Returns a [`PayloadTooLargeError`] if a single field (like data or event name)
343    /// exceeds the maximum payload size limit configured for this decoder.
344    pub fn next(&mut self, buf: &mut impl Buf) -> Result<Option<SseEvent>, PayloadTooLargeError> {
345        // # 9.2.5 Parsing an event stream
346        //
347        // stream        = [ bom ] *event
348        // event         = *( comment / field ) end-of-line
349        // comment       = colon *any-char end-of-line
350        // field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
351        // end-of-line   = ( cr lf / cr / lf )
352        //
353        // ; characters
354        // lf            = %x000A ; U+000A LINE FEED (LF)
355        // cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
356        // space         = %x0020 ; U+0020 SPACE
357        // colon         = %x003A ; U+003A COLON (:)
358        // bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
359        // name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
360        //                 ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
361        // any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
362        //                 ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
363
364        loop {
365            let chunk = buf.chunk();
366            if chunk.is_empty() {
367                return Ok(None);
368            }
369
370            match &mut self.mode {
371                Mode::Bom { bytes_read } => {
372                    let b0 = chunk[0];
373
374                    const BOM: &[u8; 3] = b"\xef\xbb\xbf";
375
376                    if b0 != BOM[*bytes_read as usize] {
377                        self.mode = match *bytes_read {
378                            0 => Mode::Field(FieldMode::new()),
379                            _ => Mode::Ignore,
380                        };
381                        continue;
382                    }
383
384                    buf.advance(1);
385                    *bytes_read += 1;
386
387                    if BOM.len() <= *bytes_read as usize {
388                        self.mode = Mode::Field(FieldMode::new());
389                    }
390                }
391                Mode::Field(field) => {
392                    let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
393                        if !field.try_extend(chunk) {
394                            self.mode = Mode::Ignore;
395                        }
396                        buf.advance(chunk.len());
397                        continue;
398                    };
399
400                    let subchunk = &chunk[..field_end];
401                    let b0 = chunk[field_end];
402
403                    if !field.try_extend(subchunk) {
404                        self.mode = Mode::Ignore;
405                        buf.advance(subchunk.len());
406                        continue;
407                    }
408
409                    buf.advance(subchunk.len() + 1);
410
411                    let value = match field.as_slice() {
412                        b"data" => ValueMode::Data,
413                        b"event" => {
414                            self.event_buf.clear();
415                            ValueMode::Event
416                        }
417                        b"retry" => {
418                            self.retry_buf = None;
419                            ValueMode::Retry
420                        }
421                        b"id" => {
422                            self.last_event_id_buf.clear();
423                            ValueMode::Id
424                        }
425                        b"" => match b0 {
426                            b':' => {
427                                self.mode = Mode::Ignore;
428                                continue;
429                            }
430                            b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
431                                Some(ev) => return Ok(Some(ev)),
432                                None => continue,
433                            },
434                            _ => unreachable!(),
435                        },
436                        _ => {
437                            self.mode = Mode::Ignore;
438                            continue;
439                        }
440                    };
441
442                    match b0 {
443                        b'\n' => self.mode = Mode::Field(FieldMode::new()),
444                        b'\r' => self.mode = Mode::PostCr,
445                        b':' => {
446                            self.mode = Mode::PostColon(value);
447                            continue;
448                        }
449                        _ => unreachable!(),
450                    }
451
452                    match value {
453                        ValueMode::Data => self.data_buf.push(b'\n'),
454                        ValueMode::Id => self.last_event_id_buf.clear(),
455                        ValueMode::Event | ValueMode::Retry => {}
456                    }
457                }
458                Mode::Value(ValueMode::Retry) => {
459                    let mut advanced = 0;
460                    let mut return_event = false;
461
462                    for &b in chunk {
463                        advanced += 1;
464                        match b {
465                            b'0'..=b'9' => {
466                                let digit = (b & 0xf) as _;
467
468                                let retry_buf = self.retry_buf.unwrap_or(0);
469                                let Some(retry_buf) = retry_buf.checked_mul(10) else {
470                                    self.mode = Mode::Ignore;
471                                    break;
472                                };
473                                let Some(retry_buf) = retry_buf.checked_add(digit) else {
474                                    self.mode = Mode::Ignore;
475                                    break;
476                                };
477                                self.retry_buf = Some(retry_buf);
478                            }
479                            b'\r' => {
480                                self.mode = Mode::PostCr;
481                                return_event = true;
482                                break;
483                            }
484                            b'\n' => {
485                                self.mode = Mode::Field(FieldMode::new());
486                                return_event = true;
487                                break;
488                            }
489                            _ => {
490                                self.mode = Mode::Ignore;
491                                break;
492                            }
493                        }
494                    }
495
496                    buf.advance(advanced);
497
498                    if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
499                        return Ok(Some(SseEvent::Retry(retry_buf)));
500                    }
501                }
502                Mode::Value(ValueMode::Data) => {
503                    if consume_until_newline(
504                        &mut self.mode,
505                        Some(&mut self.data_buf),
506                        self.max_payload_size,
507                        buf,
508                    )? {
509                        self.data_buf.push(b'\n');
510                    }
511                }
512                Mode::Value(ValueMode::Event) => {
513                    consume_until_newline(
514                        &mut self.mode,
515                        Some(&mut self.event_buf),
516                        self.max_payload_size,
517                        buf,
518                    )?;
519                }
520                Mode::Value(ValueMode::Id) => {
521                    if consume_until_newline(
522                        &mut self.mode,
523                        Some(&mut self.last_event_id_buf),
524                        self.max_payload_size,
525                        buf,
526                    )? && memchr(0, &self.last_event_id_buf).is_none()
527                    {
528                        self.staged_last_event_id = match &*self.last_event_id_buf {
529                            [] => None,
530                            buf => Some(String::from_utf8_lossy(buf).into()),
531                        };
532                    }
533                }
534                Mode::Ignore => {
535                    consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
536                        .expect("there should be no payload to grow too large");
537                }
538                Mode::PostCr => {
539                    if chunk[0] == b'\n' {
540                        buf.advance(1);
541                    }
542                    self.mode = Mode::Field(FieldMode::new());
543                }
544                Mode::PostColon(value) => {
545                    if chunk[0] == b' ' {
546                        buf.advance(1);
547                    }
548                    self.mode = Mode::Value(*value);
549                }
550            }
551        }
552    }
553}
554
555impl Default for SseDecoder {
556    fn default() -> Self {
557        Self::new()
558    }
559}
560
561impl fmt::Debug for SseDecoder {
562    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563        f.debug_struct("SseDecoder")
564            .field("mode", &self.mode)
565            .field(
566                "last_event_id",
567                &self.last_event_id.as_deref().map(ShowBigStr),
568            )
569            .field(
570                "staged_last_event_id",
571                &self.staged_last_event_id.as_deref().map(ShowBigStr),
572            )
573            .field("last_event_id_buf", &ShowBigBuf(&self.last_event_id_buf))
574            .field("event_buf", &ShowBigBuf(&self.event_buf))
575            .field("data_buf", &ShowBigBuf(&self.data_buf))
576            .field("retry_buf", &self.retry_buf)
577            .field("max_payload_size", &self.max_payload_size)
578            .finish()
579    }
580}
581
582fn consume_until_newline(
583    mode: &mut Mode,
584    mut out: Option<&mut Vec<u8>>,
585    max_size: NonZeroUsize,
586    buf: &mut impl Buf,
587) -> Result<bool, PayloadTooLargeError> {
588    loop {
589        let chunk = buf.chunk();
590        if chunk.is_empty() {
591            return Ok(false);
592        };
593
594        let Some(i) = memchr2(b'\r', b'\n', chunk) else {
595            if let Some(out) = out.as_deref_mut() {
596                if max_size.get() < out.len() + chunk.len() {
597                    *mode = Mode::Ignore;
598                    return Err(PayloadTooLargeError);
599                }
600                out.extend_from_slice(chunk);
601            }
602            buf.advance(chunk.len());
603            continue;
604        };
605
606        if let Some(out) = out {
607            if max_size.get() < out.len() + i {
608                *mode = Mode::Ignore;
609                return Err(PayloadTooLargeError);
610            }
611            out.extend_from_slice(&chunk[..i]);
612        }
613
614        *mode = match chunk[i] {
615            b'\r' => Mode::PostCr,
616            b'\n' => Mode::Field(FieldMode::new()),
617            _ => unreachable!(),
618        };
619
620        buf.advance(i + 1);
621
622        return Ok(true);
623    }
624}
625
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    // Adapted from https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
    let input = concat!(
        "\n",
        "\n",
        ":\n",
        "\n",
        "event: my-event\r\n",
        "data:line1\n",
        "data: line2\n",
        ":\n",
        "id: my-id\n",
        ":should be ignored too\rretry:42\n",
        "retry:\n",
        "\n",
        "data:second\n",
        "\n",
        "data:ignored\n",
    );

    // Feed the stream one byte at a time to exercise every state transition
    // across buffer boundaries.
    let mut decoder = SseDecoder::new();
    let mut events = Vec::new();
    for byte in input.bytes() {
        let mut single: &[u8] = &[byte];
        if let Some(event) = decoder.next(&mut single)? {
            events.push(event);
        }
    }

    let expected_id: Option<Arc<str>> = Some(Arc::from("my-id"));

    assert_eq!(
        events,
        &[
            SseEvent::Retry(42),
            SseEvent::Message(MessageEvent {
                event: "my-event".into(),
                data: "line1\nline2".into(),
                last_event_id: expected_id.clone(),
            }),
            SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "second".into(),
                last_event_id: expected_id.clone(),
            }),
        ]
    );
    Ok(())
}