// sse_core/decode.rs

1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
/// Represents a single Server-Sent Event message.
///
/// Produced by [`SseDecoder::next`] when a blank line terminates an event that
/// carried at least one `data` field.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct MessageEvent {
    /// The event name (defaults to `"message"`).
    pub event: Cow<'static, str>,
    /// The payload data.
    ///
    /// Multiple `data` lines of one event are joined with `\n`; invalid UTF-8 is
    /// replaced with U+FFFD REPLACEMENT CHARACTER.
    pub data: String,
    /// The `Last-Event-ID` in effect when this event was dispatched, if any.
    pub last_event_id: Option<Arc<str>>,
}
19
/// Commands and payloads yielded by the SSE stream.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseEvent {
    /// A standard data message.
    Message(MessageEvent),
    /// A server request to change the client's reconnect time (in milliseconds).
    ///
    /// Emitted as soon as a `retry` line ends, and only when its value consists
    /// solely of ASCII digits and fits in a `u32`.
    Retry(u32),
}
29
/// Error indicating that a parsed field exceeded the maximum allowed buffer size.
///
/// Returned by [`SseDecoder::next`] when buffering a `data`, `event` or `id`
/// value would exceed the limit passed to [`SseDecoder::with_limit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[error("Payload exceeded the allotted buffer size limit")]
pub struct PayloadTooLargeError;
35
/// Fixed-capacity, allocation-free accumulator for the name of the field
/// currently being parsed.
///
/// The field names the decoder recognises (`data`, `event`, `retry`, `id`) are
/// at most five bytes long, so a name that does not fit can be rejected early.
#[derive(Debug, Clone, Copy, Default)]
struct FieldMode {
    len: u8,
    buf: [u8; 5],
}

impl FieldMode {
    /// Returns an empty accumulator.
    #[inline]
    const fn new() -> Self {
        Self {
            buf: [0u8; 5],
            len: 0,
        }
    }

    /// Appends `src` to the name.
    ///
    /// Returns `false`, leaving the accumulator untouched, when the result
    /// would not fit in the fixed-size buffer.
    #[inline]
    fn try_extend(&mut self, src: &[u8]) -> bool {
        let start = usize::from(self.len);
        let end = start + src.len();
        match self.buf.get_mut(start..end) {
            Some(slot) => {
                slot.copy_from_slice(src);
                // `end` is at most the buffer length (5) here, so it fits in a `u8`.
                self.len = end as u8;
                true
            }
            None => false,
        }
    }

    /// Returns the bytes accumulated so far.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        let filled = usize::from(self.len);
        &self.buf[..filled]
    }
}
66
/// Which recognised field's value is currently being read.
#[derive(Debug, Clone, Copy)]
enum ValueMode {
    /// `data`: the value is appended to the data buffer, followed by `\n`.
    Data,
    /// `event`: the value becomes the event type of the current event.
    Event,
    /// `retry`: the value is parsed as a base-10 `u32`.
    Retry,
    /// `id`: the value becomes the staged `Last-Event-ID`.
    Id,
}
74
/// Parser state: what the next input byte is expected to be part of.
#[derive(Debug, Clone, Copy)]
enum Mode {
    /// Matching an optional leading UTF-8 byte order mark; `bytes_read` counts
    /// how many BOM bytes have been matched so far.
    Bom { bytes_read: u8 },
    /// Accumulating a field name until `:`, CR or LF.
    Field(FieldMode),
    /// Reading the value of the given field until CR or LF.
    Value(ValueMode),
    /// Discarding the rest of the current line (comments, unknown or over-long
    /// field names, invalid `retry` values, oversized payloads).
    Ignore,
    /// A CR was just consumed; an immediately following LF is part of the same
    /// line ending.
    PostCr,
    /// The `:` after a field name was just consumed; a single following space is
    /// skipped before the value.
    PostColon(ValueMode),
}
84
/// The core state-machine parser for SSE.
///
/// This decoder does not perform any I/O. It consumes bytes from a given buffer
/// and yields parsed [`SseEvent`]s. It is suitable for `no_std` environments.
#[derive(Debug, Clone)]
pub struct SseDecoder {
    /// Current parser state; determines how the next input byte is interpreted.
    mode: Mode,
    /// ID committed when the most recent event boundary (blank line) was reached.
    last_event_id: Option<Arc<str>>,
    /// ID set by the latest `id` field; copied into `last_event_id` at the next
    /// event boundary (the spec's "last event ID buffer").
    staged_last_event_id: Option<Arc<str>>,
    /// Raw bytes of the `id` value currently being read.
    last_event_id_buf: Vec<u8>,
    /// Raw bytes of the event type for the current event.
    event_buf: Vec<u8>,
    /// Accumulated `data` lines of the current event, each followed by `\n`.
    data_buf: Vec<u8>,
    /// Value of the `retry` field being parsed; `None` until a digit is seen.
    retry_buf: Option<u32>,
    /// Size limit enforced while filling `data_buf`, `event_buf` and
    /// `last_event_id_buf`.
    max_payload_size: NonZeroUsize,
}
100
101impl Default for SseDecoder {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl SseDecoder {
108    /// Creates a new decoder with the default payload size limit of 512KiB.
109    ///
110    /// # Example
111    /// ```rust
112    /// # use bytes::{Buf, Bytes};
113    /// # use sse_core::{SseDecoder, SseEvent};
114    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
115    /// let mut decoder = SseDecoder::new();
116    /// let mut buf = Bytes::from("data: standard stream\n\n");
117    ///
118    /// let event = decoder.next(&mut buf)?;
119    /// assert!(event.is_some());
120    /// # Ok(())
121    /// # }
122    /// ```
123    #[inline]
124    #[must_use]
125    pub fn new() -> Self {
126        Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
127    }
128
129    /// Creates a new decoder with a custom maximum payload size limit.
130    ///
131    /// This is useful in memory-constrained environments or when connecting to
132    /// untrusted servers to prevent memory exhaustion from infinitely long lines.
133    ///
134    /// # Example
135    /// ```rust
136    /// # use core::num::NonZeroUsize;
137    /// # use bytes::Bytes;
138    /// # use sse_core::{SseDecoder, SseEvent};
139    /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
140    /// // Create a strict decoder that rejects payloads over 1024 bytes
141    /// let limit = NonZeroUsize::new(1024).unwrap();
142    /// let mut decoder = SseDecoder::with_limit(limit);
143    ///
144    /// let mut buf = Bytes::from("data: small payload\n\n");
145    /// let _event = decoder.next(&mut buf)?;
146    /// # Ok(())
147    /// # }
148    /// ```
149    #[inline]
150    #[must_use]
151    pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
152        Self {
153            mode: Mode::Bom { bytes_read: 0 },
154            last_event_id: None,
155            staged_last_event_id: None,
156            last_event_id_buf: vec![],
157            event_buf: vec![],
158            data_buf: vec![],
159            retry_buf: None,
160            max_payload_size,
161        }
162    }
163
164    /// Returns the current `Last-Event-ID` known to the decoder, if any.
165    #[inline]
166    #[must_use]
167    pub fn last_event_id(&self) -> Option<&Arc<str>> {
168        self.last_event_id.as_ref()
169    }
170
171    /// Resets the decoder state for a new connection, explicitly overriding
172    /// the currently tracked `Last-Event-ID`.
173    ///
174    /// This method clears all internal byte buffers and resets the parser, but
175    /// instead of keeping the previous ID (like [`reconnect()`](Self::reconnect))
176    /// or dropping it (like [`clear()`](Self::clear)), it injects the provided ID.
177    ///
178    /// It is typically used to prime the state machine with a known ID
179    /// (e.g., from a local database) right before feeding the decoder bytes
180    /// from a newly established connection.
181    #[inline]
182    pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
183        self.last_event_id = id;
184        self.reconnect();
185    }
186
187    /// Resets the decoder state completely, dropping the current `Last-Event-ID`.
188    ///
189    /// This clears all internal byte buffers and purges the parser's state,
190    /// effectively starting fresh. Because it drops the `Last-Event-ID`, the
191    /// next connection will start from the present moment rather than resuming.
192    ///
193    /// * To reset the state but **keep** the current ID, use [`reconnect()`](Self::reconnect).
194    /// * To reset the state and **inject** a specific ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
195    #[inline]
196    pub fn clear(&mut self) {
197        self.reconnect_with_id(None);
198    }
199
200    /// Resets the buffer state for a new connection while retaining the `Last-Event-ID`.
201    ///
202    /// This clears the internal byte buffers to prepare for a fresh stream of data,
203    /// but safely preserves the most recently parsed `Last-Event-ID`. This ensures
204    /// that when you reconnect to the server, you can resume exactly where you left off.
205    ///
206    /// * To reset the state and **drop** the ID, use [`clear()`](Self::clear).
207    /// * To reset the state and **override** the ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
208    #[inline]
209    pub fn reconnect(&mut self) {
210        self.mode = Mode::Bom { bytes_read: 0 };
211        self.data_buf.clear();
212    }
213
214    fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
215        self.last_event_id = self.staged_last_event_id.clone();
216
217        self.mode = match cr {
218            true => Mode::PostCr,
219            false => Mode::Field(FieldMode::new()),
220        };
221
222        match self.data_buf.last() {
223            Some(b'\n') => {
224                self.data_buf.pop();
225            }
226            Some(_) => {}
227            None => {
228                self.event_buf.clear();
229                return None;
230            }
231        }
232
233        let data = String::from_utf8_lossy(&self.data_buf).into_owned();
234        self.data_buf.clear();
235
236        let event = match &*self.event_buf {
237            b"" => Cow::Borrowed("message"),
238            event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
239        };
240        self.event_buf.clear();
241
242        Some(SseEvent::Message(MessageEvent {
243            data,
244            event,
245            last_event_id: self.last_event_id.clone(),
246        }))
247    }
248
249    /// Consumes bytes from the provided buffer and attempts to yield an event.
250    ///
251    /// The decoder does not store unparsed bytes internally. It reads directly
252    /// from the provided buffer, advancing the buffer's cursor only for the bytes
253    /// it successfully parses.
254    ///
255    /// If `Ok(None)` is returned, the provided buffer has been exhausted and
256    /// more bytes are needed to complete the current event. You should fetch more
257    /// data, append it to your buffer, and call `next()` again.
258    ///
259    /// # Example
260    /// ```
261    /// use bytes::{Buf, Bytes};
262    /// # use sse_core::{SseDecoder, SseEvent};
263    ///
264    /// let mut decoder = SseDecoder::new();
265    /// let mut buffer = Bytes::from("data: hello\n\n");
266    ///
267    /// // Call next() in a loop to drain all available events
268    /// while let Some(event) = decoder.next(&mut buffer).unwrap() {
269    ///     println!("Received: {:?}", event);
270    /// }
271    ///
272    /// // When next() returns None, the decoder is waiting for more data.
273    /// assert!(buffer.is_empty());
274    /// ```
275    ///
276    /// # Errors
277    ///
278    /// Returns a [`PayloadTooLargeError`] if a single field (like data or event name)
279    /// exceeds the maximum payload size limit configured for this decoder.
280    pub fn next(&mut self, buf: &mut impl Buf) -> Result<Option<SseEvent>, PayloadTooLargeError> {
281        // # 9.2.5 Parsing an event stream
282        //
283        // stream        = [ bom ] *event
284        // event         = *( comment / field ) end-of-line
285        // comment       = colon *any-char end-of-line
286        // field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
287        // end-of-line   = ( cr lf / cr / lf )
288        //
289        // ; characters
290        // lf            = %x000A ; U+000A LINE FEED (LF)
291        // cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
292        // space         = %x0020 ; U+0020 SPACE
293        // colon         = %x003A ; U+003A COLON (:)
294        // bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
295        // name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
296        //                 ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
297        // any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
298        //                 ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
299
300        loop {
301            let chunk = buf.chunk();
302            if chunk.is_empty() {
303                return Ok(None);
304            }
305
306            match &mut self.mode {
307                Mode::Bom { bytes_read } => {
308                    let b0 = chunk[0];
309
310                    const BOM: &[u8; 3] = b"\xef\xbb\xbf";
311
312                    if b0 != BOM[*bytes_read as usize] {
313                        self.mode = match *bytes_read {
314                            0 => Mode::Field(FieldMode::new()),
315                            _ => Mode::Ignore,
316                        };
317                        continue;
318                    }
319
320                    buf.advance(1);
321                    *bytes_read += 1;
322
323                    if BOM.len() <= *bytes_read as usize {
324                        self.mode = Mode::Field(FieldMode::new());
325                    }
326                }
327                Mode::Field(field) => {
328                    let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
329                        if !field.try_extend(chunk) {
330                            self.mode = Mode::Ignore;
331                        }
332                        buf.advance(chunk.len());
333                        continue;
334                    };
335
336                    let subchunk = &chunk[..field_end];
337                    let b0 = chunk[field_end];
338
339                    if !field.try_extend(subchunk) {
340                        self.mode = Mode::Ignore;
341                        buf.advance(subchunk.len());
342                        continue;
343                    }
344
345                    buf.advance(subchunk.len() + 1);
346
347                    let value = match field.as_slice() {
348                        b"data" => ValueMode::Data,
349                        b"event" => {
350                            self.event_buf.clear();
351                            ValueMode::Event
352                        }
353                        b"retry" => {
354                            self.retry_buf = None;
355                            ValueMode::Retry
356                        }
357                        b"id" => {
358                            self.last_event_id_buf.clear();
359                            ValueMode::Id
360                        }
361                        b"" => match b0 {
362                            b':' => {
363                                self.mode = Mode::Ignore;
364                                continue;
365                            }
366                            b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
367                                Some(ev) => return Ok(Some(ev)),
368                                None => continue,
369                            },
370                            _ => unreachable!(),
371                        },
372                        _ => {
373                            self.mode = Mode::Ignore;
374                            continue;
375                        }
376                    };
377
378                    match b0 {
379                        b'\n' => self.mode = Mode::Field(FieldMode::new()),
380                        b'\r' => self.mode = Mode::PostCr,
381                        b':' => {
382                            self.mode = Mode::PostColon(value);
383                            continue;
384                        }
385                        _ => unreachable!(),
386                    }
387
388                    match value {
389                        ValueMode::Data => self.data_buf.push(b'\n'),
390                        ValueMode::Id => self.last_event_id_buf.clear(),
391                        ValueMode::Event | ValueMode::Retry => {}
392                    }
393                }
394                Mode::Value(ValueMode::Retry) => {
395                    let mut advanced = 0;
396                    let mut return_event = false;
397
398                    for &b in chunk {
399                        advanced += 1;
400                        match b {
401                            b'0'..=b'9' => {
402                                let digit = (b & 0xf) as _;
403
404                                let retry_buf = self.retry_buf.unwrap_or(0);
405                                let Some(retry_buf) = retry_buf.checked_mul(10) else {
406                                    self.mode = Mode::Ignore;
407                                    break;
408                                };
409                                let Some(retry_buf) = retry_buf.checked_add(digit) else {
410                                    self.mode = Mode::Ignore;
411                                    break;
412                                };
413                                self.retry_buf = Some(retry_buf);
414                            }
415                            b'\r' => {
416                                self.mode = Mode::PostCr;
417                                return_event = true;
418                                break;
419                            }
420                            b'\n' => {
421                                self.mode = Mode::Field(FieldMode::new());
422                                return_event = true;
423                                break;
424                            }
425                            _ => {
426                                self.mode = Mode::Ignore;
427                                break;
428                            }
429                        }
430                    }
431
432                    buf.advance(advanced);
433
434                    if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
435                        return Ok(Some(SseEvent::Retry(retry_buf)));
436                    }
437                }
438                Mode::Value(ValueMode::Data) => {
439                    if consume_until_newline(
440                        &mut self.mode,
441                        Some(&mut self.data_buf),
442                        self.max_payload_size,
443                        buf,
444                    )? {
445                        self.data_buf.push(b'\n');
446                    }
447                }
448                Mode::Value(ValueMode::Event) => {
449                    consume_until_newline(
450                        &mut self.mode,
451                        Some(&mut self.event_buf),
452                        self.max_payload_size,
453                        buf,
454                    )?;
455                }
456                Mode::Value(ValueMode::Id) => {
457                    if consume_until_newline(
458                        &mut self.mode,
459                        Some(&mut self.last_event_id_buf),
460                        self.max_payload_size,
461                        buf,
462                    )? && memchr(0, &self.last_event_id_buf).is_none()
463                    {
464                        self.staged_last_event_id = match &*self.last_event_id_buf {
465                            [] => None,
466                            buf => Some(String::from_utf8_lossy(buf).into()),
467                        };
468                    }
469                }
470                Mode::Ignore => {
471                    consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
472                        .expect("there should be no payload to grow too large");
473                }
474                Mode::PostCr => {
475                    if chunk[0] == b'\n' {
476                        buf.advance(1);
477                    }
478                    self.mode = Mode::Field(FieldMode::new());
479                }
480                Mode::PostColon(value) => {
481                    if chunk[0] == b' ' {
482                        buf.advance(1);
483                    }
484                    self.mode = Mode::Value(*value);
485                }
486            }
487        }
488    }
489}
490
491fn consume_until_newline(
492    mode: &mut Mode,
493    mut out: Option<&mut Vec<u8>>,
494    max_size: NonZeroUsize,
495    buf: &mut impl Buf,
496) -> Result<bool, PayloadTooLargeError> {
497    loop {
498        let chunk = buf.chunk();
499        if chunk.is_empty() {
500            return Ok(false);
501        };
502
503        let Some(i) = memchr2(b'\r', b'\n', chunk) else {
504            if let Some(out) = out.as_deref_mut() {
505                if max_size.get() < out.len() + chunk.len() {
506                    *mode = Mode::Ignore;
507                    return Err(PayloadTooLargeError);
508                }
509                out.extend_from_slice(chunk);
510            }
511            buf.advance(chunk.len());
512            continue;
513        };
514
515        if let Some(out) = out {
516            if max_size.get() < out.len() + i {
517                *mode = Mode::Ignore;
518                return Err(PayloadTooLargeError);
519            }
520            out.extend_from_slice(&chunk[..i]);
521        }
522
523        *mode = match chunk[i] {
524            b'\r' => Mode::PostCr,
525            b'\n' => Mode::Field(FieldMode::new()),
526            _ => unreachable!(),
527        };
528
529        buf.advance(i + 1);
530
531        return Ok(true);
532    }
533}
534
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use std::slice;

    // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
    let bytes = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

data:ignored
";

    // Feed the stream one byte per call so every state transition is exercised
    // across chunk boundaries; the trailing event has no blank line and must not
    // be dispatched.
    let mut decoder = SseDecoder::new();
    let mut events = Vec::new();
    for byte in bytes.bytes() {
        let mut chunk: &[u8] = slice::from_ref(&byte);
        if let Some(event) = decoder.next(&mut chunk)? {
            events.push(event);
        }
    }

    let id = Some("my-id".into());
    let expected = [
        SseEvent::Retry(42),
        SseEvent::Message(MessageEvent {
            event: "my-event".into(),
            data: "line1\nline2".into(),
            last_event_id: id.clone(),
        }),
        SseEvent::Message(MessageEvent {
            event: "message".into(),
            data: "second".into(),
            last_event_id: id,
        }),
    ];

    assert_eq!(events, expected);
    Ok(())
}