sse_core/decode.rs
1use alloc::{borrow::Cow, string::String, sync::Arc, vec, vec::Vec};
2use core::{num::NonZeroUsize, str};
3use thiserror::Error;
4
5use bytes::Buf;
6use memchr::{memchr, memchr2, memchr3};
7
/// Represents a single Server-Sent Event message.
///
/// The decoder yields one of these when a blank line dispatches a non-empty
/// data buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct MessageEvent {
    /// The event name (defaults to `"message"` when the event buffer is empty
    /// at dispatch time).
    pub event: Cow<'static, str>,
    /// The payload data: the accumulated `data` lines joined by `\n`, with the
    /// final trailing newline removed. Invalid UTF-8 is replaced lossily.
    pub data: String,
    /// The `Last-Event-ID` in effect when this event was dispatched, if any.
    pub last_event_id: Option<Arc<str>>,
}
19
/// Commands and payloads yielded by the SSE stream.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseEvent {
    /// A standard data message.
    Message(MessageEvent),
    /// A server request to change the client's reconnect time (in milliseconds).
    ///
    /// Only yielded for a `retry` field whose value is made up solely of ASCII
    /// digits and fits in a `u32`; any other value is ignored by the decoder.
    Retry(u32),
}
29
/// Error indicating that a parsed field exceeded the maximum allowed buffer size.
///
/// Raised while buffering a field value when the bytes already buffered plus
/// the bytes about to be appended would exceed the decoder's configured limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[error("Payload exceeded the allotted buffer size limit")]
pub struct PayloadTooLargeError;
35
/// Fixed-capacity scratch space for the field name currently being read.
///
/// Five bytes is enough for the longest name the decoder recognises; a name
/// that does not fit is reported through [`FieldMode::try_extend`] returning
/// `false`.
#[derive(Debug, Clone, Copy, Default)]
struct FieldMode {
    /// Number of bytes of `buf` that are in use.
    len: u8,
    /// Backing storage for the field name.
    buf: [u8; 5],
}

impl FieldMode {
    /// Returns an empty field-name buffer.
    #[inline]
    const fn new() -> Self {
        Self {
            len: 0,
            buf: [0; 5],
        }
    }

    /// Appends `src` to the stored name.
    ///
    /// Returns `false`, leaving the stored name untouched, when `src` does not
    /// fit in the remaining capacity.
    #[inline]
    fn try_extend(&mut self, src: &[u8]) -> bool {
        let start = self.len as usize;
        let end = start + src.len();

        match self.buf.get_mut(start..end) {
            Some(free) => {
                free.copy_from_slice(src);
                // `end` is at most `buf.len()` (5) here, so the cast is lossless.
                self.len = end as u8;
                true
            }
            None => false,
        }
    }

    /// Returns the bytes of the name collected so far.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        let used = self.len as usize;
        &self.buf[..used]
    }
}
66
/// Identifies which recognised field's value is currently being parsed.
#[derive(Debug, Clone, Copy)]
enum ValueMode {
    /// Value of a `data` field; appended to the data buffer.
    Data,
    /// Value of an `event` field; stored in the event-name buffer.
    Event,
    /// Value of a `retry` field; parsed as a decimal `u32`.
    Retry,
    /// Value of an `id` field; staged as the next `Last-Event-ID`.
    Id,
}
74
/// Parser state of the decoder's byte-level state machine.
#[derive(Debug, Clone, Copy)]
enum Mode {
    /// Start of stream: matching an optional UTF-8 byte order mark.
    /// `bytes_read` counts the BOM bytes consumed so far.
    Bom { bytes_read: u8 },
    /// Start of a line: accumulating the field name.
    Field(FieldMode),
    /// Reading the value of a recognised field up to the end of the line.
    Value(ValueMode),
    /// Discarding the remainder of the current line.
    Ignore,
    /// A CR just terminated a line; an immediately following LF is swallowed.
    PostCr,
    /// The colon after a recognised field name was just consumed; a single
    /// following space is skipped before the value starts.
    PostColon(ValueMode),
}
84
/// The core state-machine parser for SSE.
///
/// This decoder does not perform any I/O. It consumes bytes from a given buffer
/// and yields parsed [`SseEvent`]s. It is suitable for `no_std` environments.
#[derive(Debug, Clone)]
pub struct SseDecoder {
    /// Current state of the byte-level state machine.
    mode: Mode,
    /// ID committed by the most recent dispatch (or injected by the caller).
    last_event_id: Option<Arc<str>>,
    /// ID taken from the most recent complete `id` field; copied into
    /// `last_event_id` on the next dispatch.
    staged_last_event_id: Option<Arc<str>>,
    /// Raw bytes of the `id` value currently being read.
    last_event_id_buf: Vec<u8>,
    /// Raw bytes of the pending event name.
    event_buf: Vec<u8>,
    /// Raw bytes of the pending data payload; every data line is followed by `\n`.
    data_buf: Vec<u8>,
    /// Value of the `retry` field accumulated so far, if any digit was read.
    retry_buf: Option<u32>,
    /// Upper bound, in bytes, enforced while buffering a field value.
    max_payload_size: NonZeroUsize,
}
100
101impl Default for SseDecoder {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl SseDecoder {
108 /// Creates a new decoder with the default payload size limit of 512KiB.
109 ///
110 /// # Example
111 /// ```rust
112 /// # use bytes::{Buf, Bytes};
113 /// # use sse_core::{SseDecoder, SseEvent};
114 /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
115 /// let mut decoder = SseDecoder::new();
116 /// let mut buf = Bytes::from("data: standard stream\n\n");
117 ///
118 /// let event = decoder.next(&mut buf)?;
119 /// assert!(event.is_some());
120 /// # Ok(())
121 /// # }
122 /// ```
123 #[inline]
124 #[must_use]
125 pub fn new() -> Self {
126 Self::with_limit(NonZeroUsize::new(512 * 1024).unwrap())
127 }
128
129 /// Creates a new decoder with a custom maximum payload size limit.
130 ///
131 /// This is useful in memory-constrained environments or when connecting to
132 /// untrusted servers to prevent memory exhaustion from infinitely long lines.
133 ///
134 /// # Example
135 /// ```rust
136 /// # use core::num::NonZeroUsize;
137 /// # use bytes::Bytes;
138 /// # use sse_core::{SseDecoder, SseEvent};
139 /// # fn main() -> Result<(), sse_core::PayloadTooLargeError> {
140 /// // Create a strict decoder that rejects payloads over 1024 bytes
141 /// let limit = NonZeroUsize::new(1024).unwrap();
142 /// let mut decoder = SseDecoder::with_limit(limit);
143 ///
144 /// let mut buf = Bytes::from("data: small payload\n\n");
145 /// let _event = decoder.next(&mut buf)?;
146 /// # Ok(())
147 /// # }
148 /// ```
149 #[inline]
150 #[must_use]
151 pub fn with_limit(max_payload_size: NonZeroUsize) -> Self {
152 Self {
153 mode: Mode::Bom { bytes_read: 0 },
154 last_event_id: None,
155 staged_last_event_id: None,
156 last_event_id_buf: vec![],
157 event_buf: vec![],
158 data_buf: vec![],
159 retry_buf: None,
160 max_payload_size,
161 }
162 }
163
164 /// Returns the current `Last-Event-ID` known to the decoder, if any.
165 #[inline]
166 #[must_use]
167 pub fn last_event_id(&self) -> Option<&Arc<str>> {
168 self.last_event_id.as_ref()
169 }
170
171 /// Resets the decoder state for a new connection, explicitly overriding
172 /// the currently tracked `Last-Event-ID`.
173 ///
174 /// This method clears all internal byte buffers and resets the parser, but
175 /// instead of keeping the previous ID (like [`reconnect()`](Self::reconnect))
176 /// or dropping it (like [`clear()`](Self::clear)), it injects the provided ID.
177 ///
178 /// It is typically used to prime the state machine with a known ID
179 /// (e.g., from a local database) right before feeding the decoder bytes
180 /// from a newly established connection.
181 #[inline]
182 pub fn reconnect_with_id(&mut self, id: Option<Arc<str>>) {
183 self.last_event_id = id;
184 self.reconnect();
185 }
186
187 /// Resets the decoder state completely, dropping the current `Last-Event-ID`.
188 ///
189 /// This clears all internal byte buffers and purges the parser's state,
190 /// effectively starting fresh. Because it drops the `Last-Event-ID`, the
191 /// next connection will start from the present moment rather than resuming.
192 ///
193 /// * To reset the state but **keep** the current ID, use [`reconnect()`](Self::reconnect).
194 /// * To reset the state and **inject** a specific ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
195 #[inline]
196 pub fn clear(&mut self) {
197 self.reconnect_with_id(None);
198 }
199
200 /// Resets the buffer state for a new connection while retaining the `Last-Event-ID`.
201 ///
202 /// This clears the internal byte buffers to prepare for a fresh stream of data,
203 /// but safely preserves the most recently parsed `Last-Event-ID`. This ensures
204 /// that when you reconnect to the server, you can resume exactly where you left off.
205 ///
206 /// * To reset the state and **drop** the ID, use [`clear()`](Self::clear).
207 /// * To reset the state and **override** the ID, use [`reconnect_with_id()`](Self::reconnect_with_id).
208 #[inline]
209 pub fn reconnect(&mut self) {
210 self.mode = Mode::Bom { bytes_read: 0 };
211 self.data_buf.clear();
212 }
213
214 fn dispatch(&mut self, cr: bool) -> Option<SseEvent> {
215 self.last_event_id = self.staged_last_event_id.clone();
216
217 self.mode = match cr {
218 true => Mode::PostCr,
219 false => Mode::Field(FieldMode::new()),
220 };
221
222 match self.data_buf.last() {
223 Some(b'\n') => {
224 self.data_buf.pop();
225 }
226 Some(_) => {}
227 None => {
228 self.event_buf.clear();
229 return None;
230 }
231 }
232
233 let data = String::from_utf8_lossy(&self.data_buf).into_owned();
234 self.data_buf.clear();
235
236 let event = match &*self.event_buf {
237 b"" => Cow::Borrowed("message"),
238 event_buf => Cow::Owned(String::from_utf8_lossy(event_buf).into_owned()),
239 };
240 self.event_buf.clear();
241
242 Some(SseEvent::Message(MessageEvent {
243 data,
244 event,
245 last_event_id: self.last_event_id.clone(),
246 }))
247 }
248
249 /// Consumes bytes from the provided buffer and attempts to yield an event.
250 ///
251 /// The decoder does not store unparsed bytes internally. It reads directly
252 /// from the provided buffer, advancing the buffer's cursor only for the bytes
253 /// it successfully parses.
254 ///
255 /// If `Ok(None)` is returned, the provided buffer has been exhausted and
256 /// more bytes are needed to complete the current event. You should fetch more
257 /// data, append it to your buffer, and call `next()` again.
258 ///
259 /// # Example
260 /// ```
261 /// use bytes::{Buf, Bytes};
262 /// # use sse_core::{SseDecoder, SseEvent};
263 ///
264 /// let mut decoder = SseDecoder::new();
265 /// let mut buffer = Bytes::from("data: hello\n\n");
266 ///
267 /// // Call next() in a loop to drain all available events
268 /// while let Some(event) = decoder.next(&mut buffer).unwrap() {
269 /// println!("Received: {:?}", event);
270 /// }
271 ///
272 /// // When next() returns None, the decoder is waiting for more data.
273 /// assert!(buffer.is_empty());
274 /// ```
275 ///
276 /// # Errors
277 ///
278 /// Returns a [`PayloadTooLargeError`] if a single field (like data or event name)
279 /// exceeds the maximum payload size limit configured for this decoder.
280 pub fn next(&mut self, buf: &mut impl Buf) -> Result<Option<SseEvent>, PayloadTooLargeError> {
281 // # 9.2.5 Parsing an event stream
282 //
283 // stream = [ bom ] *event
284 // event = *( comment / field ) end-of-line
285 // comment = colon *any-char end-of-line
286 // field = 1*name-char [ colon [ space ] *any-char ] end-of-line
287 // end-of-line = ( cr lf / cr / lf )
288 //
289 // ; characters
290 // lf = %x000A ; U+000A LINE FEED (LF)
291 // cr = %x000D ; U+000D CARRIAGE RETURN (CR)
292 // space = %x0020 ; U+0020 SPACE
293 // colon = %x003A ; U+003A COLON (:)
294 // bom = %xFEFF ; U+FEFF BYTE ORDER MARK
295 // name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
296 // ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
297 // any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
298 // ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
299
300 loop {
301 let chunk = buf.chunk();
302 if chunk.is_empty() {
303 return Ok(None);
304 }
305
306 match &mut self.mode {
307 Mode::Bom { bytes_read } => {
308 let b0 = chunk[0];
309
310 const BOM: &[u8; 3] = b"\xef\xbb\xbf";
311
312 if b0 != BOM[*bytes_read as usize] {
313 self.mode = match *bytes_read {
314 0 => Mode::Field(FieldMode::new()),
315 _ => Mode::Ignore,
316 };
317 continue;
318 }
319
320 buf.advance(1);
321 *bytes_read += 1;
322
323 if BOM.len() <= *bytes_read as usize {
324 self.mode = Mode::Field(FieldMode::new());
325 }
326 }
327 Mode::Field(field) => {
328 let Some(field_end) = memchr3(b':', b'\r', b'\n', chunk) else {
329 if !field.try_extend(chunk) {
330 self.mode = Mode::Ignore;
331 }
332 buf.advance(chunk.len());
333 continue;
334 };
335
336 let subchunk = &chunk[..field_end];
337 let b0 = chunk[field_end];
338
339 if !field.try_extend(subchunk) {
340 self.mode = Mode::Ignore;
341 buf.advance(subchunk.len());
342 continue;
343 }
344
345 buf.advance(subchunk.len() + 1);
346
347 let value = match field.as_slice() {
348 b"data" => ValueMode::Data,
349 b"event" => {
350 self.event_buf.clear();
351 ValueMode::Event
352 }
353 b"retry" => {
354 self.retry_buf = None;
355 ValueMode::Retry
356 }
357 b"id" => {
358 self.last_event_id_buf.clear();
359 ValueMode::Id
360 }
361 b"" => match b0 {
362 b':' => {
363 self.mode = Mode::Ignore;
364 continue;
365 }
366 b'\r' | b'\n' => match self.dispatch(b0 == b'\r') {
367 Some(ev) => return Ok(Some(ev)),
368 None => continue,
369 },
370 _ => unreachable!(),
371 },
372 _ => {
373 self.mode = Mode::Ignore;
374 continue;
375 }
376 };
377
378 match b0 {
379 b'\n' => self.mode = Mode::Field(FieldMode::new()),
380 b'\r' => self.mode = Mode::PostCr,
381 b':' => {
382 self.mode = Mode::PostColon(value);
383 continue;
384 }
385 _ => unreachable!(),
386 }
387
388 match value {
389 ValueMode::Data => self.data_buf.push(b'\n'),
390 ValueMode::Id => self.last_event_id_buf.clear(),
391 ValueMode::Event | ValueMode::Retry => {}
392 }
393 }
394 Mode::Value(ValueMode::Retry) => {
395 let mut advanced = 0;
396 let mut return_event = false;
397
398 for &b in chunk {
399 advanced += 1;
400 match b {
401 b'0'..=b'9' => {
402 let digit = (b & 0xf) as _;
403
404 let retry_buf = self.retry_buf.unwrap_or(0);
405 let Some(retry_buf) = retry_buf.checked_mul(10) else {
406 self.mode = Mode::Ignore;
407 break;
408 };
409 let Some(retry_buf) = retry_buf.checked_add(digit) else {
410 self.mode = Mode::Ignore;
411 break;
412 };
413 self.retry_buf = Some(retry_buf);
414 }
415 b'\r' => {
416 self.mode = Mode::PostCr;
417 return_event = true;
418 break;
419 }
420 b'\n' => {
421 self.mode = Mode::Field(FieldMode::new());
422 return_event = true;
423 break;
424 }
425 _ => {
426 self.mode = Mode::Ignore;
427 break;
428 }
429 }
430 }
431
432 buf.advance(advanced);
433
434 if let (true, Some(retry_buf)) = (return_event, self.retry_buf) {
435 return Ok(Some(SseEvent::Retry(retry_buf)));
436 }
437 }
438 Mode::Value(ValueMode::Data) => {
439 if consume_until_newline(
440 &mut self.mode,
441 Some(&mut self.data_buf),
442 self.max_payload_size,
443 buf,
444 )? {
445 self.data_buf.push(b'\n');
446 }
447 }
448 Mode::Value(ValueMode::Event) => {
449 consume_until_newline(
450 &mut self.mode,
451 Some(&mut self.event_buf),
452 self.max_payload_size,
453 buf,
454 )?;
455 }
456 Mode::Value(ValueMode::Id) => {
457 if consume_until_newline(
458 &mut self.mode,
459 Some(&mut self.last_event_id_buf),
460 self.max_payload_size,
461 buf,
462 )? && memchr(0, &self.last_event_id_buf).is_none()
463 {
464 self.staged_last_event_id = match &*self.last_event_id_buf {
465 [] => None,
466 buf => Some(String::from_utf8_lossy(buf).into()),
467 };
468 }
469 }
470 Mode::Ignore => {
471 consume_until_newline(&mut self.mode, None, self.max_payload_size, buf)
472 .expect("there should be no payload to grow too large");
473 }
474 Mode::PostCr => {
475 if chunk[0] == b'\n' {
476 buf.advance(1);
477 }
478 self.mode = Mode::Field(FieldMode::new());
479 }
480 Mode::PostColon(value) => {
481 if chunk[0] == b' ' {
482 buf.advance(1);
483 }
484 self.mode = Mode::Value(*value);
485 }
486 }
487 }
488 }
489}
490
491fn consume_until_newline(
492 mode: &mut Mode,
493 mut out: Option<&mut Vec<u8>>,
494 max_size: NonZeroUsize,
495 buf: &mut impl Buf,
496) -> Result<bool, PayloadTooLargeError> {
497 loop {
498 let chunk = buf.chunk();
499 if chunk.is_empty() {
500 return Ok(false);
501 };
502
503 let Some(i) = memchr2(b'\r', b'\n', chunk) else {
504 if let Some(out) = out.as_deref_mut() {
505 if max_size.get() < out.len() + chunk.len() {
506 *mode = Mode::Ignore;
507 return Err(PayloadTooLargeError);
508 }
509 out.extend_from_slice(chunk);
510 }
511 buf.advance(chunk.len());
512 continue;
513 };
514
515 if let Some(out) = out {
516 if max_size.get() < out.len() + i {
517 *mode = Mode::Ignore;
518 return Err(PayloadTooLargeError);
519 }
520 out.extend_from_slice(&chunk[..i]);
521 }
522
523 *mode = match chunk[i] {
524 b'\r' => Mode::PostCr,
525 b'\n' => Mode::Field(FieldMode::new()),
526 _ => unreachable!(),
527 };
528
529 buf.advance(i + 1);
530
531 return Ok(true);
532 }
533}
534
// Feeds a stream full of edge cases (comments, mixed CR / LF / CRLF line
// endings, an empty `retry`, an unterminated final event) to the decoder one
// byte at a time and checks the exact sequence of yielded events.
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use std::slice;

    // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
    let bytes = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

data:ignored
";

    let mut decoder = SseDecoder::new();

    // Each byte is handed over as its own one-byte buffer, so every state
    // transition is exercised across a chunk boundary.
    let events = bytes
        .bytes()
        .filter_map(|b| decoder.next(&mut slice::from_ref(&b)).transpose())
        .collect::<Result<Vec<_>, PayloadTooLargeError>>()?;

    let id = Some("my-id".into());

    // `retry:42` is yielded as soon as its line ends, before the blank line
    // dispatches the first message. The trailing `data:ignored` is never
    // followed by a blank line, so it produces no event.
    assert_eq!(
        events,
        &[
            SseEvent::Retry(42),
            SseEvent::Message(MessageEvent {
                event: "my-event".into(),
                data: "line1\nline2".into(),
                last_event_id: id.clone()
            }),
            SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "second".into(),
                last_event_id: id.clone()
            })
        ]
    );
    Ok(())
}