1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{fmt, num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
/// A message event dispatched by [`SseDecoder::next`] at the end of an event (a blank line).
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct MessageEvent {
    /// The event type: the value of the event's last `event` field, or `"message"` when the
    /// event had none (or an empty one). Invalid UTF-8 is replaced lossily.
    pub event: Cow<'static, str>,
    /// The payload: the values of all `data` fields of the event joined with `\n`.
    /// Invalid UTF-8 is replaced lossily.
    pub data: String,
    /// The last event ID in effect when this event was dispatched, if any.
    pub last_event_id: Option<Arc<str>>,
}
19
/// An item produced by [`SseDecoder::next`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseEvent {
    /// A complete message event.
    Message(MessageEvent),
    /// The value of a `retry` field made only of ASCII digits that fits in a `u32`.
    /// It is reported as soon as its line ends, without waiting for the end of the event.
    /// The SSE specification defines it as a reconnection time in milliseconds.
    Retry(u32),
}
29
/// Reported by [`SseDecoder::next`] when a `data`, `event` or `id` value would grow its buffer
/// beyond the decoder's configured limit. The event being built is discarded, and decoding can
/// continue with the following input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[error("payload exceeded the allotted buffer size limit")]
pub struct PayloadTooLargeError;
35
/// Maximum number of bytes of a buffer that the `Debug` helpers below print.
const MAX_DEBUG_SIZE: usize = 200;

/// `Debug` wrapper that prints at most [`MAX_DEBUG_SIZE`] bytes of a string, cut on a char
/// boundary, followed by the total length when something was left out.
struct ShowBigStr<'a>(&'a str);

impl fmt::Debug for ShowBigStr<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let whole = self.0;
        // Largest char boundary that does not exceed the limit; 0 is always a boundary.
        let cut = (0..=whole.len().min(MAX_DEBUG_SIZE))
            .rev()
            .find(|&i| whole.is_char_boundary(i))
            .unwrap_or(0);

        fmt::Debug::fmt(&whole[..cut], f)?;
        if cut != whole.len() {
            write!(f, "... ({} bytes total)", whole.len())?;
        }
        Ok(())
    }
}

/// `Debug` wrapper that prints a byte buffer as a quoted, escaped string.
///
/// Valid UTF-8 is escaped like a string and invalid bytes are shown as `\xNN`. At most
/// [`MAX_DEBUG_SIZE`] bytes are printed, followed by the total length when truncated.
struct ShowBigBuf<'a>(&'a [u8]);

impl fmt::Debug for ShowBigBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let whole = self.0;
        let shown = &whole[..whole.len().min(MAX_DEBUG_SIZE)];
        let truncated = shown.len() < whole.len();

        f.write_str("\"")?;
        let mut pieces = shown.utf8_chunks().peekable();
        while let Some(piece) = pieces.next() {
            fmt::Display::fmt(&piece.valid().escape_debug(), f)?;

            // When the buffer was cut short, invalid bytes at the very end are most likely a
            // multi-byte character split by the cut, so they are left out.
            let at_cut = truncated && pieces.peek().is_none();
            if !at_cut {
                for byte in piece.invalid() {
                    write!(f, "\\x{byte:02X}")?;
                }
            }
        }

        match truncated {
            true => write!(f, "\"... ({} bytes total)", whole.len()),
            false => f.write_str("\""),
        }
    }
}
97
/// Fixed-capacity buffer for the field name at the start of a line.
///
/// Five bytes fit the longest field names the decoder recognises (`event`, `retry`); any
/// longer name cannot be a known field, so [`FieldMode::try_extend`] refuses it.
#[derive(Clone, Copy, Default)]
struct FieldMode {
    len: u8,
    buf: [u8; 5],
}

impl FieldMode {
    /// An empty field name.
    #[inline]
    const fn new() -> Self {
        Self {
            buf: [0; 5],
            len: 0,
        }
    }

    /// Appends `src` to the name. Returns `false`, leaving the name unchanged, when the
    /// result would not fit.
    #[inline]
    fn try_extend(&mut self, src: &[u8]) -> bool {
        let start = usize::from(self.len);
        match self.buf.get_mut(start..start + src.len()) {
            None => false,
            Some(dst) => {
                dst.copy_from_slice(src);
                // `dst` fit inside the 5-byte array, so the new length fits in a `u8`.
                self.len += src.len() as u8;
                true
            }
        }
    }

    /// The bytes accumulated so far.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        &self.buf[..usize::from(self.len)]
    }
}
128
129impl fmt::Debug for FieldMode {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 f.debug_tuple("FieldMode")
132 .field(&ShowBigBuf(self.as_slice()))
133 .finish()
134 }
135}
136
/// The known field whose value is being read.
#[derive(Debug, Clone, Copy)]
enum ValueMode {
    /// Appended to `SseDecoder::data_buf`.
    Data,
    /// Stored in `SseDecoder::event_buf`.
    Event,
    /// Parsed digit by digit into `SseDecoder::retry_buf`.
    Retry,
    /// Stored in `SseDecoder::last_event_id_buf`.
    Id,
}
144
/// State of the line parser inside [`SseDecoder`].
#[derive(Debug, Clone, Copy)]
enum Mode {
    /// At the start of a stream, matching an optional UTF-8 BOM; `bytes_read` BOM bytes have
    /// been consumed so far.
    Bom { bytes_read: u8 },
    /// At the start of a line, reading the field name (or detecting a blank line or comment).
    Field(FieldMode),
    /// Reading the value of the given field up to the end of the line.
    Value(ValueMode),
    /// Skipping everything up to the end of the current line.
    Ignore,
    /// Just consumed a `\r`; a directly following `\n` is part of the same line ending.
    PostCr,
    /// Just consumed the colon after a field name; one following space is skipped.
    PostColon(ValueMode),
}
154
/// Incremental decoder for a server-sent events (SSE) byte stream.
///
/// Input is fed to [`SseDecoder::next`] in arbitrarily sized pieces. Parser state, including
/// partially received lines, is kept between calls.
#[derive(Clone)]
pub struct SseDecoder {
    /// Current parser state.
    mode: Mode,
    /// Last event ID committed at an event boundary (or set via `reconnect_with_id`).
    last_event_id: Option<Arc<str>>,
    /// ID set by the `id` fields of the event being built; committed at the next blank line.
    staged_last_event_id: Option<Arc<str>>,
    /// Raw bytes of the `id` value currently being read.
    last_event_id_buf: Vec<u8>,
    /// Raw bytes of the event type of the event being built.
    event_buf: Vec<u8>,
    /// Raw data of the event being built; every `data` line is followed by `\n`.
    data_buf: Vec<u8>,
    /// Value of the `retry` digits read so far, if any.
    retry_buf: Option<u32>,
    /// Size limit checked independently for `data_buf`, `event_buf` and `last_event_id_buf`.
    max_payload_size: NonZeroUsize,
    /// Set when a value of the current event exceeded the limit. The event is then dropped at
    /// the next blank line.
    corrupted: bool,
}
171
172impl SseDecoder {
173 #[inline]
189 #[must_use]
190 pub fn new() -> Self {
191 Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
192 }
193
194 #[inline]
218 #[must_use]
219 pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
220 Self {
221 mode: Mode::Bom { bytes_read: 0 },
222 last_event_id: None,
223 staged_last_event_id: None,
224 last_event_id_buf: vec![],
225 event_buf: vec![],
226 data_buf: vec![],
227 retry_buf: None,
228 max_payload_size,
229 corrupted: false,
230 }
231 }
232
233 #[inline]
235 #[must_use]
236 pub fn last_event_id(&self) -> Option<&Arc<str>> {
237 self.last_event_id.as_ref()
238 }
239
240 #[inline]
251 pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
252 self.last_event_id = id;
253 self.reconnect();
254 }
255
256 #[inline]
265 pub fn clear(&mut self) {
266 self.reconnect_with_id(None);
267 }
268
269 #[inline]
278 pub fn reconnect(&mut self) {
279 self.mode = Mode::Bom { bytes_read: 0 };
280 self.clear_bufs();
281 self.corrupted = false;
282 }
283
284 fn mark_corrupted(&mut self) {
285 self.clear_bufs();
286 self.corrupted = true
287 }
288
289 #[inline]
290 fn clear_bufs(&mut self) {
291 self.data_buf.clear();
292 self.event_buf.clear();
293 self.last_event_id_buf.clear();
294 self.staged_last_event_id = self.last_event_id.clone();
295 }
296
297 fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
298 self.mode = match cr {
299 true => Mode::PostCr,
300 false => Mode::Field(FieldMode::new()),
301 };
302
303 if self.corrupted {
304 self.corrupted = false;
305 return None;
307 }
308
309 self.last_event_id = self.staged_last_event_id.clone();
310
311 match self.data_buf.last() {
312 Some(b'\n') => {
313 self.data_buf.pop();
314 }
315 Some(_) => {}
316 None => {
317 self.event_buf.clear();
318 return None;
319 }
320 }
321
322 let data = String::from_utf8_lossy(&self.data_buf).into_owned();
323 self.data_buf.clear();
324
325 let event = match &*self.event_buf {
326 b"" => Cow::Borrowed("message"),
327 event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
328 };
329 self.event_buf.clear();
330
331 Some(SseEvent::Message(MessageEvent {
332 data,
333 event,
334 last_event_id: self.last_event_id.clone(),
335 }))
336 }
337
338 pub fn next(&mut self, buf: &mut impl Buf) -> Option<Result<SseEvent, PayloadTooLargeError>> {
370 loop {
390 let chunk = buf.chunk();
391 if chunk.is_empty() {
392 return None;
393 }
394
395 match &mut self.mode {
396 Mode::Bom { bytes_read } => {
397 let b0 = chunk[0];
398
399 const BOM: &[u8; 3] = b"\xef\xbb\xbf";
400
401 if b0 != BOM[*bytes_read as usize] {
402 self.mode = match *bytes_read {
403 0 => Mode::Field(FieldMode::new()),
404 _ => Mode::Ignore,
405 };
406 continue;
407 }
408
409 buf.advance(1);
410 *bytes_read += 1;
411
412 if BOM.len() <= *bytes_read as usize {
413 self.mode = Mode::Field(FieldMode::new());
414 }
415 }
416 Mode::Field(field) => {
417 let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
418 if !field.try_extend(chunk) {
419 self.mode = Mode::Ignore;
420 }
421 buf.advance(chunk.len());
422 continue;
423 };
424
425 let subchunk = &chunk[..field_end];
426 let b0 = chunk[field_end];
427
428 if !field.try_extend(subchunk) {
429 self.mode = Mode::Ignore;
430 buf.advance(subchunk.len());
431 continue;
432 }
433
434 buf.advance(subchunk.len() + 1);
435
436 let value = match field.as_slice() {
437 b"data" if !self.corrupted => ValueMode::Data,
438 b"event" if !self.corrupted => {
439 self.event_buf.clear();
440 ValueMode::Event
441 }
442 b"id" if !self.corrupted => {
443 self.last_event_id_buf.clear();
444 ValueMode::Id
445 }
446 b"retry" => {
447 self.retry_buf = None;
448 ValueMode::Retry
449 }
450 b"" => match b0 {
451 b':' => {
452 self.mode = Mode::Ignore;
453 continue;
454 }
455 b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
456 Some(ev) => return Some(Ok(ev)),
457 None => continue,
458 },
459 _ => unreachable!(),
460 },
461 _ => {
462 self.mode = Mode::Ignore;
463 continue;
464 }
465 };
466
467 match b0 {
468 b'\n' => self.mode = Mode::Field(FieldMode::new()),
469 b'\r' => self.mode = Mode::PostCr,
470 b':' => {
471 self.mode = Mode::PostColon(value);
472 continue;
473 }
474 _ => unreachable!(),
475 }
476
477 match value {
478 ValueMode::Data => self.data_buf.push(b'\n'),
479 ValueMode::Id => self.last_event_id_buf.clear(),
480 ValueMode::Event | ValueMode::Retry => {}
481 }
482 }
483 Mode::Value(ValueMode::Retry) => {
484 let mut advanced = 0;
485 let mut return_event = false;
486
487 for &b in chunk {
488 advanced += 1;
489 match b {
490 b'0'..=b'9' => {
491 let digit = (b & 0xf) as _;
492
493 let retry_buf = self.retry_buf.unwrap_or(0);
494 let Some(retry_buf) = retry_buf.checked_mul(10) else {
495 self.mode = Mode::Ignore;
496 break;
497 };
498 let Some(retry_buf) = retry_buf.checked_add(digit) else {
499 self.mode = Mode::Ignore;
500 break;
501 };
502 self.retry_buf = Some(retry_buf);
503 }
504 b'\r' => {
505 self.mode = Mode::PostCr;
506 return_event = true;
507 break;
508 }
509 b'\n' => {
510 self.mode = Mode::Field(FieldMode::new());
511 return_event = true;
512 break;
513 }
514 _ => {
515 self.mode = Mode::Ignore;
516 break;
517 }
518 }
519 }
520
521 buf.advance(advanced);
522
523 if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
524 return Some(Ok(SseEvent::Retry(retry_buf)));
525 }
526 }
527 Mode::Value(ValueMode::Data) => {
528 match consume_until_newline(
529 &mut self.mode,
530 Some(&mut self.data_buf),
531 self.max_payload_size,
532 buf,
533 ) {
534 Ok(true) => self.data_buf.push(b'\n'),
535 Ok(false) => {}
536 Err(err) => {
537 self.mark_corrupted();
538 return Some(Err(err));
539 }
540 }
541 }
542 Mode::Value(ValueMode::Event) => {
543 if let Err(err) = consume_until_newline(
544 &mut self.mode,
545 Some(&mut self.event_buf),
546 self.max_payload_size,
547 buf,
548 ) {
549 self.mark_corrupted();
550 return Some(Err(err));
551 }
552 }
553 Mode::Value(ValueMode::Id) => {
554 match consume_until_newline(
555 &mut self.mode,
556 Some(&mut self.last_event_id_buf),
557 self.max_payload_size,
558 buf,
559 ) {
560 Ok(true) => {
561 if memchr(0, &self.last_event_id_buf).is_none() {
562 self.staged_last_event_id = match &*self.last_event_id_buf {
563 [] => None,
564 buf => Some(String::from_utf8_lossy(buf).into()),
565 };
566 }
567 self.last_event_id_buf.clear();
568 }
569 Ok(false) => {}
570 Err(err) => {
571 self.mark_corrupted();
572 return Some(Err(err));
573 }
574 }
575 }
576 Mode::Ignore => {
577 consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
578 .expect("there should be no payload to grow too large");
579 }
580 Mode::PostCr => {
581 if chunk[0] == b'\n' {
582 buf.advance(1);
583 }
584 self.mode = Mode::Field(FieldMode::new());
585 }
586 Mode::PostColon(value) => {
587 if chunk[0] == b' ' {
588 buf.advance(1);
589 }
590 self.mode = Mode::Value(*value);
591 }
592 }
593 }
594 }
595}
596
597impl Default for SseDecoder {
598 fn default() -> Self {
599 Self::new()
600 }
601}
602
603impl fmt::Debug for SseDecoder {
604 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
605 f.debug_struct("SseDecoder")
606 .field("mode", &self.mode)
607 .field(
608 "last_event_id",
609 &self.last_event_id.as_deref().map(ShowBigStr),
610 )
611 .field(
612 "staged_last_event_id",
613 &self.staged_last_event_id.as_deref().map(ShowBigStr),
614 )
615 .field("last_event_id_buf", &ShowBigBuf(&self.last_event_id_buf))
616 .field("event_buf", &ShowBigBuf(&self.event_buf))
617 .field("data_buf", &ShowBigBuf(&self.data_buf))
618 .field("retry_buf", &self.retry_buf)
619 .field("max_payload_size", &self.max_payload_size)
620 .finish()
621 }
622}
623
624fn consume_until_newline(
625 mode: &mut Mode,
626 mut out: Option<&mut Vec<u8>>,
627 max_size: NonZeroUsize,
628 buf: &mut impl Buf,
629) -> Result<bool, PayloadTooLargeError> {
630 loop {
631 let chunk = buf.chunk();
632 if chunk.is_empty() {
633 return Ok(false);
634 };
635
636 let Some(i) = memchr2(b'\r', b'\n', chunk) else {
637 if let Some(out) = out.as_deref_mut() {
638 if max_size.get() < out.len() + chunk.len() {
639 out.clear();
640 *mode = Mode::Ignore;
641 return Err(PayloadTooLargeError);
642 }
643 out.extend_from_slice(chunk);
644 }
645 buf.advance(chunk.len());
646 continue;
647 };
648
649 if let Some(out) = out {
650 if max_size.get() < out.len() + i {
651 out.clear();
652 *mode = Mode::Ignore;
653 return Err(PayloadTooLargeError);
654 }
655 out.extend_from_slice(&chunk[..i]);
656 }
657
658 *mode = match chunk[i] {
659 b'\r' => Mode::PostCr,
660 b'\n' => Mode::Field(FieldMode::new()),
661 _ => unreachable!(),
662 };
663
664 buf.advance(i + 1);
665
666 return Ok(true);
667 }
668}
669
/// Feeds a tricky stream one byte at a time, so every parser state has to survive a chunk
/// boundary.
///
/// The stream covers a leading BOM, comments, LF / CR / CRLF line endings, multi-line
/// `data`, `event`, `id`, and an empty `retry` (ignored). Its final `data:ignored` line is
/// never dispatched because no blank line follows it.
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use core::slice;

    let bytes = "\u{FEFF}data: x

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

data:ignored
";

    let mut decoder = SseDecoder::new();

    // Each byte is passed as its own one-byte buffer; `None` results are skipped.
    let events = bytes
        .bytes()
        .filter_map(|b| decoder.next(&mut slice::from_ref(&b)))
        .collect::<Result<Vec<_>, PayloadTooLargeError>>()?;

    let id = Some("my-id".into());

    // `Retry(42)` precedes the `my-event` message: retry values are reported as soon as
    // their line ends, while messages wait for the blank line.
    assert_eq!(
        events,
        &[
            SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "x".into(),
                last_event_id: None
            }),
            SseEvent::Retry(42),
            SseEvent::Message(MessageEvent {
                event: "my-event".into(),
                data: "line1\nline2".into(),
                last_event_id: id.clone()
            }),
            SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "second".into(),
                last_event_id: id.clone()
            })
        ]
    );
    Ok(())
}
724
/// Checks that `reconnect` discards the unfinished event together with its staged ID.
///
/// The committed last event ID is kept and attached to the first event of the new
/// connection.
#[test]
fn test_reconnect() {
    // Ends mid-event: the `id: ignored1` is staged but never committed.
    let mut stream1: &[u8] = b"
id: my-id

event: my-event
data:line1
:
data: line2
id: ignored1
";

    // Ends after committing `final`; `id: ignored2` is never followed by a blank line.
    let mut stream2: &[u8] = b"

data: data

id: final

id: ignored2
";

    let my_id = Some("my-id".into());

    let mut decoder = SseDecoder::new();

    // No event is completed, but the blank line after `id: my-id` committed that ID.
    assert_eq!(decoder.next(&mut stream1), None);
    assert_eq!(decoder.last_event_id(), my_id.as_ref());
    assert!(stream1.is_empty());

    decoder.reconnect();

    assert_eq!(
        decoder.next(&mut stream2),
        Some(Ok(SseEvent::Message(MessageEvent {
            event: "message".into(),
            data: "data".into(),
            last_event_id: my_id,
        }))),
    );
    // The ID of a data-less event is still committed at its blank line.
    assert_eq!(decoder.next(&mut stream2), None);
    assert_eq!(decoder.last_event_id().map(|id| &**id), Some("final"));
    assert!(stream2.is_empty());
}
769
/// Checks behavior with a 10-byte payload limit.
///
/// Values of exactly the limit pass, while longer `id` or `data` values yield an error and
/// drop their event. The decoder then recovers at the next blank line, and `retry` is still
/// reported inside a dropped event.
#[test]
fn test_limits() {
    let mut stream: &[u8] = b"
data: 0123456789
id: my-id

id: 01234567890
data: thing
event: ev

data: mid

event: jojo
id: ignored
data: 01234567890
retry: 10

event: final
data:

";

    let my_id = Some("my-id".into());

    let mut decoder = SseDecoder::with_limit(NonZeroUsize::new(10).unwrap());

    // 10 bytes of data: exactly at the limit.
    assert_eq!(
        decoder.next(&mut stream),
        Some(Ok(SseEvent::Message(MessageEvent {
            event: "message".into(),
            data: "0123456789".into(),
            last_event_id: my_id.clone(),
        })))
    );
    // 11-byte ID: the rest of that event is dropped.
    assert_eq!(decoder.next(&mut stream), Some(Err(PayloadTooLargeError)));
    // The oversized ID was never committed, so `my-id` still applies.
    assert_eq!(
        decoder.next(&mut stream),
        Some(Ok(SseEvent::Message(MessageEvent {
            event: "message".into(),
            data: "mid".into(),
            last_event_id: my_id.clone()
        })))
    );
    // 11-byte data value.
    assert_eq!(decoder.next(&mut stream), Some(Err(PayloadTooLargeError)));
    // `retry` is reported even though its event is being dropped.
    assert_eq!(decoder.next(&mut stream), Some(Ok(SseEvent::Retry(10))));
    assert_eq!(
        decoder.next(&mut stream),
        Some(Ok(SseEvent::Message(MessageEvent {
            event: "final".into(),
            data: "".into(),
            last_event_id: my_id.clone()
        })))
    );
    assert!(stream.is_empty());
}
824}