1use std::borrow::Cow;
7use std::collections::VecDeque;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
/// Default upper bound, in bytes, on the `data` accumulated for a single event (100 MiB).
///
/// A parser uses this as its default `max_event_data_bytes`.
const MAX_EVENT_DATA_BYTES: usize = 100 * 1024 * 1024;
12
/// One parsed Server-Sent Event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Event type; `"message"` unless the stream supplied another name.
    pub event: Cow<'static, str>,
    /// Event payload.
    pub data: String,
    /// Value of the `id` field, when one was seen.
    pub id: Option<String>,
    /// Value of the `retry` field, when one was seen.
    pub retry: Option<u64>,
}

impl Default for SseEvent {
    /// Builds an empty event of type `"message"` with no id and no retry hint.
    fn default() -> Self {
        let event = Cow::Borrowed("message");
        SseEvent {
            event,
            data: String::default(),
            id: None,
            retry: None,
        }
    }
}
36
/// Incremental parser that turns chunks of SSE text into [`SseEvent`]s.
#[derive(Debug)]
pub struct SseParser {
    /// Unconsumed tail of the input (an incomplete line) carried between feeds.
    buffer: String,
    /// Event currently being assembled from field lines.
    current: SseEvent,
    /// Whether a `data` field has been seen for `current`.
    has_data: bool,
    /// Whether the leading byte-order-mark check has already been performed.
    bom_checked: bool,
    /// Length of `buffer` after the last feed; the next scan for line endings resumes
    /// one byte before this offset.
    scanned_len: usize,
    /// Cap on the number of bytes accumulated in `current.data`.
    max_event_data_bytes: usize,
}
50
51impl Default for SseParser {
52 fn default() -> Self {
53 Self {
54 buffer: String::new(),
55 current: SseEvent::default(),
56 has_data: false,
57 bom_checked: false,
58 scanned_len: 0,
59 max_event_data_bytes: MAX_EVENT_DATA_BYTES,
60 }
61 }
62}
63
64impl SseParser {
65 pub fn new() -> Self {
67 Self::default()
68 }
69
70 #[cfg(test)]
72 fn with_max_event_data_bytes(limit: usize) -> Self {
73 Self {
74 max_event_data_bytes: limit,
75 ..Self::default()
76 }
77 }
78
79 #[inline]
83 fn intern_event_type(value: &str) -> Cow<'static, str> {
84 match value {
85 "message" => Cow::Borrowed("message"),
87 "message_start" => Cow::Borrowed("message_start"),
88 "message_stop" => Cow::Borrowed("message_stop"),
89 "message_delta" => Cow::Borrowed("message_delta"),
90 "content_block_start" => Cow::Borrowed("content_block_start"),
91 "content_block_delta" => Cow::Borrowed("content_block_delta"),
92 "content_block_stop" => Cow::Borrowed("content_block_stop"),
93 "response.completed" => Cow::Borrowed("response.completed"),
95 "response.done" => Cow::Borrowed("response.done"),
96 "response.failed" => Cow::Borrowed("response.failed"),
97 "response.incomplete" => Cow::Borrowed("response.incomplete"),
98 "response.output_text.delta" => Cow::Borrowed("response.output_text.delta"),
99 "response.output_text.done" => Cow::Borrowed("response.output_text.done"),
100 "response.output_item.added" => Cow::Borrowed("response.output_item.added"),
101 "response.output_item.done" => Cow::Borrowed("response.output_item.done"),
102 "response.content_part.done" => Cow::Borrowed("response.content_part.done"),
103 "response.function_call_arguments.delta" => {
104 Cow::Borrowed("response.function_call_arguments.delta")
105 }
106 "response.reasoning_text.delta" => Cow::Borrowed("response.reasoning_text.delta"),
107 "response.reasoning_text.done" => Cow::Borrowed("response.reasoning_text.done"),
108 "response.reasoning_summary_text.delta" => {
109 Cow::Borrowed("response.reasoning_summary_text.delta")
110 }
111 "response.reasoning_summary_text.done" => {
112 Cow::Borrowed("response.reasoning_summary_text.done")
113 }
114 "response.reasoning_summary_part.done" => {
115 Cow::Borrowed("response.reasoning_summary_part.done")
116 }
117 "response.created" => Cow::Borrowed("response.created"),
118 "ping" => Cow::Borrowed("ping"),
120 "error" => Cow::Borrowed("error"),
121 _ => Cow::Owned(value.to_string()),
122 }
123 }
124
125 #[inline]
126 fn append_data_line(
127 current: &mut SseEvent,
128 value: &str,
129 has_data: &mut bool,
130 max_event_data_bytes: usize,
131 ) {
132 let projected_len = current
133 .data
134 .len()
135 .saturating_add(value.len())
136 .saturating_add(1);
137 if projected_len > max_event_data_bytes {
138 *has_data = true;
142 return;
143 }
144 current.data.push_str(value);
145 current.data.push('\n');
146 *has_data = true;
147 }
148
149 fn process_line(
151 line: &str,
152 current: &mut SseEvent,
153 has_data: &mut bool,
154 max_event_data_bytes: usize,
155 ) {
156 if let Some(rest) = line.strip_prefix(':') {
157 let _ = rest;
159 } else if let Some((field, value)) = line.split_once(':') {
160 let value = value.strip_prefix(' ').unwrap_or(value);
162 match field {
163 "event" => current.event = Self::intern_event_type(value),
164 "data" => Self::append_data_line(current, value, has_data, max_event_data_bytes),
165 "id" if !value.contains('\0') => {
166 current.id = Some(value.to_string());
167 }
168 "retry" => current.retry = value.parse().ok(),
169 _ => {} }
171 } else {
172 match line {
174 "event" => current.event = Cow::Borrowed(""),
175 "data" => Self::append_data_line(current, "", has_data, max_event_data_bytes),
176 "id" => current.id = Some(String::new()),
177 _ => {}
178 }
179 }
180 }
181
182 #[inline]
183 fn reset_current_for_next_event(current: &mut SseEvent) {
184 current.event = Cow::Borrowed("message");
185 current.data.clear();
186 }
187
188 #[inline]
189 fn carry_forward_event_state(current: &SseEvent) -> SseEvent {
190 SseEvent {
191 id: current.id.clone(),
192 retry: current.retry,
193 ..Default::default()
194 }
195 }
196
197 #[inline]
198 fn reset_after_buffer_limit<F>(&mut self, emit: &mut F)
199 where
200 F: FnMut(SseEvent),
201 {
202 self.buffer = String::new();
203 self.current = SseEvent::default();
204 self.has_data = false;
205 self.bom_checked = false;
206 self.scanned_len = 0;
207 emit(SseEvent {
208 event: Cow::Borrowed("error"),
209 data: "SSE buffer limit exceeded".to_string(),
210 ..Default::default()
211 });
212 }
213
    /// Scans `source` for complete lines, applies them to `current`, and emits every
    /// event closed by a blank line.
    ///
    /// `scan_start` is the byte offset at which the search for line terminators
    /// begins; line slicing itself always starts at offset 0 (or just past a leading
    /// byte-order mark). Returns the offset of the first byte that was not consumed,
    /// i.e. the start of the trailing incomplete line.
    #[inline]
    fn process_source<F>(
        source: &str,
        scan_start: usize,
        bom_checked: &mut bool,
        current: &mut SseEvent,
        has_data: &mut bool,
        max_event_data_bytes: usize,
        emit: &mut F,
    ) -> usize
    where
        F: FnMut(SseEvent),
    {
        let bytes = source.as_bytes();
        let mut start = 0usize;
        let mut search_pos = scan_start;

        // The BOM check runs once, on the first non-empty input.
        if !*bom_checked && !source.is_empty() {
            *bom_checked = true;
            if source.starts_with('\u{FEFF}') {
                // U+FEFF is three bytes in UTF-8; skip it for both slicing and scanning.
                start = 3;
                if search_pos < 3 {
                    search_pos = 3;
                }
            }
        }

        while let Some(rel_pos) = memchr::memchr2(b'\r', b'\n', &bytes[search_pos..]) {
            let pos = search_pos + rel_pos;
            let b = bytes[pos];

            let line_end;
            let next_start;

            if b == b'\n' {
                line_end = pos;
                next_start = pos + 1;
            } else {
                if pos + 1 < source.len() {
                    line_end = pos;
                    // "\r\n" counts as one terminator; a lone "\r" also ends the line.
                    next_start = if bytes[pos + 1] == b'\n' {
                        pos + 2
                    } else {
                        pos + 1
                    };
                } else {
                    // A trailing "\r" may be the first half of "\r\n": leave it
                    // unconsumed until more input arrives.
                    break;
                }
            }

            let line = &source[start..line_end];
            start = next_start;
            search_pos = next_start;

            if line.is_empty() {
                // A blank line terminates the current event block.
                if *has_data {
                    // Remove the "\n" appended after the last data line.
                    if current.data.ends_with('\n') {
                        current.data.pop();
                    }
                    if current.event.is_empty() {
                        current.event = Cow::Borrowed("message");
                    }
                    let next_event = Self::carry_forward_event_state(current);
                    emit(std::mem::take(current));
                    *current = next_event;
                    *has_data = false;
                } else {
                    Self::reset_current_for_next_event(current);
                }
            } else {
                Self::process_line(line, current, has_data, max_event_data_bytes);
            }
        }

        start
    }
302
303 fn feed_into<F>(&mut self, data: &str, mut emit: F)
305 where
306 F: FnMut(SseEvent),
307 {
308 const MAX_BUFFER_SIZE: usize = 10 * 1024 * 1024;
309 if self.buffer.is_empty() {
310 let consumed = Self::process_source(
312 data,
313 0,
314 &mut self.bom_checked,
315 &mut self.current,
316 &mut self.has_data,
317 self.max_event_data_bytes,
318 &mut emit,
319 );
320 if consumed < data.len() {
321 self.buffer.push_str(&data[consumed..]);
322 if self.buffer.len() > MAX_BUFFER_SIZE {
323 self.reset_after_buffer_limit(&mut emit);
324 return;
325 }
326 }
327 } else {
328 let mut combined = std::mem::take(&mut self.buffer);
331 combined.push_str(data);
332 let scan_start = self.scanned_len.saturating_sub(1);
334 let consumed = Self::process_source(
335 &combined,
336 scan_start,
337 &mut self.bom_checked,
338 &mut self.current,
339 &mut self.has_data,
340 self.max_event_data_bytes,
341 &mut emit,
342 );
343 if consumed < combined.len() {
344 self.buffer = combined[consumed..].to_string();
346 }
347 if self.buffer.len() > MAX_BUFFER_SIZE {
348 self.reset_after_buffer_limit(&mut emit);
349 return;
350 }
351 }
352 self.scanned_len = self.buffer.len();
354 }
355
356 pub fn feed(&mut self, data: &str) -> Vec<SseEvent> {
360 let mut events = Vec::with_capacity(4);
361 self.feed_into(data, |event| events.push(event));
362 events
363 }
364
365 pub fn has_pending(&self) -> bool {
367 !self.buffer.is_empty() || self.has_data
368 }
369
370 pub fn flush(&mut self) -> Option<SseEvent> {
372 if !self.buffer.is_empty() {
374 let line = std::mem::take(&mut self.buffer);
375 let line = line.trim_end_matches('\r');
376 Self::process_line(
377 line,
378 &mut self.current,
379 &mut self.has_data,
380 self.max_event_data_bytes,
381 );
382 }
383
384 if self.has_data {
385 if self.current.data.ends_with('\n') {
386 self.current.data.pop();
387 }
388 if self.current.event.is_empty() {
389 self.current.event = Cow::Borrowed("message");
390 }
391 let event = std::mem::take(&mut self.current);
392 self.current = SseEvent::default();
393 self.has_data = false;
394 Some(event)
395 } else {
396 None
397 }
398 }
399}
400
/// Adapter that decodes a stream of byte chunks into [`SseEvent`]s.
pub struct SseStream<S> {
    /// Underlying source of byte chunks.
    inner: S,
    /// Parser fed with the decoded text.
    parser: SseParser,
    /// Events already parsed but not yet yielded to the consumer.
    pending_events: VecDeque<SseEvent>,
    /// Error held back until `pending_events` has been drained.
    pending_error: Option<std::io::Error>,
    /// Whether yielding `pending_error` should also end the stream.
    pending_error_is_terminal: bool,
    /// Once set, polling yields `None` without touching `inner`.
    terminated: bool,
    /// Bytes of an incomplete UTF-8 sequence carried over to the next chunk.
    utf8_buffer: Vec<u8>,
}
413
414impl<S> SseStream<S> {
415 pub fn new(inner: S) -> Self {
417 Self {
418 inner,
419 parser: SseParser::new(),
420 pending_events: VecDeque::new(),
421 pending_error: None,
422 pending_error_is_terminal: false,
423 terminated: false,
424 utf8_buffer: Vec::new(),
425 }
426 }
427}
428
429impl<S> SseStream<S>
430where
431 S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
432{
433 #[inline]
434 fn invalid_utf8_error() -> std::io::Error {
435 std::io::Error::new(
436 std::io::ErrorKind::InvalidData,
437 "Invalid UTF-8 in SSE stream",
438 )
439 }
440
441 fn feed_parsed_chunk(parser: &mut SseParser, pending: &mut VecDeque<SseEvent>, s: &str) {
442 parser.feed_into(s, |event| pending.push_back(event));
443 }
444
445 fn feed_to_pending(&mut self, s: &str) {
446 Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
447 }
448
449 fn process_chunk_without_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
450 let mut processed = 0;
451 let mut first_error: Option<std::io::Error> = None;
452 loop {
453 match std::str::from_utf8(&bytes[processed..]) {
454 Ok(s) => {
455 if !s.is_empty() {
456 self.feed_to_pending(s);
457 }
458 return first_error.map_or(Ok(()), Err);
459 }
460 Err(err) => {
461 let valid_len = err.valid_up_to();
462 if valid_len > 0 {
463 let s = std::str::from_utf8(&bytes[processed..processed + valid_len])
464 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
465 self.feed_to_pending(s);
466 processed += valid_len;
467 }
468
469 if let Some(invalid_len) = err.error_len() {
470 processed += invalid_len;
471 if first_error.is_none() {
472 first_error = Some(Self::invalid_utf8_error());
473 }
474 } else {
475 self.utf8_buffer.extend_from_slice(&bytes[processed..]);
476 return first_error.map_or(Ok(()), Err);
477 }
478 }
479 }
480 }
481 }
482
    /// Decodes `bytes` when `utf8_buffer` already holds an unfinished UTF-8 sequence.
    ///
    /// The new bytes are appended to `utf8_buffer` and decoding runs over the
    /// combined buffer. Valid text is fed to the parser, invalid sequences are
    /// skipped and reported as one error at the end, and a still-incomplete trailing
    /// sequence stays at the front of `utf8_buffer`.
    fn process_chunk_with_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
        self.utf8_buffer.extend_from_slice(bytes);
        let mut processed = 0;
        let mut first_error: Option<std::io::Error> = None;
        loop {
            match std::str::from_utf8(&self.utf8_buffer[processed..]) {
                Ok(s) => {
                    if !s.is_empty() {
                        // The associated-function form borrows `parser` and
                        // `pending_events` separately from `utf8_buffer`, which `s`
                        // still points into.
                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
                    }
                    self.utf8_buffer.clear();
                    return first_error.map_or(Ok(()), Err);
                }
                Err(err) => {
                    let valid_len = err.valid_up_to();
                    if valid_len > 0 {
                        let s = std::str::from_utf8(
                            &self.utf8_buffer[processed..processed + valid_len],
                        )
                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
                        processed += valid_len;
                    }

                    if let Some(invalid_len) = err.error_len() {
                        // Definitely invalid bytes: skip them and remember the error.
                        processed += invalid_len;
                        if first_error.is_none() {
                            first_error = Some(Self::invalid_utf8_error());
                        }
                    } else {
                        // Incomplete trailing sequence: move it to the front of the
                        // buffer and wait for more bytes.
                        let remaining = self.utf8_buffer.len() - processed;
                        self.utf8_buffer.copy_within(processed.., 0);
                        self.utf8_buffer.truncate(remaining);
                        return first_error.map_or(Ok(()), Err);
                    }
                }
            }
        }
    }
523
524 fn process_chunk(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
525 if self.utf8_buffer.is_empty() {
526 self.process_chunk_without_utf8_tail(bytes)
527 } else {
528 self.process_chunk_with_utf8_tail(bytes)
529 }
530 }
531
532 fn poll_stream_end(&mut self) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
533 if !self.utf8_buffer.is_empty() {
534 self.utf8_buffer.clear();
537 self.pending_events.clear();
538 self.pending_error = None;
539 self.parser = SseParser::new();
540 return Poll::Ready(Some(Err(std::io::Error::new(
541 std::io::ErrorKind::InvalidData,
542 "Stream ended with incomplete UTF-8 sequence",
543 ))));
544 }
545
546 if let Some(event) = self.parser.flush() {
547 return Poll::Ready(Some(Ok(event)));
548 }
549 Poll::Ready(None)
550 }
551
    /// Polls for the next event.
    ///
    /// Order of precedence: queued events, then a deferred error, then the
    /// `terminated` flag, and only then the inner stream. Chunks from the inner
    /// stream are decoded until at least one event is available, an error occurs,
    /// the inner stream ends, or it returns `Pending`.
    pub fn poll_next_event(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
        if let Some(event) = self.pending_events.pop_front() {
            return Poll::Ready(Some(Ok(event)));
        }
        if let Some(err) = self.pending_error.take() {
            if self.pending_error_is_terminal {
                // The deferred decode error ends the stream: drop all state.
                self.pending_error_is_terminal = false;
                self.pending_events.clear();
                self.utf8_buffer.clear();
                self.parser = SseParser::new();
                self.terminated = true;
            }
            return Poll::Ready(Some(Err(err)));
        }
        if self.terminated {
            return Poll::Ready(None);
        }

        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(bytes))) => {
                    if let Err(err) = self.process_chunk(&bytes) {
                        // Events decoded before the failure are delivered first; the
                        // error is held back and surfaces once the queue is empty.
                        if let Some(event) = self.pending_events.pop_front() {
                            self.pending_error = Some(err);
                            self.pending_error_is_terminal = true;
                            return Poll::Ready(Some(Ok(event)));
                        }
                        self.pending_events.clear();
                        self.utf8_buffer.clear();
                        self.parser = SseParser::new();
                        self.terminated = true;
                        return Poll::Ready(Some(Err(err)));
                    }

                    // A chunk may complete no event at all; in that case keep polling.
                    if let Some(event) = self.pending_events.pop_front() {
                        return Poll::Ready(Some(Ok(event)));
                    }
                }
                Poll::Ready(Some(Err(e))) => {
                    // Errors from the inner stream are passed through unchanged.
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    return self.poll_stream_end();
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
    }
606}
607
608impl<S> futures::Stream for SseStream<S>
609where
610 S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
611{
612 type Item = Result<SseEvent, std::io::Error>;
613
614 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
615 self.poll_next_event(cx)
616 }
617}
618
619#[cfg(test)]
620mod tests {
621 use super::*;
622 use futures::StreamExt;
623 use futures::stream;
624 use proptest::prelude::*;
625 use serde_json::json;
626 use std::fmt::Write as _;
627 use std::io::ErrorKind;
628
    /// Description of one event block used to render test input.
    #[derive(Debug, Clone)]
    struct GeneratedEvent {
        /// Value for an `event:` line, if one should be rendered.
        event: Option<String>,
        /// Value for an `id:` line, if one should be rendered.
        id: Option<String>,
        /// Value for a `retry:` line, if one should be rendered.
        retry: Option<u32>,
        /// One `data:` line is rendered per entry.
        data: Vec<String>,
        /// Text for a leading `:` comment line, if one should be rendered.
        comment: Option<String>,
    }
637
638 #[derive(Debug, Clone, Copy)]
639 enum LineEnding {
640 Lf,
641 Cr,
642 CrLf,
643 }
644
645 impl LineEnding {
646 fn as_str(self) -> &'static str {
647 match self {
648 Self::Lf => "\n",
649 Self::Cr => "\r",
650 Self::CrLf => "\r\n",
651 }
652 }
653 }
654
655 impl GeneratedEvent {
656 fn render(&self) -> String {
657 let mut out = String::new();
658 if let Some(comment) = &self.comment {
659 out.push(':');
660 out.push_str(comment);
661 out.push('\n');
662 }
663 if let Some(event) = &self.event {
664 out.push_str("event: ");
665 out.push_str(event);
666 out.push('\n');
667 }
668 if let Some(id) = &self.id {
669 out.push_str("id: ");
670 out.push_str(id);
671 out.push('\n');
672 }
673 if let Some(retry) = &self.retry {
674 out.push_str("retry: ");
675 out.push_str(&retry.to_string());
676 out.push('\n');
677 }
678 for line in &self.data {
679 out.push_str("data: ");
680 out.push_str(line);
681 out.push('\n');
682 }
683 out.push('\n');
684 out
685 }
686 }
687
    /// Strategy for strings of 0 to 24 characters in the range space through `~`.
    fn ascii_line() -> impl Strategy<Value = String> {
        "[ -~]{0,24}".prop_map(|s| s)
    }
692
    /// Strategy for a [`GeneratedEvent`]: optional lowercase/underscore event name,
    /// optional numeric id, optional retry below 5000, one to three data lines and
    /// an optional comment.
    fn event_strategy() -> impl Strategy<Value = GeneratedEvent> {
        (
            prop::option::of("[a-z_]{1,12}"),
            prop::option::of("[0-9]{1,8}"),
            prop::option::of(0u32..5000),
            prop::collection::vec(ascii_line(), 1..4),
            prop::option::of(ascii_line()),
        )
            .prop_map(|(event, id, retry, data, comment)| GeneratedEvent {
                event,
                id,
                retry,
                data,
                comment,
            })
    }
709
    /// Strategy choosing among the three [`LineEnding`] variants.
    fn line_ending_strategy() -> impl Strategy<Value = LineEnding> {
        prop_oneof![
            Just(LineEnding::Lf),
            Just(LineEnding::Cr),
            Just(LineEnding::CrLf),
        ]
    }
717
    /// Strategy for strings of 0 to 23 arbitrary `char`s, excluding `\r` and `\n`.
    fn unicode_line() -> impl Strategy<Value = String> {
        prop::collection::vec(
            any::<char>().prop_filter("no CR/LF", |c| *c != '\r' && *c != '\n'),
            0..24,
        )
        .prop_map(|chars| chars.into_iter().collect())
    }
725
    /// Strategy for `id` field values: printable ASCII (weight 4), or two printable
    /// ASCII halves joined by a NUL character (weight 1).
    fn id_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            4 => "[ -~]{0,24}".prop_map(|s| s),
            1 => ("[ -~]{0,12}", "[ -~]{0,12}").prop_map(|(head, tail)| format!("{head}\0{tail}")),
        ]
    }
732
    /// Strategy for `retry` field values, mixing small integers, values at the top
    /// of the `u64` range, alphabetic text, negative numbers, values just above
    /// `u64::MAX`, and the empty string.
    fn retry_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            6 => (0u64..=50_000u64).prop_map(|n| n.to_string()),
            2 => (u64::MAX - 10..=u64::MAX).prop_map(|n| n.to_string()),
            2 => "[a-zA-Z]{1,16}".prop_map(|s| s),
            1 => "-[0-9]{1,24}".prop_map(|s| s),
            1 => ((u128::from(u64::MAX) + 1)..=(u128::from(u64::MAX) + 50_000))
                .prop_map(|n| n.to_string()),
            1 => Just(String::new()),
        ]
    }
744
    /// Strategy for payload lengths from 1 KiB to 3 MiB, weighted towards the
    /// smaller ranges.
    fn oversized_data_len_strategy() -> impl Strategy<Value = usize> {
        prop_oneof![
            10 => 1024usize..=65_536usize,
            5 => 65_537usize..=262_144usize,
            2 => 262_145usize..=1_048_576usize,
            1 => 1_048_577usize..=3_145_728usize,
        ]
    }
754
755 fn render_stream(events: &[GeneratedEvent], terminal_delimiter: bool) -> String {
756 let mut out = String::new();
757 for event in events {
758 out.push_str(&event.render());
759 }
760 if !terminal_delimiter && out.ends_with('\n') {
761 out.pop();
762 }
763 out
764 }
765
766 fn render_stream_with_line_endings(
767 events: &[GeneratedEvent],
768 terminal_delimiter: bool,
769 line_ending: LineEnding,
770 ) -> String {
771 let canonical = render_stream(events, terminal_delimiter);
772 if matches!(line_ending, LineEnding::Lf) {
773 canonical
774 } else {
775 canonical.replace('\n', line_ending.as_str())
776 }
777 }
778
779 fn parse_all(input: &str) -> Vec<SseEvent> {
780 let mut parser = SseParser::new();
781 let mut events = parser.feed(input);
782 if let Some(event) = parser.flush() {
783 events.push(event);
784 }
785 events
786 }
787
788 fn parse_chunked(input: &str, chunk_sizes: &[usize]) -> Vec<SseEvent> {
789 let mut parser = SseParser::new();
790 let mut events = Vec::new();
791 let bytes = input.as_bytes();
792 let mut start = 0usize;
793
794 for &size in chunk_sizes {
795 if start >= bytes.len() {
796 break;
797 }
798 let end = (start + size).min(bytes.len());
799 let chunk = std::str::from_utf8(&bytes[start..end]).expect("ascii chunks");
800 events.extend(parser.feed(chunk));
801 start = end;
802 }
803
804 if start < bytes.len() {
805 let chunk = std::str::from_utf8(&bytes[start..]).expect("ascii remainder");
806 events.extend(parser.feed(chunk));
807 }
808
809 if let Some(event) = parser.flush() {
810 events.push(event);
811 }
812
813 events
814 }
815
816 fn split_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
817 let mut chunks = Vec::new();
818 let mut start = 0usize;
819
820 for &size in chunk_sizes {
821 if start >= input.len() {
822 break;
823 }
824 let end = (start + size).min(input.len());
825 chunks.push(input[start..end].to_vec());
826 start = end;
827 }
828
829 if start < input.len() {
830 chunks.push(input[start..].to_vec());
831 }
832
833 chunks
834 }
835
836 fn parse_stream_chunks(chunks: Vec<Vec<u8>>) -> (Vec<SseEvent>, Vec<ErrorKind>) {
837 let mut stream = SseStream::new(stream::iter(
838 chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
839 ));
840 let mut events = Vec::new();
841 let mut errors = Vec::new();
842
843 futures::executor::block_on(async {
844 while let Some(item) = stream.next().await {
845 match item {
846 Ok(event) => events.push(event),
847 Err(err) => errors.push(err.kind()),
848 }
849 }
850 });
851
852 (events, errors)
853 }
854
855 fn parse_stream_chunks_limited(
856 chunks: Vec<Vec<u8>>,
857 max_items: usize,
858 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
859 let mut stream = SseStream::new(stream::iter(
860 chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
861 ));
862 let mut events = Vec::new();
863 let mut errors = Vec::new();
864
865 futures::executor::block_on(async {
866 for _ in 0..max_items {
867 let Some(item) = stream.next().await else {
868 break;
869 };
870 match item {
871 Ok(event) => events.push(event),
872 Err(err) => errors.push(err.kind()),
873 }
874 }
875 });
876
877 (events, errors)
878 }
879
880 fn parse_stream_single_chunk(input: &[u8]) -> (Vec<SseEvent>, Vec<ErrorKind>) {
881 parse_stream_chunks(vec![input.to_vec()])
882 }
883
884 fn parse_stream_chunked(
885 input: &[u8],
886 chunk_sizes: &[usize],
887 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
888 let chunks = split_bytes(input, chunk_sizes);
889 parse_stream_chunks(chunks)
890 }
891
892 fn parse_stream_chunked_limited(
893 input: &[u8],
894 chunk_sizes: &[usize],
895 max_items: usize,
896 ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
897 let chunks = split_bytes(input, chunk_sizes);
898 parse_stream_chunks_limited(chunks, max_items)
899 }
900
901 fn diag_json(
902 fixture_id: &str,
903 parser: &SseParser,
904 input: &str,
905 expected: &str,
906 actual: &str,
907 ) -> String {
908 json!({
909 "fixture_id": fixture_id,
910 "seed": "deterministic-static",
911 "env": {
912 "os": std::env::consts::OS,
913 "arch": std::env::consts::ARCH,
914 "cwd": std::env::current_dir().ok().map(|path| path.display().to_string()),
915 },
916 "input_preview": input,
917 "parser_state": {
918 "has_pending": parser.has_pending(),
919 },
920 "expected": expected,
921 "actual": actual,
922 })
923 .to_string()
924 }
925
926 #[test]
927 fn test_simple_event() {
928 let mut parser = SseParser::new();
929 let events = parser.feed("data: hello\n\n");
930 assert_eq!(events.len(), 1);
931 assert_eq!(events[0].event, "message");
932 assert_eq!(events[0].data, "hello");
933 }
934
935 #[test]
936 fn test_multiline_data() {
937 let mut parser = SseParser::new();
938 let events = parser.feed("data: line1\ndata: line2\n\n");
939 assert_eq!(events.len(), 1);
940 assert_eq!(events[0].data, "line1\nline2");
941 }
942
943 #[test]
944 fn test_named_event() {
945 let mut parser = SseParser::new();
946 let events = parser.feed("event: ping\ndata: {}\n\n");
947 assert_eq!(events.len(), 1);
948 assert_eq!(events[0].event, "ping");
949 assert_eq!(events[0].data, "{}");
950 }
951
952 #[test]
953 fn test_event_with_id() {
954 let mut parser = SseParser::new();
955 let events = parser.feed("id: 123\ndata: test\n\n");
956 assert_eq!(events.len(), 1);
957 assert_eq!(events[0].id, Some("123".to_string()));
958 assert_eq!(events[0].data, "test");
959 }
960
961 #[test]
962 fn test_last_event_id_persists_across_dispatched_events() {
963 let mut parser = SseParser::new();
964 let events = parser.feed("id: 123\ndata: first\n\ndata: second\n\n");
965 assert_eq!(events.len(), 2);
966 assert_eq!(events[0].id.as_deref(), Some("123"));
967 assert_eq!(events[1].id.as_deref(), Some("123"));
968 assert_eq!(events[1].data, "second");
969 }
970
971 #[test]
972 fn test_multiple_events() {
973 let mut parser = SseParser::new();
974 let events = parser.feed("data: first\n\ndata: second\n\n");
975 assert_eq!(events.len(), 2);
976 assert_eq!(events[0].data, "first");
977 assert_eq!(events[1].data, "second");
978 }
979
980 #[test]
981 fn test_incremental_feed() {
982 let mut parser = SseParser::new();
983
984 let events = parser.feed("data: hel");
986 assert!(events.is_empty());
987
988 let events = parser.feed("lo\n");
990 assert!(events.is_empty());
991
992 let events = parser.feed("\n");
994 assert_eq!(events.len(), 1);
995 assert_eq!(events[0].data, "hello");
996 }
997
998 #[test]
999 fn test_buffer_no_duplication_on_straddle() {
1000 let mut parser = SseParser::new();
1003
1004 let events = parser.feed("data: start");
1006 assert!(events.is_empty());
1007 assert!(parser.has_pending()); let events = parser.feed("_middle_incomplete");
1011 assert!(events.is_empty());
1012 assert!(parser.has_pending()); let events = parser.feed("\n\n");
1016 assert_eq!(events.len(), 1);
1017 assert_eq!(events[0].data, "start_middle_incomplete");
1018
1019 let events = parser.feed("data: second\n\n");
1021 assert_eq!(events.len(), 1);
1022 assert_eq!(events[0].data, "second");
1023 }
1024
1025 #[test]
1026 fn test_comment_ignored() {
1027 let mut parser = SseParser::new();
1028 let events = parser.feed(":this is a comment\ndata: actual\n\n");
1029 assert_eq!(events.len(), 1);
1030 assert_eq!(events[0].data, "actual");
1031 }
1032
1033 #[test]
1034 fn test_retry_field() {
1035 let mut parser = SseParser::new();
1036 let events = parser.feed("retry: 3000\ndata: test\n\n");
1037 assert_eq!(events.len(), 1);
1038 assert_eq!(events[0].retry, Some(3000));
1039 }
1040
1041 #[test]
1042 fn test_retry_hint_persists_across_dispatched_events() {
1043 let mut parser = SseParser::new();
1044 let events = parser.feed("retry: 3000\ndata: first\n\ndata: second\n\n");
1045 assert_eq!(events.len(), 2);
1046 assert_eq!(events[0].retry, Some(3000));
1047 assert_eq!(events[1].retry, Some(3000));
1048 }
1049
1050 #[test]
1051 fn test_append_data_line_enforces_projected_limit() {
1052 let mut current = SseEvent::default();
1053 let mut has_data = false;
1054
1055 SseParser::append_data_line(&mut current, "ab", &mut has_data, 3);
1056 assert_eq!(current.data, "ab\n");
1057 assert!(has_data);
1058
1059 SseParser::append_data_line(&mut current, "c", &mut has_data, 3);
1060 assert_eq!(current.data, "ab\n");
1061 }
1062
1063 #[test]
1064 fn test_data_cap_single_oversized_line_via_feed() {
1065 let mut parser = SseParser::with_max_event_data_bytes(10);
1068 let events = parser.feed("data: this-is-longer-than-ten-bytes\n\n");
1069 assert_eq!(events.len(), 1);
1070 assert_eq!(events[0].data, "");
1071 }
1072
1073 #[test]
1074 fn test_data_cap_accumulation_via_feed() {
1075 let mut parser = SseParser::with_max_event_data_bytes(10);
1078 let events = parser.feed("data: abc\ndata: def\ndata: ghi\n\n");
1080 assert_eq!(events.len(), 1);
1081 assert_eq!(events[0].data, "abc\ndef");
1084 }
1085
1086 #[test]
1087 fn test_data_cap_exact_boundary_via_feed() {
1088 let mut parser = SseParser::with_max_event_data_bytes(4);
1090 let events = parser.feed("data: abc\n\n");
1092 assert_eq!(events.len(), 1);
1093 assert_eq!(events[0].data, "abc");
1094 }
1095
1096 #[test]
1097 fn test_data_cap_next_event_resets() {
1098 let mut parser = SseParser::with_max_event_data_bytes(6);
1100 let events = parser.feed("data: abcde\ndata: rejected\n\ndata: ok\n\n");
1101 assert_eq!(events.len(), 2);
1102 assert_eq!(events[0].data, "abcde");
1104 assert_eq!(events[1].data, "ok");
1106 }
1107
1108 #[test]
1109 fn test_data_cap_chunked_delivery() {
1110 let mut parser = SseParser::with_max_event_data_bytes(10);
1112 parser.feed("data: abc\n");
1113 parser.feed("data: def\n");
1114 let events = parser.feed("data: toolong\n\n");
1116 assert_eq!(events.len(), 1);
1117 assert_eq!(events[0].data, "abc\ndef");
1118 }
1119
1120 #[test]
1121 fn test_data_cap_flush_path() {
1122 let mut parser = SseParser::with_max_event_data_bytes(6);
1124 parser.feed("data: abcde\n");
1125 parser.feed("data: no\n");
1126 let event = parser.flush().expect("should flush pending event");
1128 assert_eq!(event.data, "abcde");
1129 }
1130
1131 #[test]
1132 fn test_keep_alive_comment_does_not_emit_event() {
1133 let mut parser = SseParser::new();
1134 let events = parser.feed(": keepalive\n\n");
1135 assert!(events.is_empty());
1136 }
1137
1138 #[test]
1139 fn test_crlf_handling() {
1140 let mut parser = SseParser::new();
1141 let events = parser.feed("data: hello\r\n\r\n");
1142 assert_eq!(events.len(), 1);
1143 assert_eq!(events[0].data, "hello");
1144 }
1145
1146 #[test]
1147 fn test_flush_pending() {
1148 let mut parser = SseParser::new();
1149 let events = parser.feed("data: incomplete");
1150 assert!(events.is_empty());
1151 assert!(parser.has_pending());
1152
1153 let event = parser.flush();
1155 assert!(event.is_some());
1156 assert_eq!(event.unwrap().data, "incomplete");
1157 }
1158
1159 #[test]
1160 fn test_event_without_data_is_ignored() {
1161 let mut parser = SseParser::new();
1162 let events = parser.feed("event: ping\n\n");
1163 assert!(
1164 events.is_empty(),
1165 "event block without data should not emit an event"
1166 );
1167 }
1168
1169 #[test]
1170 fn test_unknown_field_is_ignored() {
1171 let mut parser = SseParser::new();
1172 let events = parser.feed("foo: bar\ndata: hello\n\n");
1173 assert_eq!(events.len(), 1);
1174 assert_eq!(events[0].data, "hello");
1175 assert_eq!(events[0].event, "message");
1176 }
1177
1178 #[test]
1179 fn test_error_event_parsing() {
1180 let mut parser = SseParser::new();
1181 let events = parser.feed("event: error\ndata: {\"message\":\"boom\"}\n\n");
1182 assert_eq!(events.len(), 1);
1183 assert_eq!(events[0].event, "error");
1184 assert_eq!(events[0].data, "{\"message\":\"boom\"}");
1185 }
1186
1187 #[test]
1188 fn test_empty_event_field_defaults_to_message() {
1189 let mut parser = SseParser::new();
1190 let input = "event\ndata: hello\n\n";
1191 let events = parser.feed(input);
1192 let diag = diag_json(
1193 "sse-empty-event-field-default",
1194 &parser,
1195 input,
1196 r#"{"event":"message","data":"hello"}"#,
1197 &format!("{events:?}"),
1198 );
1199
1200 assert_eq!(events.len(), 1, "{diag}");
1201 assert_eq!(events[0].event, "message", "{diag}");
1202 assert_eq!(events[0].data, "hello", "{diag}");
1203 }
1204
1205 #[test]
1206 fn test_large_payload_event() {
1207 let mut parser = SseParser::new();
1208 let payload = "x".repeat(128 * 1024);
1209 let input = format!("data: {payload}\n\n");
1210 let events = parser.feed(&input);
1211 assert_eq!(events.len(), 1);
1212 assert_eq!(events[0].data.len(), payload.len());
1213 assert_eq!(events[0].data, payload);
1214 }
1215
1216 #[test]
1217 fn test_buffer_limit_overflow_resets_parser_state() {
1218 let mut parser = SseParser::new();
1219 assert!(parser.feed("data: stale\n").is_empty());
1220
1221 let oversized = "x".repeat(10 * 1024 * 1024 + 1);
1222 let overflow_events = parser.feed(&oversized);
1223 assert_eq!(overflow_events.len(), 1);
1224 assert_eq!(overflow_events[0].event, "error");
1225 assert_eq!(overflow_events[0].data, "SSE buffer limit exceeded");
1226
1227 assert!(!parser.has_pending());
1228 assert!(parser.buffer.capacity() < 1024);
1229 assert!(parser.flush().is_none());
1230
1231 let fresh = parser.feed("data: fresh\n\n");
1232 assert_eq!(fresh.len(), 1);
1233 assert_eq!(fresh[0].data, "fresh");
1234 }
1235
1236 #[test]
1237 fn test_large_complete_chunk_does_not_trip_buffer_limit_fast_path() {
1238 let mut parser = SseParser::new();
1239 let payload = "x".repeat(10 * 1024 * 1024 + 1);
1240 let events = parser.feed(&format!("data: {payload}\n\n"));
1241
1242 assert_eq!(events.len(), 1);
1243 assert_eq!(events[0].event, "message");
1244 assert_eq!(events[0].data.len(), payload.len());
1245 assert_eq!(events[0].data, payload);
1246 assert!(!parser.has_pending());
1247 assert!(parser.buffer.capacity() < 1024);
1248 assert!(parser.flush().is_none());
1249 }
1250
1251 #[test]
1252 fn test_large_complete_chunk_does_not_trip_buffer_limit_with_buffered_prefix() {
1253 let mut parser = SseParser::new();
1254 assert!(parser.feed("data: ").is_empty());
1255
1256 let payload = "x".repeat(10 * 1024 * 1024 + 1);
1257 let events = parser.feed(&format!("{payload}\n\n"));
1258
1259 assert_eq!(events.len(), 1);
1260 assert_eq!(events[0].event, "message");
1261 assert_eq!(events[0].data.len(), payload.len());
1262 assert_eq!(events[0].data, payload);
1263 assert!(!parser.has_pending());
1264 assert!(parser.buffer.capacity() < 1024);
1265 assert!(parser.flush().is_none());
1266 }
1267
1268 #[test]
1269 fn test_rapid_sequential_events() {
1270 let mut parser = SseParser::new();
1271 let mut input = String::new();
1272 for i in 0..200 {
1273 let _ = write!(&mut input, "event: e{i}\ndata: payload{i}\n\n");
1274 }
1275 let events = parser.feed(&input);
1276 assert_eq!(events.len(), 200);
1277 assert_eq!(events[0].event, "e0");
1278 assert_eq!(events[0].data, "payload0");
1279 assert_eq!(events[199].event, "e199");
1280 assert_eq!(events[199].data, "payload199");
1281 }
1282
1283 #[test]
1284 fn test_stream_event_name_matrix() {
1285 let names = [
1286 "message_start",
1287 "content_block_start",
1288 "content_block_delta",
1289 "content_block_stop",
1290 "message_delta",
1291 "message_stop",
1292 "message",
1293 "error",
1294 "ping",
1295 "response.created",
1296 "response.output_text.delta",
1297 "response.completed",
1298 ];
1299
1300 let mut parser = SseParser::new();
1301 let mut input = String::new();
1302 for name in names {
1303 let _ = write!(&mut input, "event: {name}\ndata: {{}}\n\n");
1304 }
1305
1306 let events = parser.feed(&input);
1307 assert_eq!(events.len(), names.len());
1308 for (idx, name) in names.iter().enumerate() {
1309 assert_eq!(events[idx].event, *name);
1310 assert_eq!(events[idx].data, "{}");
1311 }
1312 }
1313
1314 #[test]
1315 fn test_anthropic_style_events() {
1316 let mut parser = SseParser::new();
1317
1318 let events = parser.feed(
1320 r#"event: message_start
1321data: {"type":"message_start","message":{"id":"msg_123"}}
1322
1323event: content_block_start
1324data: {"type":"content_block_start","index":0,"content_block":{"type":"text"}}
1325
1326event: content_block_delta
1327data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
1328
1329event: content_block_stop
1330data: {"type":"content_block_stop","index":0}
1331
1332event: message_stop
1333data: {"type":"message_stop"}
1334
1335"#,
1336 );
1337
1338 assert_eq!(events.len(), 5);
1339 assert_eq!(events[0].event, "message_start");
1340 assert!(events[0].data.contains("message_start"));
1341 assert_eq!(events[1].event, "content_block_start");
1342 assert_eq!(events[2].event, "content_block_delta");
1343 assert!(events[2].data.contains("Hello"));
1344 assert_eq!(events[3].event, "content_block_stop");
1345 assert_eq!(events[4].event, "message_stop");
1346 }
1347
1348 #[test]
1349 fn test_stream_yields_multiple_events_from_one_chunk() {
1350 let bytes = b"data: first\n\ndata: second\n\n".to_vec();
1351 let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1352
1353 futures::executor::block_on(async {
1354 let first = stream.next().await.expect("first event").expect("ok");
1355 assert_eq!(first.data, "first");
1356
1357 let second = stream.next().await.expect("second event").expect("ok");
1358 assert_eq!(second.data, "second");
1359
1360 assert!(stream.next().await.is_none());
1361 });
1362 }
1363
1364 #[test]
1365 fn test_stream_handles_utf8_split_across_chunks() {
1366 let chunks = vec![Ok(b"data: \xE2".to_vec()), Ok(b"\x98\x83\n\n".to_vec())];
1368 let mut stream = SseStream::new(stream::iter(chunks));
1369
1370 futures::executor::block_on(async {
1371 let event = stream.next().await.expect("event").expect("ok");
1372 assert_eq!(event.data, "☃");
1373 assert!(stream.next().await.is_none());
1374 });
1375 }
1376
1377 #[test]
1378 fn test_stream_handles_crlf_split_across_partial_frames() {
1379 let chunks = vec![
1380 Ok(b"data: first\r".to_vec()),
1381 Ok(b"\n".to_vec()),
1382 Ok(b"\r".to_vec()),
1383 Ok(b"\n".to_vec()),
1384 ];
1385 let mut stream = SseStream::new(stream::iter(chunks));
1386
1387 futures::executor::block_on(async {
1388 let first = stream.next().await.expect("first event").expect("ok");
1389 let diag = json!({
1390 "fixture_id": "sse-crlf-split-across-chunks",
1391 "seed": "deterministic-static",
1392 "expected": {"event": "message", "data": "first"},
1393 "actual": {"event": first.event, "data": first.data},
1394 })
1395 .to_string();
1396 assert_eq!(first.data, "first", "{diag}");
1397 assert!(stream.next().await.is_none(), "{diag}");
1398 });
1399 }
1400
1401 #[test]
1402 fn test_stream_flushes_pending_event_at_end() {
1403 let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: last".to_vec())]));
1404
1405 futures::executor::block_on(async {
1406 let event = stream.next().await.expect("event").expect("ok");
1407 assert_eq!(event.data, "last");
1408 assert!(stream.next().await.is_none());
1409 });
1410 }
1411
1412 #[test]
1413 fn test_stream_errors_on_incomplete_utf8_at_end() {
1414 let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: \xE2".to_vec())]));
1415
1416 futures::executor::block_on(async {
1417 let err = stream
1418 .next()
1419 .await
1420 .expect("expected a result")
1421 .expect_err("expected utf8 error");
1422 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1423 assert!(
1424 stream.next().await.is_none(),
1425 "incomplete UTF-8 at EOF should produce a terminal error"
1426 );
1427 });
1428 }
1429
1430 #[test]
1431 fn test_stream_surfaces_pending_event_before_utf8_error() {
1432 let chunks = vec![Ok(b"data: ok\n\ndata: \xFF\n\n".to_vec())];
1437 let mut stream = SseStream::new(stream::iter(chunks));
1438
1439 futures::executor::block_on(async {
1440 let first = stream.next().await.expect("first item").expect("first ok");
1441 let diag = json!({
1442 "fixture_id": "sse-valid-event-before-invalid-utf8",
1443 "seed": "deterministic-static",
1444 "expected_sequence": ["Ok(data=ok)", "Ok(data=)", "Err(invalid utf8)"],
1445 "actual_first": {"event": first.event, "data": first.data},
1446 })
1447 .to_string();
1448 assert_eq!(first.data, "ok", "{diag}");
1449
1450 let second = stream
1453 .next()
1454 .await
1455 .expect("second item")
1456 .expect("second ok");
1457 assert_eq!(second.data, "", "{diag}");
1458
1459 let err = stream
1460 .next()
1461 .await
1462 .expect("third item")
1463 .expect_err("third should be utf8 error");
1464 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData, "{diag}");
1465 });
1466 }
1467
1468 #[test]
1469 fn test_stream_resumes_parsing_remainder_after_utf8_error() {
1470 let mut bytes = b"data: ok\n\n".to_vec();
1477 bytes.push(0xFF);
1478 bytes.extend_from_slice(b"data: after\n\n");
1479
1480 let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1481
1482 futures::executor::block_on(async {
1483 let first = stream.next().await.expect("1").expect("ok");
1485 assert_eq!(first.data, "ok");
1486
1487 let second = stream.next().await.expect("2").expect("after");
1489 assert_eq!(second.data, "after");
1490
1491 let err = stream.next().await.expect("3").expect_err("error");
1493 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1494 });
1495 }
1496
1497 #[test]
1498 fn test_stream_does_not_flush_partial_tail_after_utf8_error() {
1499 let mut bytes = b"data: ok\n\n".to_vec();
1500 bytes.push(0xFF);
1501 bytes.extend_from_slice(b"data: partial");
1502
1503 let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1504
1505 futures::executor::block_on(async {
1506 let first = stream.next().await.expect("1").expect("ok");
1507 assert_eq!(first.data, "ok");
1508
1509 let err = stream.next().await.expect("2").expect_err("error");
1510 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1511 assert!(
1512 stream.next().await.is_none(),
1513 "utf-8 parse errors should terminate the stream without flushing a partial tail"
1514 );
1515 });
1516 }
1517
1518 #[test]
1519 fn test_bom_stripping_with_preceding_empty_chunk() {
1520 let mut parser = SseParser::new();
1521 let events = parser.feed("");
1523 assert!(events.is_empty());
1524
1525 let events = parser.feed("\u{FEFF}data: hello\n\n");
1527 assert_eq!(events.len(), 1);
1528 assert_eq!(events[0].data, "hello");
1529 assert_eq!(events[0].event, "message");
1531 }
1532
    /// Regression fixture (id `28de6b`): the same input must produce identical
    /// events and an identical final `flush` result whether it is fed whole,
    /// one `char` at a time, or split roughly in half.
    #[test]
    fn test_fuzz_regression_crash_28de6b() {
        // Raw fixture bytes. It opens with the ASCII text
        // "d=ta: line1\nZata: line2\ndata: l", then continues with bytes that
        // are not valid UTF-8 (e.g. a bare 0x9f, repeated 0xcd lead bytes) and
        // a long run of NUL bytes. Do not reformat or "fix" the values.
        let data: &[u8] = &[
            0x64, 0x3d, 0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x31, 0x0a, 0x5a, 0x61,
            0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x32, 0x0a, 0x64, 0x61, 0x74, 0x61,
            0x3a, 0x20, 0x6c, 0x9f, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x28, 0xcd, 0xcd, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82,
            0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x40, 0x82, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x91,
            0x9a, 0x93, 0x69, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00,
        ];
        // `feed` takes text, so invalid sequences are replaced with U+FFFD
        // here; every strategy below sees the same lossy string.
        let input = String::from_utf8_lossy(data);

        // Strategy 1: the whole input in a single call.
        let mut parser_whole = SseParser::new();
        let events_whole = parser_whole.feed(&input);
        let flush_whole = parser_whole.flush();

        // Strategy 2: one `char` per call, the finest valid chunking.
        let mut parser_char = SseParser::new();
        let mut events_char = Vec::new();
        for ch in input.chars() {
            // Four bytes is the maximum UTF-8 length of a single `char`.
            let mut buf = [0u8; 4];
            events_char.extend(parser_char.feed(ch.encode_utf8(&mut buf)));
        }
        let flush_char = parser_char.flush();

        // Strategy 3: two halves. `str::split_at` panics off a char boundary,
        // so advance from the midpoint to the next boundary first.
        let mid = input.len() / 2;
        let mut split_at = mid;
        while !input.is_char_boundary(split_at) && split_at < input.len() {
            split_at += 1;
        }
        let (part1, part2) = input.split_at(split_at);
        let mut parser_split = SseParser::new();
        let mut events_split: Vec<_> = parser_split.feed(part1);
        events_split.extend(parser_split.feed(part2));
        let flush_split = parser_split.flush();

        // Chunking must be invisible in both emitted events and flush output.
        assert_eq!(events_whole, events_split, "whole vs split events");
        assert_eq!(flush_whole, flush_split, "whole vs split flush");
        assert_eq!(events_whole, events_char, "whole vs char events");
        assert_eq!(flush_whole, flush_char, "whole vs char flush");
    }
1589
1590 proptest! {
1591 #![proptest_config(ProptestConfig {
1592 cases: 256,
1593 max_shrink_iters: 200,
1594 .. ProptestConfig::default()
1595 })]
1596
1597 #[test]
1598 fn sse_chunking_invariant(
1599 events in prop::collection::vec(event_strategy(), 1..10),
1600 chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1601 terminal_delimiter in any::<bool>(),
1602 ) {
1603 let input = render_stream(&events, terminal_delimiter);
1604 let expected = parse_all(&input);
1605 let actual = parse_chunked(&input, &chunk_sizes);
1606 prop_assert_eq!(actual, expected);
1607 }
1608
1609 #[test]
1610 fn sse_line_ending_chunking_invariant(
1611 events in prop::collection::vec(event_strategy(), 1..10),
1612 chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1613 terminal_delimiter in any::<bool>(),
1614 line_ending in line_ending_strategy(),
1615 ) {
1616 let input = render_stream_with_line_endings(&events, terminal_delimiter, line_ending);
1617 let expected = parse_all(&input);
1618 let actual = parse_chunked(&input, &chunk_sizes);
1619 prop_assert_eq!(actual, expected);
1620 }
1621
1622 #[test]
1623 fn sse_id_with_null_bytes_is_rejected(
1624 id in id_field_strategy(),
1625 data in ascii_line(),
1626 ) {
1627 let input = format!("id: {id}\ndata: {data}\n\n");
1628 let events = parse_all(&input);
1629
1630 prop_assert_eq!(events.len(), 1);
1631 let expected_id = if id.contains('\0') { None } else { Some(id) };
1632 prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1633 }
1634
1635 #[test]
1636 fn sse_retry_field_fuzz_last_assignment_wins(
1637 retry_values in prop::collection::vec(retry_field_strategy(), 1..6),
1638 data in ascii_line(),
1639 ) {
1640 let mut input = String::new();
1641 for value in &retry_values {
1642 let _ = writeln!(&mut input, "retry: {value}");
1643 }
1644 let _ = writeln!(&mut input, "data: {data}");
1645 input.push('\n');
1646
1647 let events = parse_all(&input);
1648 prop_assert_eq!(events.len(), 1);
1649
1650 let expected_retry = retry_values
1651 .last()
1652 .and_then(|value| value.parse::<u64>().ok());
1653 prop_assert_eq!(events[0].retry, expected_retry);
1654 }
1655
1656 #[test]
1657 fn sse_duplicate_fields_apply_spec_semantics(
1658 event_names in prop::collection::vec("[a-z_]{1,12}", 1..6),
1659 ids in prop::collection::vec(id_field_strategy(), 0..6),
1660 data_lines in prop::collection::vec(ascii_line(), 1..6),
1661 ) {
1662 let mut input = String::new();
1663
1664 for event_name in &event_names {
1665 let _ = writeln!(&mut input, "event: {event_name}");
1666 }
1667 for id in &ids {
1668 let _ = writeln!(&mut input, "id: {id}");
1669 }
1670 for line in &data_lines {
1671 let _ = writeln!(&mut input, "data: {line}");
1672 }
1673 input.push('\n');
1674
1675 let events = parse_all(&input);
1676 prop_assert_eq!(events.len(), 1);
1677 prop_assert_eq!(events[0].event.as_ref(), event_names.last().expect("non-empty"));
1678
1679 let expected_id = ids.iter().rfind(|id| !id.contains('\0')).cloned();
1680 prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1681
1682 let expected_data = data_lines.join("\n");
1683 prop_assert_eq!(events[0].data.as_str(), expected_data);
1684 }
1685
1686 #[test]
1687 fn sse_oversized_data_fields_round_trip(len in oversized_data_len_strategy(),) {
1688 let payload = "x".repeat(len);
1689 let input = format!("data: {payload}\n\n");
1690 let events = parse_all(&input);
1691
1692 prop_assert_eq!(events.len(), 1);
1693 prop_assert_eq!(events[0].data.len(), len);
1694 prop_assert_eq!(events[0].data.as_str(), payload);
1695 }
1696
1697 #[test]
1698 fn sse_stream_utf8_valid_input_chunking_invariant(
1699 lines in prop::collection::vec(unicode_line(), 1..5),
1700 chunk_sizes in prop::collection::vec(1usize..16, 0..24),
1701 ) {
1702 let mut input = String::new();
1703 for line in &lines {
1704 let _ = writeln!(&mut input, "data: {line}");
1705 }
1706 input.push('\n');
1707
1708 let (single_events, single_errors) = parse_stream_single_chunk(input.as_bytes());
1709 let (chunked_events, chunked_errors) =
1710 parse_stream_chunked(input.as_bytes(), &chunk_sizes);
1711
1712 prop_assert!(single_errors.is_empty(), "single-chunk had UTF-8 errors");
1713 prop_assert!(chunked_errors.is_empty(), "chunked parse had UTF-8 errors");
1714 prop_assert_eq!(chunked_events, single_events);
1715 }
1716
1717 #[test]
1718 fn sse_stream_bom_start_stripped_embedded_preserved(
1719 left in unicode_line(),
1720 right in unicode_line(),
1721 chunk_sizes in prop::collection::vec(1usize..8, 0..24),
1722 ) {
1723 let start_bom = format!("\u{FEFF}data: {left}{right}\n\n");
1724 let (start_events, start_errors) =
1725 parse_stream_chunked(start_bom.as_bytes(), &chunk_sizes);
1726 prop_assert!(start_errors.is_empty(), "start BOM should be valid UTF-8");
1727 prop_assert_eq!(start_events.len(), 1);
1728 let expected_start = format!("{left}{right}");
1729 prop_assert_eq!(start_events[0].data.as_str(), expected_start);
1730
1731 let embedded_bom = format!("data: {left}\u{FEFF}{right}\n\n");
1732 let (embedded_events, embedded_errors) =
1733 parse_stream_chunked(embedded_bom.as_bytes(), &chunk_sizes);
1734 prop_assert!(embedded_errors.is_empty(), "embedded BOM should be preserved");
1735 prop_assert_eq!(embedded_events.len(), 1);
1736 let expected_embedded = format!("{left}\u{FEFF}{right}");
1737 prop_assert_eq!(embedded_events[0].data.as_str(), expected_embedded);
1738 }
1739
1740 #[test]
1741 fn sse_stream_invalid_utf8_yields_invalid_data_errors(
1742 prefix in ascii_line(),
1743 suffix in ascii_line(),
1744 invalid_len in 1usize..4,
1745 chunk_sizes in prop::collection::vec(1usize..8, 0..20),
1746 ) {
1747 let mut bytes = format!("data: {prefix}\n\n").into_bytes();
1748 bytes.extend(std::iter::repeat_n(0xFFu8, invalid_len));
1749 bytes.extend(format!("data: {suffix}\n\n").as_bytes());
1750
1751 let (events, errors) = parse_stream_chunked_limited(&bytes, &chunk_sizes, 32);
1752 prop_assert!(
1753 events.iter().any(|event| event.data == prefix),
1754 "event before invalid sequence should still be surfaced"
1755 );
1756 prop_assert!(!errors.is_empty(), "invalid UTF-8 should emit at least one error");
1757 prop_assert!(
1758 errors.iter().all(|kind| *kind == ErrorKind::InvalidData),
1759 "all stream decoding errors must be InvalidData"
1760 );
1761 }
1762 }
1763}