1use std::borrow::Cow;
7use std::collections::VecDeque;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
/// One Server-Sent Events message produced by [`SseParser`] or [`SseStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Event type; `"message"` unless an `event:` field supplied a non-empty name.
    pub event: Cow<'static, str>,
    /// Payload: the values of all `data:` lines of the event joined with `\n`.
    pub data: String,
    /// Value of the `id:` field received for this event, if any.
    pub id: Option<String>,
    /// Value of the `retry:` field received for this event, if it parsed as an integer.
    pub retry: Option<u64>,
}

impl Default for SseEvent {
    /// Returns an empty event of type `"message"` with no id and no retry hint.
    fn default() -> Self {
        // `From<&'static str>` yields `Cow::Borrowed`, so this does not allocate.
        let event: Cow<'static, str> = "message".into();
        Self {
            data: String::new(),
            event,
            id: None,
            retry: None,
        }
    }
}
34
/// Incremental parser for a `text/event-stream` body.
///
/// Input is supplied as `&str` pieces via [`SseParser::feed`]; lines may be split across
/// pieces arbitrarily, and any unterminated tail is kept until the next call.
#[derive(Debug, Default)]
pub struct SseParser {
    /// Unconsumed input: an unterminated line, possibly ending in a `\r` that is held back
    /// until the next piece shows whether a `\n` follows it.
    buffer: String,
    /// Event whose fields are being accumulated until a blank line dispatches it.
    current: SseEvent,
    /// Whether a `data` field has been seen for `current`; only such events are dispatched.
    has_data: bool,
    /// Whether the check for a leading UTF-8 byte-order mark has already been performed.
    bom_checked: bool,
    /// Length of `buffer` at the end of the previous feed; the next scan for line
    /// terminators resumes one byte before this offset.
    scanned_len: usize,
}
46
47impl SseParser {
48 pub fn new() -> Self {
50 Self::default()
51 }
52
53 #[inline]
57 fn intern_event_type(value: &str) -> Cow<'static, str> {
58 match value {
59 "message" => Cow::Borrowed("message"),
60 "message_start" => Cow::Borrowed("message_start"),
61 "message_stop" => Cow::Borrowed("message_stop"),
62 "message_delta" => Cow::Borrowed("message_delta"),
63 "content_block_start" => Cow::Borrowed("content_block_start"),
64 "content_block_delta" => Cow::Borrowed("content_block_delta"),
65 "content_block_stop" => Cow::Borrowed("content_block_stop"),
66 "ping" => Cow::Borrowed("ping"),
67 "error" => Cow::Borrowed("error"),
68 _ => Cow::Owned(value.to_string()),
69 }
70 }
71
72 fn process_line(line: &str, current: &mut SseEvent, has_data: &mut bool) {
74 if current.data.len() > 100 * 1024 * 1024 {
76 return;
77 }
78
79 if let Some(rest) = line.strip_prefix(':') {
80 let _ = rest;
82 } else if let Some((field, value)) = line.split_once(':') {
83 let value = value.strip_prefix(' ').unwrap_or(value);
85 match field {
86 "event" => current.event = Self::intern_event_type(value),
87 "data" => {
88 current.data.push_str(value);
89 current.data.push('\n');
90 *has_data = true;
91 }
92 "id" => {
93 if !value.contains('\0') {
94 current.id = Some(value.to_string());
95 }
96 }
97 "retry" => current.retry = value.parse().ok(),
98 _ => {} }
100 } else {
101 match line {
103 "event" => current.event = Cow::Borrowed(""),
104 "data" => {
105 current.data.push('\n');
106 *has_data = true;
107 }
108 "id" => current.id = Some(String::new()),
109 _ => {}
110 }
111 }
112 }
113
114 #[inline]
117 fn process_source<F>(
118 source: &str,
119 scan_start: usize,
120 bom_checked: &mut bool,
121 current: &mut SseEvent,
122 has_data: &mut bool,
123 emit: &mut F,
124 ) -> usize
125 where
126 F: FnMut(SseEvent),
127 {
128 let bytes = source.as_bytes();
129 let mut start = 0usize;
130 let mut search_pos = scan_start;
131
132 if !*bom_checked && !source.is_empty() {
134 *bom_checked = true;
135 if source.starts_with('\u{FEFF}') {
136 start = 3;
137 if search_pos < 3 {
138 search_pos = 3;
139 }
140 }
141 }
142
143 while let Some(rel_pos) = memchr::memchr2(b'\r', b'\n', &bytes[search_pos..]) {
145 let pos = search_pos + rel_pos;
146 let b = bytes[pos];
147
148 let line_end;
149 let next_start;
150
151 if b == b'\n' {
152 line_end = pos;
154 next_start = pos + 1;
155 } else {
156 if pos + 1 < source.len() {
158 line_end = pos;
159 next_start = if bytes[pos + 1] == b'\n' {
160 pos + 2
162 } else {
163 pos + 1
165 };
166 } else {
167 break;
169 }
170 }
171
172 let line = &source[start..line_end];
173 start = next_start;
174 search_pos = next_start;
175
176 if line.is_empty() {
177 if *has_data {
179 if current.data.ends_with('\n') {
181 current.data.pop();
182 }
183 if current.event.is_empty() {
185 current.event = Cow::Borrowed("message");
186 }
187 emit(std::mem::take(current));
188 *current = SseEvent::default();
189 *has_data = false;
190 }
191 } else {
192 Self::process_line(line, current, has_data);
193 }
194 }
195
196 start
197 }
198
199 fn feed_into<F>(&mut self, data: &str, mut emit: F)
201 where
202 F: FnMut(SseEvent),
203 {
204 const MAX_BUFFER_SIZE: usize = 10 * 1024 * 1024;
205 if self.buffer.len() + data.len() > MAX_BUFFER_SIZE {
206 self.buffer.clear();
207 self.current = SseEvent::default();
208 self.has_data = false;
209 self.bom_checked = false;
210 self.scanned_len = 0;
211 emit(SseEvent {
212 event: Cow::Borrowed("error"),
213 data: "SSE buffer limit exceeded".to_string(),
214 ..Default::default()
215 });
216 return;
217 }
218
219 if self.buffer.is_empty() {
220 let consumed = Self::process_source(
222 data,
223 0,
224 &mut self.bom_checked,
225 &mut self.current,
226 &mut self.has_data,
227 &mut emit,
228 );
229 if consumed < data.len() {
230 self.buffer.push_str(&data[consumed..]);
231 }
232 } else {
233 self.buffer.push_str(data);
235 let scan_start = self.scanned_len.saturating_sub(1);
237 let consumed = Self::process_source(
238 &self.buffer,
239 scan_start,
240 &mut self.bom_checked,
241 &mut self.current,
242 &mut self.has_data,
243 &mut emit,
244 );
245 if consumed > 0 {
246 self.buffer.drain(..consumed);
247 }
248 }
249 self.scanned_len = self.buffer.len();
251 }
252
253 pub fn feed(&mut self, data: &str) -> Vec<SseEvent> {
257 let mut events = Vec::with_capacity(4);
258 self.feed_into(data, |event| events.push(event));
259 events
260 }
261
262 pub fn has_pending(&self) -> bool {
264 !self.buffer.is_empty() || self.has_data
265 }
266
267 pub fn flush(&mut self) -> Option<SseEvent> {
269 if !self.buffer.is_empty() {
271 let line = std::mem::take(&mut self.buffer);
272 let line = line.trim_end_matches('\r');
273 Self::process_line(line, &mut self.current, &mut self.has_data);
274 }
275
276 if self.has_data {
277 if self.current.data.ends_with('\n') {
278 self.current.data.pop();
279 }
280 if self.current.event.is_empty() {
281 self.current.event = Cow::Borrowed("message");
282 }
283 let event = std::mem::take(&mut self.current);
284 self.current = SseEvent::default();
285 self.has_data = false;
286 Some(event)
287 } else {
288 None
289 }
290 }
291}
292
293pub struct SseStream<S> {
297 inner: S,
298 parser: SseParser,
299 pending_events: VecDeque<SseEvent>,
300 pending_error: Option<std::io::Error>,
301 utf8_buffer: Vec<u8>,
302}
303
304impl<S> SseStream<S> {
305 pub fn new(inner: S) -> Self {
307 Self {
308 inner,
309 parser: SseParser::new(),
310 pending_events: VecDeque::new(),
311 pending_error: None,
312 utf8_buffer: Vec::new(),
313 }
314 }
315}
316
317impl<S> SseStream<S>
318where
319 S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
320{
321 #[inline]
322 fn invalid_utf8_error() -> std::io::Error {
323 std::io::Error::new(
324 std::io::ErrorKind::InvalidData,
325 "Invalid UTF-8 in SSE stream",
326 )
327 }
328
329 fn feed_parsed_chunk(parser: &mut SseParser, pending: &mut VecDeque<SseEvent>, s: &str) {
330 parser.feed_into(s, |event| pending.push_back(event));
331 }
332
333 fn feed_to_pending(&mut self, s: &str) {
334 Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
335 }
336
337 fn process_chunk_without_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
338 let mut processed = 0;
339 let mut first_error: Option<std::io::Error> = None;
340 loop {
341 match std::str::from_utf8(&bytes[processed..]) {
342 Ok(s) => {
343 if !s.is_empty() {
344 self.feed_to_pending(s);
345 }
346 return first_error.map_or(Ok(()), Err);
347 }
348 Err(err) => {
349 let valid_len = err.valid_up_to();
350 if valid_len > 0 {
351 let s = std::str::from_utf8(&bytes[processed..processed + valid_len])
352 .expect("valid utf8 prefix");
353 self.feed_to_pending(s);
354 processed += valid_len;
355 }
356
357 if let Some(invalid_len) = err.error_len() {
358 processed += invalid_len;
359 if first_error.is_none() {
360 first_error = Some(Self::invalid_utf8_error());
361 }
362 } else {
363 self.utf8_buffer.extend_from_slice(&bytes[processed..]);
364 return first_error.map_or(Ok(()), Err);
365 }
366 }
367 }
368 }
369 }
370
371 fn process_chunk_with_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
372 self.utf8_buffer.extend_from_slice(bytes);
373 let mut processed = 0;
374 let mut first_error: Option<std::io::Error> = None;
375 loop {
376 match std::str::from_utf8(&self.utf8_buffer[processed..]) {
377 Ok(s) => {
378 if !s.is_empty() {
379 Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
380 }
381 self.utf8_buffer.clear();
382 return first_error.map_or(Ok(()), Err);
383 }
384 Err(err) => {
385 let valid_len = err.valid_up_to();
386 if valid_len > 0 {
387 let s = std::str::from_utf8(
388 &self.utf8_buffer[processed..processed + valid_len],
389 )
390 .expect("valid utf8 prefix");
391 Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
392 processed += valid_len;
393 }
394
395 if let Some(invalid_len) = err.error_len() {
396 processed += invalid_len;
397 if first_error.is_none() {
398 first_error = Some(Self::invalid_utf8_error());
399 }
400 } else {
401 let remaining = self.utf8_buffer.len() - processed;
403 self.utf8_buffer.copy_within(processed.., 0);
404 self.utf8_buffer.truncate(remaining);
405 return first_error.map_or(Ok(()), Err);
406 }
407 }
408 }
409 }
410 }
411
412 fn process_chunk(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
413 if self.utf8_buffer.is_empty() {
414 self.process_chunk_without_utf8_tail(bytes)
415 } else {
416 self.process_chunk_with_utf8_tail(bytes)
417 }
418 }
419
420 fn poll_stream_end(&mut self) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
421 if !self.utf8_buffer.is_empty() {
422 self.utf8_buffer.clear();
425 self.pending_events.clear();
426 self.pending_error = None;
427 self.parser = SseParser::new();
428 return Poll::Ready(Some(Err(std::io::Error::new(
429 std::io::ErrorKind::InvalidData,
430 "Stream ended with incomplete UTF-8 sequence",
431 ))));
432 }
433
434 if let Some(event) = self.parser.flush() {
435 return Poll::Ready(Some(Ok(event)));
436 }
437 Poll::Ready(None)
438 }
439
440 pub fn poll_next_event(
442 mut self: Pin<&mut Self>,
443 cx: &mut Context<'_>,
444 ) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
445 if let Some(event) = self.pending_events.pop_front() {
446 return Poll::Ready(Some(Ok(event)));
447 }
448 if let Some(err) = self.pending_error.take() {
449 return Poll::Ready(Some(Err(err)));
450 }
451
452 loop {
453 match Pin::new(&mut self.inner).poll_next(cx) {
454 Poll::Ready(Some(Ok(bytes))) => {
455 if let Err(err) = self.process_chunk(&bytes) {
456 if let Some(event) = self.pending_events.pop_front() {
457 self.pending_error = Some(err);
458 return Poll::Ready(Some(Ok(event)));
459 }
460 return Poll::Ready(Some(Err(err)));
461 }
462
463 if let Some(event) = self.pending_events.pop_front() {
464 return Poll::Ready(Some(Ok(event)));
465 }
466 }
467 Poll::Ready(Some(Err(e))) => {
468 return Poll::Ready(Some(Err(e)));
469 }
470 Poll::Ready(None) => {
471 return self.poll_stream_end();
472 }
473 Poll::Pending => {
474 return Poll::Pending;
475 }
476 }
477 }
478 }
479}
480
481impl<S> futures::Stream for SseStream<S>
482where
483 S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
484{
485 type Item = Result<SseEvent, std::io::Error>;
486
487 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
488 self.poll_next_event(cx)
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use futures::StreamExt;
496 use futures::stream;
497 use proptest::prelude::*;
498 use serde_json::json;
499 use std::fmt::Write as _;
500 use std::io::ErrorKind;
501
502 #[derive(Debug, Clone)]
503 struct GeneratedEvent {
504 event: Option<String>,
505 id: Option<String>,
506 retry: Option<u32>,
507 data: Vec<String>,
508 comment: Option<String>,
509 }
510
511 #[derive(Debug, Clone, Copy)]
512 enum LineEnding {
513 Lf,
514 Cr,
515 CrLf,
516 }
517
518 impl LineEnding {
519 fn as_str(self) -> &'static str {
520 match self {
521 Self::Lf => "\n",
522 Self::Cr => "\r",
523 Self::CrLf => "\r\n",
524 }
525 }
526 }
527
528 impl GeneratedEvent {
529 fn render(&self) -> String {
530 let mut out = String::new();
531 if let Some(comment) = &self.comment {
532 out.push(':');
533 out.push_str(comment);
534 out.push('\n');
535 }
536 if let Some(event) = &self.event {
537 out.push_str("event: ");
538 out.push_str(event);
539 out.push('\n');
540 }
541 if let Some(id) = &self.id {
542 out.push_str("id: ");
543 out.push_str(id);
544 out.push('\n');
545 }
546 if let Some(retry) = &self.retry {
547 out.push_str("retry: ");
548 out.push_str(&retry.to_string());
549 out.push('\n');
550 }
551 for line in &self.data {
552 out.push_str("data: ");
553 out.push_str(line);
554 out.push('\n');
555 }
556 out.push('\n');
557 out
558 }
559 }
560
561 fn ascii_line() -> impl Strategy<Value = String> {
562 "[ -~]{0,24}".prop_map(|s| s)
564 }
565
566 fn event_strategy() -> impl Strategy<Value = GeneratedEvent> {
567 (
568 prop::option::of("[a-z_]{1,12}"),
569 prop::option::of("[0-9]{1,8}"),
570 prop::option::of(0u32..5000),
571 prop::collection::vec(ascii_line(), 1..4),
572 prop::option::of(ascii_line()),
573 )
574 .prop_map(|(event, id, retry, data, comment)| GeneratedEvent {
575 event,
576 id,
577 retry,
578 data,
579 comment,
580 })
581 }
582
583 fn line_ending_strategy() -> impl Strategy<Value = LineEnding> {
584 prop_oneof![
585 Just(LineEnding::Lf),
586 Just(LineEnding::Cr),
587 Just(LineEnding::CrLf),
588 ]
589 }
590
591 fn unicode_line() -> impl Strategy<Value = String> {
592 prop::collection::vec(
593 any::<char>().prop_filter("no CR/LF", |c| *c != '\r' && *c != '\n'),
594 0..24,
595 )
596 .prop_map(|chars| chars.into_iter().collect())
597 }
598
599 fn id_field_strategy() -> impl Strategy<Value = String> {
600 prop_oneof![
601 4 => "[ -~]{0,24}".prop_map(|s| s),
602 1 => ("[ -~]{0,12}", "[ -~]{0,12}").prop_map(|(head, tail)| format!("{head}\0{tail}")),
603 ]
604 }
605
606 fn retry_field_strategy() -> impl Strategy<Value = String> {
607 prop_oneof![
608 6 => (0u64..=50_000u64).prop_map(|n| n.to_string()),
609 2 => (u64::MAX - 10..=u64::MAX).prop_map(|n| n.to_string()),
610 2 => "[a-zA-Z]{1,16}".prop_map(|s| s),
611 1 => "-[0-9]{1,24}".prop_map(|s| s),
612 1 => ((u128::from(u64::MAX) + 1)..=(u128::from(u64::MAX) + 50_000))
613 .prop_map(|n| n.to_string()),
614 1 => Just(String::new()),
615 ]
616 }
617
618 fn oversized_data_len_strategy() -> impl Strategy<Value = usize> {
619 prop_oneof![
621 10 => 1024usize..=65_536usize,
622 5 => 65_537usize..=262_144usize,
623 2 => 262_145usize..=1_048_576usize,
624 1 => 1_048_577usize..=3_145_728usize,
625 ]
626 }
627
628 fn render_stream(events: &[GeneratedEvent], terminal_delimiter: bool) -> String {
629 let mut out = String::new();
630 for event in events {
631 out.push_str(&event.render());
632 }
633 if !terminal_delimiter && out.ends_with('\n') {
634 out.pop();
635 }
636 out
637 }
638
639 fn render_stream_with_line_endings(
640 events: &[GeneratedEvent],
641 terminal_delimiter: bool,
642 line_ending: LineEnding,
643 ) -> String {
644 let canonical = render_stream(events, terminal_delimiter);
645 if matches!(line_ending, LineEnding::Lf) {
646 canonical
647 } else {
648 canonical.replace('\n', line_ending.as_str())
649 }
650 }
651
652 fn parse_all(input: &str) -> Vec<SseEvent> {
653 let mut parser = SseParser::new();
654 let mut events = parser.feed(input);
655 if let Some(event) = parser.flush() {
656 events.push(event);
657 }
658 events
659 }
660
661 fn parse_chunked(input: &str, chunk_sizes: &[usize]) -> Vec<SseEvent> {
662 let mut parser = SseParser::new();
663 let mut events = Vec::new();
664 let bytes = input.as_bytes();
665 let mut start = 0usize;
666
667 for &size in chunk_sizes {
668 if start >= bytes.len() {
669 break;
670 }
671 let end = (start + size).min(bytes.len());
672 let chunk = std::str::from_utf8(&bytes[start..end]).expect("ascii chunks");
673 events.extend(parser.feed(chunk));
674 start = end;
675 }
676
677 if start < bytes.len() {
678 let chunk = std::str::from_utf8(&bytes[start..]).expect("ascii remainder");
679 events.extend(parser.feed(chunk));
680 }
681
682 if let Some(event) = parser.flush() {
683 events.push(event);
684 }
685
686 events
687 }
688
689 fn split_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
690 let mut chunks = Vec::new();
691 let mut start = 0usize;
692
693 for &size in chunk_sizes {
694 if start >= input.len() {
695 break;
696 }
697 let end = (start + size).min(input.len());
698 chunks.push(input[start..end].to_vec());
699 start = end;
700 }
701
702 if start < input.len() {
703 chunks.push(input[start..].to_vec());
704 }
705
706 chunks
707 }
708
709 fn parse_stream_chunks(chunks: Vec<Vec<u8>>) -> (Vec<SseEvent>, Vec<ErrorKind>) {
710 let mut stream = SseStream::new(stream::iter(
711 chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
712 ));
713 let mut events = Vec::new();
714 let mut errors = Vec::new();
715
716 futures::executor::block_on(async {
717 while let Some(item) = stream.next().await {
718 match item {
719 Ok(event) => events.push(event),
720 Err(err) => errors.push(err.kind()),
721 }
722 }
723 });
724
725 (events, errors)
726 }
727
728 fn parse_stream_chunks_limited(
729 chunks: Vec<Vec<u8>>,
730 max_items: usize,
731 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
732 let mut stream = SseStream::new(stream::iter(
733 chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
734 ));
735 let mut events = Vec::new();
736 let mut errors = Vec::new();
737
738 futures::executor::block_on(async {
739 for _ in 0..max_items {
740 let Some(item) = stream.next().await else {
741 break;
742 };
743 match item {
744 Ok(event) => events.push(event),
745 Err(err) => errors.push(err.kind()),
746 }
747 }
748 });
749
750 (events, errors)
751 }
752
753 fn parse_stream_single_chunk(input: &[u8]) -> (Vec<SseEvent>, Vec<ErrorKind>) {
754 parse_stream_chunks(vec![input.to_vec()])
755 }
756
757 fn parse_stream_chunked(
758 input: &[u8],
759 chunk_sizes: &[usize],
760 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
761 let chunks = split_bytes(input, chunk_sizes);
762 parse_stream_chunks(chunks)
763 }
764
765 fn parse_stream_chunked_limited(
766 input: &[u8],
767 chunk_sizes: &[usize],
768 max_items: usize,
769 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
770 let chunks = split_bytes(input, chunk_sizes);
771 parse_stream_chunks_limited(chunks, max_items)
772 }
773
774 fn diag_json(
775 fixture_id: &str,
776 parser: &SseParser,
777 input: &str,
778 expected: &str,
779 actual: &str,
780 ) -> String {
781 json!({
782 "fixture_id": fixture_id,
783 "seed": "deterministic-static",
784 "env": {
785 "os": std::env::consts::OS,
786 "arch": std::env::consts::ARCH,
787 "cwd": std::env::current_dir().ok().map(|path| path.display().to_string()),
788 },
789 "input_preview": input,
790 "parser_state": {
791 "has_pending": parser.has_pending(),
792 },
793 "expected": expected,
794 "actual": actual,
795 })
796 .to_string()
797 }
798
799 #[test]
800 fn test_simple_event() {
801 let mut parser = SseParser::new();
802 let events = parser.feed("data: hello\n\n");
803 assert_eq!(events.len(), 1);
804 assert_eq!(events[0].event, "message");
805 assert_eq!(events[0].data, "hello");
806 }
807
808 #[test]
809 fn test_multiline_data() {
810 let mut parser = SseParser::new();
811 let events = parser.feed("data: line1\ndata: line2\n\n");
812 assert_eq!(events.len(), 1);
813 assert_eq!(events[0].data, "line1\nline2");
814 }
815
816 #[test]
817 fn test_named_event() {
818 let mut parser = SseParser::new();
819 let events = parser.feed("event: ping\ndata: {}\n\n");
820 assert_eq!(events.len(), 1);
821 assert_eq!(events[0].event, "ping");
822 assert_eq!(events[0].data, "{}");
823 }
824
825 #[test]
826 fn test_event_with_id() {
827 let mut parser = SseParser::new();
828 let events = parser.feed("id: 123\ndata: test\n\n");
829 assert_eq!(events.len(), 1);
830 assert_eq!(events[0].id, Some("123".to_string()));
831 assert_eq!(events[0].data, "test");
832 }
833
834 #[test]
835 fn test_multiple_events() {
836 let mut parser = SseParser::new();
837 let events = parser.feed("data: first\n\ndata: second\n\n");
838 assert_eq!(events.len(), 2);
839 assert_eq!(events[0].data, "first");
840 assert_eq!(events[1].data, "second");
841 }
842
843 #[test]
844 fn test_incremental_feed() {
845 let mut parser = SseParser::new();
846
847 let events = parser.feed("data: hel");
849 assert!(events.is_empty());
850
851 let events = parser.feed("lo\n");
853 assert!(events.is_empty());
854
855 let events = parser.feed("\n");
857 assert_eq!(events.len(), 1);
858 assert_eq!(events[0].data, "hello");
859 }
860
861 #[test]
862 fn test_comment_ignored() {
863 let mut parser = SseParser::new();
864 let events = parser.feed(":this is a comment\ndata: actual\n\n");
865 assert_eq!(events.len(), 1);
866 assert_eq!(events[0].data, "actual");
867 }
868
869 #[test]
870 fn test_retry_field() {
871 let mut parser = SseParser::new();
872 let events = parser.feed("retry: 3000\ndata: test\n\n");
873 assert_eq!(events.len(), 1);
874 assert_eq!(events[0].retry, Some(3000));
875 }
876
877 #[test]
878 fn test_keep_alive_comment_does_not_emit_event() {
879 let mut parser = SseParser::new();
880 let events = parser.feed(": keepalive\n\n");
881 assert!(events.is_empty());
882 }
883
884 #[test]
885 fn test_crlf_handling() {
886 let mut parser = SseParser::new();
887 let events = parser.feed("data: hello\r\n\r\n");
888 assert_eq!(events.len(), 1);
889 assert_eq!(events[0].data, "hello");
890 }
891
892 #[test]
893 fn test_flush_pending() {
894 let mut parser = SseParser::new();
895 let events = parser.feed("data: incomplete");
896 assert!(events.is_empty());
897 assert!(parser.has_pending());
898
899 let event = parser.flush();
901 assert!(event.is_some());
902 assert_eq!(event.unwrap().data, "incomplete");
903 }
904
905 #[test]
906 fn test_event_without_data_is_ignored() {
907 let mut parser = SseParser::new();
908 let events = parser.feed("event: ping\n\n");
909 assert!(
910 events.is_empty(),
911 "event block without data should not emit an event"
912 );
913 }
914
915 #[test]
916 fn test_unknown_field_is_ignored() {
917 let mut parser = SseParser::new();
918 let events = parser.feed("foo: bar\ndata: hello\n\n");
919 assert_eq!(events.len(), 1);
920 assert_eq!(events[0].data, "hello");
921 assert_eq!(events[0].event, "message");
922 }
923
924 #[test]
925 fn test_error_event_parsing() {
926 let mut parser = SseParser::new();
927 let events = parser.feed("event: error\ndata: {\"message\":\"boom\"}\n\n");
928 assert_eq!(events.len(), 1);
929 assert_eq!(events[0].event, "error");
930 assert_eq!(events[0].data, "{\"message\":\"boom\"}");
931 }
932
933 #[test]
934 fn test_empty_event_field_defaults_to_message() {
935 let mut parser = SseParser::new();
936 let input = "event\ndata: hello\n\n";
937 let events = parser.feed(input);
938 let diag = diag_json(
939 "sse-empty-event-field-default",
940 &parser,
941 input,
942 r#"{"event":"message","data":"hello"}"#,
943 &format!("{events:?}"),
944 );
945
946 assert_eq!(events.len(), 1, "{diag}");
947 assert_eq!(events[0].event, "message", "{diag}");
948 assert_eq!(events[0].data, "hello", "{diag}");
949 }
950
951 #[test]
952 fn test_large_payload_event() {
953 let mut parser = SseParser::new();
954 let payload = "x".repeat(128 * 1024);
955 let input = format!("data: {payload}\n\n");
956 let events = parser.feed(&input);
957 assert_eq!(events.len(), 1);
958 assert_eq!(events[0].data.len(), payload.len());
959 assert_eq!(events[0].data, payload);
960 }
961
962 #[test]
963 fn test_buffer_limit_overflow_resets_parser_state() {
964 let mut parser = SseParser::new();
965 assert!(parser.feed("data: stale\n").is_empty());
966
967 let oversized = "x".repeat(10 * 1024 * 1024 + 1);
968 let overflow_events = parser.feed(&oversized);
969 assert_eq!(overflow_events.len(), 1);
970 assert_eq!(overflow_events[0].event, "error");
971 assert_eq!(overflow_events[0].data, "SSE buffer limit exceeded");
972
973 assert!(!parser.has_pending());
974 assert!(parser.flush().is_none());
975
976 let fresh = parser.feed("data: fresh\n\n");
977 assert_eq!(fresh.len(), 1);
978 assert_eq!(fresh[0].data, "fresh");
979 }
980
981 #[test]
982 fn test_rapid_sequential_events() {
983 let mut parser = SseParser::new();
984 let mut input = String::new();
985 for i in 0..200 {
986 let _ = write!(&mut input, "event: e{i}\ndata: payload{i}\n\n");
987 }
988 let events = parser.feed(&input);
989 assert_eq!(events.len(), 200);
990 assert_eq!(events[0].event, "e0");
991 assert_eq!(events[0].data, "payload0");
992 assert_eq!(events[199].event, "e199");
993 assert_eq!(events[199].data, "payload199");
994 }
995
996 #[test]
997 fn test_stream_event_name_matrix() {
998 let names = [
999 "message_start",
1000 "content_block_start",
1001 "content_block_delta",
1002 "content_block_stop",
1003 "message_delta",
1004 "message_stop",
1005 "message",
1006 "error",
1007 "ping",
1008 "response.created",
1009 "response.output_text.delta",
1010 "response.completed",
1011 ];
1012
1013 let mut parser = SseParser::new();
1014 let mut input = String::new();
1015 for name in names {
1016 let _ = write!(&mut input, "event: {name}\ndata: {{}}\n\n");
1017 }
1018
1019 let events = parser.feed(&input);
1020 assert_eq!(events.len(), names.len());
1021 for (idx, name) in names.iter().enumerate() {
1022 assert_eq!(events[idx].event, *name);
1023 assert_eq!(events[idx].data, "{}");
1024 }
1025 }
1026
1027 #[test]
1028 fn test_anthropic_style_events() {
1029 let mut parser = SseParser::new();
1030
1031 let events = parser.feed(
1033 r#"event: message_start
1034data: {"type":"message_start","message":{"id":"msg_123"}}
1035
1036event: content_block_start
1037data: {"type":"content_block_start","index":0,"content_block":{"type":"text"}}
1038
1039event: content_block_delta
1040data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
1041
1042event: content_block_stop
1043data: {"type":"content_block_stop","index":0}
1044
1045event: message_stop
1046data: {"type":"message_stop"}
1047
1048"#,
1049 );
1050
1051 assert_eq!(events.len(), 5);
1052 assert_eq!(events[0].event, "message_start");
1053 assert!(events[0].data.contains("message_start"));
1054 assert_eq!(events[1].event, "content_block_start");
1055 assert_eq!(events[2].event, "content_block_delta");
1056 assert!(events[2].data.contains("Hello"));
1057 assert_eq!(events[3].event, "content_block_stop");
1058 assert_eq!(events[4].event, "message_stop");
1059 }
1060
1061 #[test]
1062 fn test_stream_yields_multiple_events_from_one_chunk() {
1063 let bytes = b"data: first\n\ndata: second\n\n".to_vec();
1064 let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1065
1066 futures::executor::block_on(async {
1067 let first = stream.next().await.expect("first event").expect("ok");
1068 assert_eq!(first.data, "first");
1069
1070 let second = stream.next().await.expect("second event").expect("ok");
1071 assert_eq!(second.data, "second");
1072
1073 assert!(stream.next().await.is_none());
1074 });
1075 }
1076
1077 #[test]
1078 fn test_stream_handles_utf8_split_across_chunks() {
1079 let chunks = vec![Ok(b"data: \xE2".to_vec()), Ok(b"\x98\x83\n\n".to_vec())];
1081 let mut stream = SseStream::new(stream::iter(chunks));
1082
1083 futures::executor::block_on(async {
1084 let event = stream.next().await.expect("event").expect("ok");
1085 assert_eq!(event.data, "☃");
1086 assert!(stream.next().await.is_none());
1087 });
1088 }
1089
1090 #[test]
1091 fn test_stream_handles_crlf_split_across_partial_frames() {
1092 let chunks = vec![
1093 Ok(b"data: first\r".to_vec()),
1094 Ok(b"\n".to_vec()),
1095 Ok(b"\r".to_vec()),
1096 Ok(b"\n".to_vec()),
1097 ];
1098 let mut stream = SseStream::new(stream::iter(chunks));
1099
1100 futures::executor::block_on(async {
1101 let first = stream.next().await.expect("first event").expect("ok");
1102 let diag = json!({
1103 "fixture_id": "sse-crlf-split-across-chunks",
1104 "seed": "deterministic-static",
1105 "expected": {"event": "message", "data": "first"},
1106 "actual": {"event": first.event, "data": first.data},
1107 })
1108 .to_string();
1109 assert_eq!(first.data, "first", "{diag}");
1110 assert!(stream.next().await.is_none(), "{diag}");
1111 });
1112 }
1113
1114 #[test]
1115 fn test_stream_flushes_pending_event_at_end() {
1116 let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: last".to_vec())]));
1117
1118 futures::executor::block_on(async {
1119 let event = stream.next().await.expect("event").expect("ok");
1120 assert_eq!(event.data, "last");
1121 assert!(stream.next().await.is_none());
1122 });
1123 }
1124
1125 #[test]
1126 fn test_stream_errors_on_incomplete_utf8_at_end() {
1127 let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: \xE2".to_vec())]));
1128
1129 futures::executor::block_on(async {
1130 let err = stream
1131 .next()
1132 .await
1133 .expect("expected a result")
1134 .expect_err("expected utf8 error");
1135 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1136 assert!(
1137 stream.next().await.is_none(),
1138 "incomplete UTF-8 at EOF should produce a terminal error"
1139 );
1140 });
1141 }
1142
1143 #[test]
1144 fn test_stream_surfaces_pending_event_before_utf8_error() {
1145 let chunks = vec![Ok(b"data: ok\n\ndata: \xFF\n\n".to_vec())];
1150 let mut stream = SseStream::new(stream::iter(chunks));
1151
1152 futures::executor::block_on(async {
1153 let first = stream.next().await.expect("first item").expect("first ok");
1154 let diag = json!({
1155 "fixture_id": "sse-valid-event-before-invalid-utf8",
1156 "seed": "deterministic-static",
1157 "expected_sequence": ["Ok(data=ok)", "Ok(data=)", "Err(invalid utf8)"],
1158 "actual_first": {"event": first.event, "data": first.data},
1159 })
1160 .to_string();
1161 assert_eq!(first.data, "ok", "{diag}");
1162
1163 let second = stream
1166 .next()
1167 .await
1168 .expect("second item")
1169 .expect("second ok");
1170 assert_eq!(second.data, "", "{diag}");
1171
1172 let err = stream
1173 .next()
1174 .await
1175 .expect("third item")
1176 .expect_err("third should be utf8 error");
1177 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData, "{diag}");
1178 });
1179 }
1180
1181 #[test]
1182 fn test_stream_resumes_parsing_remainder_after_utf8_error() {
1183 let mut bytes = b"data: ok\n\n".to_vec();
1190 bytes.push(0xFF);
1191 bytes.extend_from_slice(b"data: after\n\n");
1192
1193 let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1194
1195 futures::executor::block_on(async {
1196 let first = stream.next().await.expect("1").expect("ok");
1198 assert_eq!(first.data, "ok");
1199
1200 let second = stream.next().await.expect("2").expect("after");
1202 assert_eq!(second.data, "after");
1203
1204 let err = stream.next().await.expect("3").expect_err("error");
1206 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1207 });
1208 }
1209
1210 #[test]
1211 fn test_bom_stripping_with_preceding_empty_chunk() {
1212 let mut parser = SseParser::new();
1213 let events = parser.feed("");
1215 assert!(events.is_empty());
1216
1217 let events = parser.feed("\u{FEFF}data: hello\n\n");
1219 assert_eq!(events.len(), 1);
1220 assert_eq!(events[0].data, "hello");
1221 assert_eq!(events[0].event, "message");
1223 }
1224
/// Fuzzer regression (crash 28de6b). A lossily-decoded byte soup must yield
/// the same events and the same `flush()` result in three cases: fed whole,
/// fed one `char` at a time, or fed as two pieces cut at the first char
/// boundary at or after the byte midpoint.
#[test]
fn test_fuzz_regression_crash_28de6b() {
    let data: &[u8] = &[
        0x64, 0x3d, 0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x31, 0x0a, 0x5a, 0x61,
        0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x32, 0x0a, 0x64, 0x61, 0x74, 0x61,
        0x3a, 0x20, 0x6c, 0x9f, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
        0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x28, 0xcd, 0xcd, 0xa1,
        0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1,
        0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
        0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82,
        0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x40, 0x82, 0xcd,
        0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x91,
        0x9a, 0x93, 0x69, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00,
    ];
    // Invalid sequences become U+FFFD, so the result is a valid `str`.
    let input = String::from_utf8_lossy(data);

    // Reference run: everything in a single feed.
    let mut whole = SseParser::new();
    let events_whole = whole.feed(&input);
    let flush_whole = whole.flush();

    // One Unicode scalar value per feed.
    let mut by_char = SseParser::new();
    let mut scratch = [0u8; 4];
    let events_char: Vec<_> = input
        .chars()
        .flat_map(|c| by_char.feed(c.encode_utf8(&mut scratch)))
        .collect();
    let flush_char = by_char.flush();

    // Two pieces. The cut is the first char boundary at or after the byte
    // midpoint; `input.len()` is always a boundary, so the search terminates.
    let split_at = (input.len() / 2..=input.len())
        .find(|&i| input.is_char_boundary(i))
        .unwrap_or(input.len());
    let (head, tail) = input.split_at(split_at);
    let mut halves = SseParser::new();
    let mut events_split = halves.feed(head);
    events_split.extend(halves.feed(tail));
    let flush_split = halves.flush();

    assert_eq!(events_whole, events_split, "whole vs split events");
    assert_eq!(flush_whole, flush_split, "whole vs split flush");
    assert_eq!(events_whole, events_char, "whole vs char events");
    assert_eq!(flush_whole, flush_char, "whole vs char flush");
}
1281
1282 proptest! {
1283 #![proptest_config(ProptestConfig {
1284 cases: 256,
1285 max_shrink_iters: 200,
1286 .. ProptestConfig::default()
1287 })]
1288
1289 #[test]
1290 fn sse_chunking_invariant(
1291 events in prop::collection::vec(event_strategy(), 1..10),
1292 chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1293 terminal_delimiter in any::<bool>(),
1294 ) {
1295 let input = render_stream(&events, terminal_delimiter);
1296 let expected = parse_all(&input);
1297 let actual = parse_chunked(&input, &chunk_sizes);
1298 prop_assert_eq!(actual, expected);
1299 }
1300
1301 #[test]
1302 fn sse_line_ending_chunking_invariant(
1303 events in prop::collection::vec(event_strategy(), 1..10),
1304 chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1305 terminal_delimiter in any::<bool>(),
1306 line_ending in line_ending_strategy(),
1307 ) {
1308 let input = render_stream_with_line_endings(&events, terminal_delimiter, line_ending);
1309 let expected = parse_all(&input);
1310 let actual = parse_chunked(&input, &chunk_sizes);
1311 prop_assert_eq!(actual, expected);
1312 }
1313
1314 #[test]
1315 fn sse_id_with_null_bytes_is_rejected(
1316 id in id_field_strategy(),
1317 data in ascii_line(),
1318 ) {
1319 let input = format!("id: {id}\ndata: {data}\n\n");
1320 let events = parse_all(&input);
1321
1322 prop_assert_eq!(events.len(), 1);
1323 let expected_id = if id.contains('\0') { None } else { Some(id) };
1324 prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1325 }
1326
1327 #[test]
1328 fn sse_retry_field_fuzz_last_assignment_wins(
1329 retry_values in prop::collection::vec(retry_field_strategy(), 1..6),
1330 data in ascii_line(),
1331 ) {
1332 let mut input = String::new();
1333 for value in &retry_values {
1334 let _ = writeln!(&mut input, "retry: {value}");
1335 }
1336 let _ = writeln!(&mut input, "data: {data}");
1337 input.push('\n');
1338
1339 let events = parse_all(&input);
1340 prop_assert_eq!(events.len(), 1);
1341
1342 let expected_retry = retry_values
1343 .last()
1344 .and_then(|value| value.parse::<u64>().ok());
1345 prop_assert_eq!(events[0].retry, expected_retry);
1346 }
1347
1348 #[test]
1349 fn sse_duplicate_fields_apply_spec_semantics(
1350 event_names in prop::collection::vec("[a-z_]{1,12}", 1..6),
1351 ids in prop::collection::vec(id_field_strategy(), 0..6),
1352 data_lines in prop::collection::vec(ascii_line(), 1..6),
1353 ) {
1354 let mut input = String::new();
1355
1356 for event_name in &event_names {
1357 let _ = writeln!(&mut input, "event: {event_name}");
1358 }
1359 for id in &ids {
1360 let _ = writeln!(&mut input, "id: {id}");
1361 }
1362 for line in &data_lines {
1363 let _ = writeln!(&mut input, "data: {line}");
1364 }
1365 input.push('\n');
1366
1367 let events = parse_all(&input);
1368 prop_assert_eq!(events.len(), 1);
1369 prop_assert_eq!(events[0].event.as_ref(), event_names.last().expect("non-empty"));
1370
1371 let expected_id = ids.iter().rfind(|id| !id.contains('\0')).cloned();
1372 prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1373
1374 let expected_data = data_lines.join("\n");
1375 prop_assert_eq!(events[0].data.as_str(), expected_data);
1376 }
1377
1378 #[test]
1379 fn sse_oversized_data_fields_round_trip(len in oversized_data_len_strategy(),) {
1380 let payload = "x".repeat(len);
1381 let input = format!("data: {payload}\n\n");
1382 let events = parse_all(&input);
1383
1384 prop_assert_eq!(events.len(), 1);
1385 prop_assert_eq!(events[0].data.len(), len);
1386 prop_assert_eq!(events[0].data.as_str(), payload);
1387 }
1388
1389 #[test]
1390 fn sse_stream_utf8_valid_input_chunking_invariant(
1391 lines in prop::collection::vec(unicode_line(), 1..5),
1392 chunk_sizes in prop::collection::vec(1usize..16, 0..24),
1393 ) {
1394 let mut input = String::new();
1395 for line in &lines {
1396 let _ = writeln!(&mut input, "data: {line}");
1397 }
1398 input.push('\n');
1399
1400 let (single_events, single_errors) = parse_stream_single_chunk(input.as_bytes());
1401 let (chunked_events, chunked_errors) =
1402 parse_stream_chunked(input.as_bytes(), &chunk_sizes);
1403
1404 prop_assert!(single_errors.is_empty(), "single-chunk had UTF-8 errors");
1405 prop_assert!(chunked_errors.is_empty(), "chunked parse had UTF-8 errors");
1406 prop_assert_eq!(chunked_events, single_events);
1407 }
1408
1409 #[test]
1410 fn sse_stream_bom_start_stripped_embedded_preserved(
1411 left in unicode_line(),
1412 right in unicode_line(),
1413 chunk_sizes in prop::collection::vec(1usize..8, 0..24),
1414 ) {
1415 let start_bom = format!("\u{FEFF}data: {left}{right}\n\n");
1416 let (start_events, start_errors) =
1417 parse_stream_chunked(start_bom.as_bytes(), &chunk_sizes);
1418 prop_assert!(start_errors.is_empty(), "start BOM should be valid UTF-8");
1419 prop_assert_eq!(start_events.len(), 1);
1420 let expected_start = format!("{left}{right}");
1421 prop_assert_eq!(start_events[0].data.as_str(), expected_start);
1422
1423 let embedded_bom = format!("data: {left}\u{FEFF}{right}\n\n");
1424 let (embedded_events, embedded_errors) =
1425 parse_stream_chunked(embedded_bom.as_bytes(), &chunk_sizes);
1426 prop_assert!(embedded_errors.is_empty(), "embedded BOM should be preserved");
1427 prop_assert_eq!(embedded_events.len(), 1);
1428 let expected_embedded = format!("{left}\u{FEFF}{right}");
1429 prop_assert_eq!(embedded_events[0].data.as_str(), expected_embedded);
1430 }
1431
1432 #[test]
1433 fn sse_stream_invalid_utf8_yields_invalid_data_errors(
1434 prefix in ascii_line(),
1435 suffix in ascii_line(),
1436 invalid_len in 1usize..4,
1437 chunk_sizes in prop::collection::vec(1usize..8, 0..20),
1438 ) {
1439 let mut bytes = format!("data: {prefix}\n\n").into_bytes();
1440 bytes.extend(std::iter::repeat_n(0xFFu8, invalid_len));
1441 bytes.extend(format!("data: {suffix}\n\n").as_bytes());
1442
1443 let (events, errors) = parse_stream_chunked_limited(&bytes, &chunk_sizes, 32);
1444 prop_assert!(
1445 events.iter().any(|event| event.data == prefix),
1446 "event before invalid sequence should still be surfaced"
1447 );
1448 prop_assert!(!errors.is_empty(), "invalid UTF-8 should emit at least one error");
1449 prop_assert!(
1450 errors.iter().all(|kind| *kind == ErrorKind::InvalidData),
1451 "all stream decoding errors must be InvalidData"
1452 );
1453 }
1454 }
1455}