sse_core/decode.rs
1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{fmt, num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
/// A single Server-Sent Event message, as assembled from one or more
/// `data` lines terminated by a blank line.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct MessageEvent {
    /// The event type; `"message"` when the server sent no `event` field.
    pub event: Cow<'static, str>,
    /// The payload: all `data` lines of the event joined with `\n`.
    pub data: String,
    /// The `Last-Event-ID` in effect when this event was dispatched, if any.
    pub last_event_id: Option<Arc<str>>,
}

/// Items produced by the SSE decoder.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseEvent {
    /// A dispatched data message.
    Message(MessageEvent),
    /// The server asked the client to use this reconnection delay, in milliseconds.
    Retry(u32),
}
29
30/// Error indicating that a parsed field exceeded the maximum allowed buffer size.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
32#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
33#[error("payload exceeded the allotted buffer size limit")]
34pub struct PayloadTooLargeError;
35
/// Maximum number of bytes the `Debug` helpers below print before truncating.
const MAX_DEBUG_SIZE: usize = 200;

/// `Debug` adapter printing a string truncated to [`MAX_DEBUG_SIZE`] bytes.
struct ShowBigStr<'a>(&'a str);

impl fmt::Debug for ShowBigStr<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let full = self.0;
        // Back off to the nearest char boundary so slicing cannot panic;
        // index 0 is always a boundary.
        let cut = (0..=full.len().min(MAX_DEBUG_SIZE))
            .rev()
            .find(|&i| full.is_char_boundary(i))
            .unwrap_or(0);

        fmt::Debug::fmt(&full[..cut], f)?;
        if cut < full.len() {
            write!(f, "... ({} bytes total)", full.len())?;
        }
        Ok(())
    }
}

/// `Debug` adapter printing bytes as a quoted string, truncated to
/// [`MAX_DEBUG_SIZE`] bytes, with invalid UTF-8 rendered as `\xNN`.
struct ShowBigBuf<'a>(&'a [u8]);

impl fmt::Debug for ShowBigBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = self.0.len();
        let truncated = total > MAX_DEBUG_SIZE;
        let shown = if truncated {
            &self.0[..MAX_DEBUG_SIZE]
        } else {
            self.0
        };

        f.write_str("\"")?;
        let mut chunks = shown.utf8_chunks().peekable();
        while let Some(chunk) = chunks.next() {
            fmt::Display::fmt(&chunk.valid().escape_debug(), f)?;

            // Invalid bytes at the very end of a truncated view are almost
            // certainly a multi-byte character sliced by the truncation.
            let sliced_char = truncated && chunks.peek().is_none();
            if !sliced_char {
                for &byte in chunk.invalid() {
                    write!(f, "\\x{byte:02X}")?;
                }
            }
        }

        if truncated {
            write!(f, "\"... ({total} bytes total)")
        } else {
            f.write_str("\"")
        }
    }
}
97
/// Fixed-capacity accumulator for a field name that may arrive split across
/// chunks. Five bytes fit the longest recognized names (`event`, `retry`).
#[derive(Clone, Copy, Default)]
struct FieldMode {
    len: u8,
    buf: [u8; 5],
}

impl FieldMode {
    /// An empty field name.
    #[inline]
    const fn new() -> Self {
        Self {
            len: 0,
            buf: [0; 5],
        }
    }

    /// Appends `src`; returns `false` (leaving `self` untouched) if it would
    /// not fit in the fixed buffer.
    #[inline]
    fn try_extend(&mut self, src: &[u8]) -> bool {
        let start = usize::from(self.len);
        let end = start + src.len();
        match self.buf.get_mut(start..end) {
            Some(dst) => {
                dst.copy_from_slice(src);
                // `end <= 5` because the slice above was in bounds.
                self.len = end as u8;
                true
            }
            None => false,
        }
    }

    /// The bytes collected so far.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        let filled = usize::from(self.len);
        &self.buf[..filled]
    }
}
128
129impl fmt::Debug for FieldMode {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 f.debug_tuple("FieldMode")
132 .field(&ShowBigBuf(self.as_slice()))
133 .finish()
134 }
135}
136
/// Which recognized field's value is currently being read.
#[derive(Debug, Clone, Copy)]
enum ValueMode {
    /// `data`: appended to the data buffer, followed by an LF.
    Data,
    /// `event`: replaces the event type buffer.
    Event,
    /// `retry`: parsed as a decimal number of milliseconds.
    Retry,
    /// `id`: staged as the next `Last-Event-ID`.
    Id,
}
144
145#[derive(Debug, Clone, Copy)]
146enum Mode {
147 Bom { bytes_read: u8 },
148 Field(FieldMode),
149 Value(ValueMode),
150 Ignore,
151 PostCr,
152 PostColon(ValueMode),
153}
154
155/// The core state-machine parser for SSE.
156///
157/// This decoder does not perform any I/O. It consumes bytes from a given buffer
158/// and yields parsed [`SseEvent`]s. It is suitable for `no_std` environments.
159#[derive(Clone)]
160pub struct SseDecoder {
161 mode: Mode,
162 last_event_id: Option<Arc<str>>,
163 staged_last_event_id: Option<Arc<str>>,
164 last_event_id_buf: Vec<u8>,
165 event_buf: Vec<u8>,
166 data_buf: Vec<u8>,
167 retry_buf: Option<u32>,
168 max_payload_size: NonZeroUsize,
169}
170
171impl SseDecoder {
172 /// Creates a new decoder with the default payload size limit of 512KiB.
173 ///
174 /// # Example
175 /// ```rust
176 /// # use bytes::{Buf, Bytes};
177 /// # use sse_core::{SseDecoder, SseEvent};
178 /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
179 /// let mut decoder = SseDecoder::new();
180 /// let mut buf = Bytes::from("data: standard stream\n\n");
181 ///
182 /// let event = decoder.next(&mut buf)?;
183 /// assert!(event.is_some());
184 /// # Ok(())
185 /// # }
186 /// ```
187 #[inline]
188 #[must_use]
189 pub fn new() -> Self {
190 Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
191 }
192
193 /// Creates a new decoder with a custom maximum payload size limit.
194 ///
195 /// This is useful in memory-constrained environments or when connecting to
196 /// untrusted servers to prevent memory exhaustion from infinitely long lines.
197 ///
198 /// # Example
199 /// ```rust
200 /// # use core::num::NonZeroUsize;
201 /// # use bytes::Bytes;
202 /// # use sse_core::{SseDecoder, SseEvent};
203 /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
204 /// // Create a strict decoder that rejects payloads over 1024 bytes
205 /// let limit = NonZeroUsize::new(1024).unwrap();
206 /// let mut decoder = SseDecoder::with_limit(limit);
207 ///
208 /// let mut buf = Bytes::from("data: small payload\n\n");
209 /// let _event = decoder.next(&mut buf)?;
210 /// # Ok(())
211 /// # }
212 /// ```
213 #[inline]
214 #[must_use]
215 pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
216 Self {
217 mode: Mode::Bom { bytes_read: 0 },
218 last_event_id: None,
219 staged_last_event_id: None,
220 last_event_id_buf: vec![],
221 event_buf: vec![],
222 data_buf: vec![],
223 retry_buf: None,
224 max_payload_size,
225 }
226 }
227
228 /// Returns the current `Last-Event-ID` known to the decoder, if any.
229 #[inline]
230 #[must_use]
231 pub fn last_event_id(&self) -> Option<&Arc<str>> {
232 self.last_event_id.as_ref()
233 }
234
235 /// Resets the decoder state for a new connection, explicitly overriding
236 /// the currently tracked `Last-Event-ID`.
237 ///
238 /// This method clears all internal byte buffers and resets the parser, but
239 /// instead of keeping the previous ID (like [`reconnect()`](Self::reconnect))
240 /// or dropping it (like [`clear()`](Self::clear)), it injects the provided ID.
241 ///
242 /// It is typically used to prime the state machine with a known ID
243 /// (e.g., from a local database) right before feeding the decoder bytes
244 /// from a newly established connection.
245 #[inline]
246 pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
247 self.last_event_id = id;
248 self.reconnect();
249 }
250
251 /// Resets the decoder state completely, dropping the current `Last-Event-ID`.
252 ///
253 /// This clears all internal byte buffers and purges the parser's state,
254 /// effectively starting fresh. Because it drops the `Last-Event-ID`, the
255 /// next connection will start from the present moment rather than resuming.
256 ///
257 /// * To reset the state but **keep** the current ID, use [`reconnect()`](Self::reconnect).
258 /// * To reset the state and **inject** a specific ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
259 #[inline]
260 pub fn clear(&mut self) {
261 self.reconnect_with_id(None);
262 }
263
264 /// Resets the buffer state for a new connection while retaining the `Last-Event-ID`.
265 ///
266 /// This clears the internal byte buffers to prepare for a fresh stream of data,
267 /// but safely preserves the most recently parsed `Last-Event-ID`. This ensures
268 /// that when you reconnect to the server, you can resume exactly where you left off.
269 ///
270 /// * To reset the state and **drop** the ID, use [`clear()`](Self::clear).
271 /// * To reset the state and **override** the ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
272 #[inline]
273 pub fn reconnect(&mut self) {
274 self.mode = Mode::Bom { bytes_read: 0 };
275 self.data_buf.clear();
276 }
277
278 fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
279 self.last_event_id = self.staged_last_event_id.clone();
280
281 self.mode = match cr {
282 true => Mode::PostCr,
283 false => Mode::Field(FieldMode::new()),
284 };
285
286 match self.data_buf.last() {
287 Some(b'\n') => {
288 self.data_buf.pop();
289 }
290 Some(_) => {}
291 None => {
292 self.event_buf.clear();
293 return None;
294 }
295 }
296
297 let data = String::from_utf8_lossy(&self.data_buf).into_owned();
298 self.data_buf.clear();
299
300 let event = match &*self.event_buf {
301 b"" => Cow::Borrowed("message"),
302 event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
303 };
304 self.event_buf.clear();
305
306 Some(SseEvent::Message(MessageEvent {
307 data,
308 event,
309 last_event_id: self.last_event_id.clone(),
310 }))
311 }
312
313 /// Consumes bytes from the provided buffer and attempts to yield an event.
314 ///
315 /// The decoder does not store unparsed bytes internally. It reads directly
316 /// from the provided buffer, advancing the buffer's cursor only for the bytes
317 /// it successfully parses.
318 ///
319 /// If `Ok(None)` is returned, the provided buffer has been exhausted and
320 /// more bytes are needed to complete the current event. You should fetch more
321 /// data, append it to your buffer, and call `next()` again.
322 ///
323 /// # Example
324 /// ```
325 /// use bytes::{Buf, Bytes};
326 /// # use sse_core::{SseDecoder, SseEvent};
327 ///
328 /// let mut decoder = SseDecoder::new();
329 /// let mut buffer = Bytes::from("data: hello\n\n");
330 ///
331 /// // Call next() in a loop to drain all available events
332 /// while let Some(event) = decoder.next(&mut buffer).unwrap() {
333 /// println!("Received: {:?}", event);
334 /// }
335 ///
336 /// // When next() returns None, the decoder is waiting for more data.
337 /// assert!(buffer.is_empty());
338 /// ```
339 ///
340 /// # Errors
341 ///
342 /// Returns a [`PayloadTooLargeError`] if a single field (like data or event name)
343 /// exceeds the maximum payload size limit configured for this decoder.
344 pub fn next(&mut self, buf: &mut impl Buf) -> Result<Option<SseEvent>, PayloadTooLargeError> {
345 // # 9.2.5 Parsing an event stream
346 //
347 // stream = [ bom ] *event
348 // event = *( comment / field ) end-of-line
349 // comment = colon *any-char end-of-line
350 // field = 1*name-char [ colon [ space ] *any-char ] end-of-line
351 // end-of-line = ( cr lf / cr / lf )
352 //
353 // ; characters
354 // lf = %x000A ; U+000A LINE FEED (LF)
355 // cr = %x000D ; U+000D CARRIAGE RETURN (CR)
356 // space = %x0020 ; U+0020 SPACE
357 // colon = %x003A ; U+003A COLON (:)
358 // bom = %xFEFF ; U+FEFF BYTE ORDER MARK
359 // name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
360 // ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
361 // any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
362 // ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
363
364 loop {
365 let chunk = buf.chunk();
366 if chunk.is_empty() {
367 return Ok(None);
368 }
369
370 match &mut self.mode {
371 Mode::Bom { bytes_read } => {
372 let b0 = chunk[0];
373
374 const BOM: &[u8; 3] = b"\xef\xbb\xbf";
375
376 if b0 != BOM[*bytes_read as usize] {
377 self.mode = match *bytes_read {
378 0 => Mode::Field(FieldMode::new()),
379 _ => Mode::Ignore,
380 };
381 continue;
382 }
383
384 buf.advance(1);
385 *bytes_read += 1;
386
387 if BOM.len() <= *bytes_read as usize {
388 self.mode = Mode::Field(FieldMode::new());
389 }
390 }
391 Mode::Field(field) => {
392 let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
393 if !field.try_extend(chunk) {
394 self.mode = Mode::Ignore;
395 }
396 buf.advance(chunk.len());
397 continue;
398 };
399
400 let subchunk = &chunk[..field_end];
401 let b0 = chunk[field_end];
402
403 if !field.try_extend(subchunk) {
404 self.mode = Mode::Ignore;
405 buf.advance(subchunk.len());
406 continue;
407 }
408
409 buf.advance(subchunk.len() + 1);
410
411 let value = match field.as_slice() {
412 b"data" => ValueMode::Data,
413 b"event" => {
414 self.event_buf.clear();
415 ValueMode::Event
416 }
417 b"retry" => {
418 self.retry_buf = None;
419 ValueMode::Retry
420 }
421 b"id" => {
422 self.last_event_id_buf.clear();
423 ValueMode::Id
424 }
425 b"" => match b0 {
426 b':' => {
427 self.mode = Mode::Ignore;
428 continue;
429 }
430 b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
431 Some(ev) => return Ok(Some(ev)),
432 None => continue,
433 },
434 _ => unreachable!(),
435 },
436 _ => {
437 self.mode = Mode::Ignore;
438 continue;
439 }
440 };
441
442 match b0 {
443 b'\n' => self.mode = Mode::Field(FieldMode::new()),
444 b'\r' => self.mode = Mode::PostCr,
445 b':' => {
446 self.mode = Mode::PostColon(value);
447 continue;
448 }
449 _ => unreachable!(),
450 }
451
452 match value {
453 ValueMode::Data => self.data_buf.push(b'\n'),
454 ValueMode::Id => self.last_event_id_buf.clear(),
455 ValueMode::Event | ValueMode::Retry => {}
456 }
457 }
458 Mode::Value(ValueMode::Retry) => {
459 let mut advanced = 0;
460 let mut return_event = false;
461
462 for &b in chunk {
463 advanced += 1;
464 match b {
465 b'0'..=b'9' => {
466 let digit = (b & 0xf) as _;
467
468 let retry_buf = self.retry_buf.unwrap_or(0);
469 let Some(retry_buf) = retry_buf.checked_mul(10) else {
470 self.mode = Mode::Ignore;
471 break;
472 };
473 let Some(retry_buf) = retry_buf.checked_add(digit) else {
474 self.mode = Mode::Ignore;
475 break;
476 };
477 self.retry_buf = Some(retry_buf);
478 }
479 b'\r' => {
480 self.mode = Mode::PostCr;
481 return_event = true;
482 break;
483 }
484 b'\n' => {
485 self.mode = Mode::Field(FieldMode::new());
486 return_event = true;
487 break;
488 }
489 _ => {
490 self.mode = Mode::Ignore;
491 break;
492 }
493 }
494 }
495
496 buf.advance(advanced);
497
498 if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
499 return Ok(Some(SseEvent::Retry(retry_buf)));
500 }
501 }
502 Mode::Value(ValueMode::Data) => {
503 if consume_until_newline(
504 &mut self.mode,
505 Some(&mut self.data_buf),
506 self.max_payload_size,
507 buf,
508 )? {
509 self.data_buf.push(b'\n');
510 }
511 }
512 Mode::Value(ValueMode::Event) => {
513 consume_until_newline(
514 &mut self.mode,
515 Some(&mut self.event_buf),
516 self.max_payload_size,
517 buf,
518 )?;
519 }
520 Mode::Value(ValueMode::Id) => {
521 if consume_until_newline(
522 &mut self.mode,
523 Some(&mut self.last_event_id_buf),
524 self.max_payload_size,
525 buf,
526 )? && memchr(0, &self.last_event_id_buf).is_none()
527 {
528 self.staged_last_event_id = match &*self.last_event_id_buf {
529 [] => None,
530 buf => Some(String::from_utf8_lossy(buf).into()),
531 };
532 }
533 }
534 Mode::Ignore => {
535 consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
536 .expect("there should be no payload to grow too large");
537 }
538 Mode::PostCr => {
539 if chunk[0] == b'\n' {
540 buf.advance(1);
541 }
542 self.mode = Mode::Field(FieldMode::new());
543 }
544 Mode::PostColon(value) => {
545 if chunk[0] == b' ' {
546 buf.advance(1);
547 }
548 self.mode = Mode::Value(*value);
549 }
550 }
551 }
552 }
553}
554
555impl Default for SseDecoder {
556 fn default() -> Self {
557 Self::new()
558 }
559}
560
561impl fmt::Debug for SseDecoder {
562 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563 f.debug_struct("SseDecoder")
564 .field("mode", &self.mode)
565 .field(
566 "last_event_id",
567 &self.last_event_id.as_deref().map(ShowBigStr),
568 )
569 .field(
570 "staged_last_event_id",
571 &self.staged_last_event_id.as_deref().map(ShowBigStr),
572 )
573 .field("last_event_id_buf", &ShowBigBuf(&self.last_event_id_buf))
574 .field("event_buf", &ShowBigBuf(&self.event_buf))
575 .field("data_buf", &ShowBigBuf(&self.data_buf))
576 .field("retry_buf", &self.retry_buf)
577 .field("max_payload_size", &self.max_payload_size)
578 .finish()
579 }
580}
581
582fn consume_until_newline(
583 mode: &mut Mode,
584 mut out: Option<&mut Vec<u8>>,
585 max_size: NonZeroUsize,
586 buf: &mut impl Buf,
587) -> Result<bool, PayloadTooLargeError> {
588 loop {
589 let chunk = buf.chunk();
590 if chunk.is_empty() {
591 return Ok(false);
592 };
593
594 let Some(i) = memchr2(b'\r', b'\n', chunk) else {
595 if let Some(out) = out.as_deref_mut() {
596 if max_size.get() < out.len() + chunk.len() {
597 *mode = Mode::Ignore;
598 return Err(PayloadTooLargeError);
599 }
600 out.extend_from_slice(chunk);
601 }
602 buf.advance(chunk.len());
603 continue;
604 };
605
606 if let Some(out) = out {
607 if max_size.get() < out.len() + i {
608 *mode = Mode::Ignore;
609 return Err(PayloadTooLargeError);
610 }
611 out.extend_from_slice(&chunk[..i]);
612 }
613
614 *mode = match chunk[i] {
615 b'\r' => Mode::PostCr,
616 b'\n' => Mode::Field(FieldMode::new()),
617 _ => unreachable!(),
618 };
619
620 buf.advance(i + 1);
621
622 return Ok(true);
623 }
624}
625
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
    let input = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

data:ignored
";

    let mut decoder = SseDecoder::new();
    let mut events = Vec::new();

    // Feed a single byte per call so every resumable state is exercised.
    for byte in input.bytes() {
        let mut one: &[u8] = &[byte];
        if let Some(event) = decoder.next(&mut one)? {
            events.push(event);
        }
    }

    let id: Option<Arc<str>> = Some(Arc::from("my-id"));

    let expected = [
        SseEvent::Retry(42),
        SseEvent::Message(MessageEvent {
            event: "my-event".into(),
            data: "line1\nline2".into(),
            last_event_id: id.clone(),
        }),
        SseEvent::Message(MessageEvent {
            event: "message".into(),
            data: "second".into(),
            last_event_id: id.clone(),
        }),
    ];
    assert_eq!(events, &expected);
    Ok(())
}