// pi/sse.rs
//! Server-Sent Events (SSE) parser for the asupersync HTTP client.
//!
//! Implements the SSE protocol (text/event-stream) on top of asupersync's
//! HTTP client for streaming LLM responses.
5
use std::borrow::Cow;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
10
/// A parsed SSE event.
///
/// Produced by the parser once a blank line terminates an event block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Event type (from "event:" field, defaults to "message").
    pub event: Cow<'static, str>,
    /// Event data (from "data:" field(s), joined with newlines).
    pub data: String,
    /// Last event ID (from "id:" field).
    pub id: Option<String>,
    /// Retry interval hint in milliseconds (from "retry:" field).
    pub retry: Option<u64>,
}
23
24impl Default for SseEvent {
25    fn default() -> Self {
26        Self {
27            event: Cow::Borrowed("message"),
28            data: String::new(),
29            id: None,
30            retry: None,
31        }
32    }
33}
34
/// Parser state for SSE stream.
#[derive(Debug, Default)]
pub struct SseParser {
    /// Unconsumed input: holds at most a partial line awaiting more data.
    buffer: String,
    /// The event currently being accumulated field-by-field.
    current: SseEvent,
    /// Whether `current` has received at least one "data" field; blocks
    /// without data are never dispatched.
    has_data: bool,
    /// Whether we've already stripped the BOM from the first feed.
    bom_checked: bool,
    /// Number of bytes in `buffer` that have already been scanned for newlines.
    scanned_len: usize,
}
46
47impl SseParser {
48    /// Create a new SSE parser.
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// Intern common SSE event type names to avoid per-event String allocation.
54    /// LLM streaming APIs use a fixed set of event types; matching them to
55    /// `Cow::Borrowed` static strings eliminates one allocation per event.
56    #[inline]
57    fn intern_event_type(value: &str) -> Cow<'static, str> {
58        match value {
59            "message" => Cow::Borrowed("message"),
60            "message_start" => Cow::Borrowed("message_start"),
61            "message_stop" => Cow::Borrowed("message_stop"),
62            "message_delta" => Cow::Borrowed("message_delta"),
63            "content_block_start" => Cow::Borrowed("content_block_start"),
64            "content_block_delta" => Cow::Borrowed("content_block_delta"),
65            "content_block_stop" => Cow::Borrowed("content_block_stop"),
66            "ping" => Cow::Borrowed("ping"),
67            "error" => Cow::Borrowed("error"),
68            _ => Cow::Owned(value.to_string()),
69        }
70    }
71
72    /// Process a single line of SSE data.
73    fn process_line(line: &str, current: &mut SseEvent, has_data: &mut bool) {
74        // limit event data to 100MB to prevent OOM from malicious/broken streams
75        if current.data.len() > 100 * 1024 * 1024 {
76            return;
77        }
78
79        if let Some(rest) = line.strip_prefix(':') {
80            // Comment line - ignore (but could be used for keep-alive)
81            let _ = rest;
82        } else if let Some((field, value)) = line.split_once(':') {
83            // Field: value
84            let value = value.strip_prefix(' ').unwrap_or(value);
85            match field {
86                "event" => current.event = Self::intern_event_type(value),
87                "data" => {
88                    current.data.push_str(value);
89                    current.data.push('\n');
90                    *has_data = true;
91                }
92                "id" => {
93                    if !value.contains('\0') {
94                        current.id = Some(value.to_string());
95                    }
96                }
97                "retry" => current.retry = value.parse().ok(),
98                _ => {} // Unknown field - ignore
99            }
100        } else {
101            // Field with no value
102            match line {
103                "event" => current.event = Cow::Borrowed(""),
104                "data" => {
105                    current.data.push('\n');
106                    *has_data = true;
107                }
108                "id" => current.id = Some(String::new()),
109                _ => {}
110            }
111        }
112    }
113
114    /// Process complete lines from `source`, dispatching events via `emit`.
115    /// Returns the byte offset of the first unconsumed byte.
116    #[inline]
117    fn process_source<F>(
118        source: &str,
119        scan_start: usize,
120        bom_checked: &mut bool,
121        current: &mut SseEvent,
122        has_data: &mut bool,
123        emit: &mut F,
124    ) -> usize
125    where
126        F: FnMut(SseEvent),
127    {
128        let bytes = source.as_bytes();
129        let mut start = 0usize;
130        let mut search_pos = scan_start;
131
132        // Strip UTF-8 BOM from the beginning of the stream (SSE spec compliance).
133        if !*bom_checked && !source.is_empty() {
134            *bom_checked = true;
135            if source.starts_with('\u{FEFF}') {
136                start = 3;
137                if search_pos < 3 {
138                    search_pos = 3;
139                }
140            }
141        }
142
143        // Use memchr2 to find either \r or \n
144        while let Some(rel_pos) = memchr::memchr2(b'\r', b'\n', &bytes[search_pos..]) {
145            let pos = search_pos + rel_pos;
146            let b = bytes[pos];
147
148            let line_end;
149            let next_start;
150
151            if b == b'\n' {
152                // Bare LF
153                line_end = pos;
154                next_start = pos + 1;
155            } else {
156                // Found \r
157                if pos + 1 < source.len() {
158                    line_end = pos;
159                    next_start = if bytes[pos + 1] == b'\n' {
160                        // CRLF
161                        pos + 2
162                    } else {
163                        // Bare CR
164                        pos + 1
165                    };
166                } else {
167                    // CR at end of buffer - wait for more data to check for \n
168                    break;
169                }
170            }
171
172            let line = &source[start..line_end];
173            start = next_start;
174            search_pos = next_start;
175
176            if line.is_empty() {
177                // Blank line = event boundary
178                if *has_data {
179                    // Trim trailing newline from data
180                    if current.data.ends_with('\n') {
181                        current.data.pop();
182                    }
183                    // Per SSE spec, an empty event name dispatches as "message".
184                    if current.event.is_empty() {
185                        current.event = Cow::Borrowed("message");
186                    }
187                    emit(std::mem::take(current));
188                    *current = SseEvent::default();
189                    *has_data = false;
190                }
191            } else {
192                Self::process_line(line, current, has_data);
193            }
194        }
195
196        start
197    }
198
199    /// Feed data to the parser and emit any complete events to `emit`.
200    fn feed_into<F>(&mut self, data: &str, mut emit: F)
201    where
202        F: FnMut(SseEvent),
203    {
204        const MAX_BUFFER_SIZE: usize = 10 * 1024 * 1024;
205        if self.buffer.len() + data.len() > MAX_BUFFER_SIZE {
206            self.buffer.clear();
207            self.current = SseEvent::default();
208            self.has_data = false;
209            self.bom_checked = false;
210            self.scanned_len = 0;
211            emit(SseEvent {
212                event: Cow::Borrowed("error"),
213                data: "SSE buffer limit exceeded".to_string(),
214                ..Default::default()
215            });
216            return;
217        }
218
219        if self.buffer.is_empty() {
220            // Fast path: process data directly without copying to buffer.
221            let consumed = Self::process_source(
222                data,
223                0,
224                &mut self.bom_checked,
225                &mut self.current,
226                &mut self.has_data,
227                &mut emit,
228            );
229            if consumed < data.len() {
230                self.buffer.push_str(&data[consumed..]);
231            }
232        } else {
233            // Slow path: combine with existing buffered data, then process.
234            self.buffer.push_str(data);
235            // Re-scan from the last safe point (minus 1 to handle split CRLF).
236            let scan_start = self.scanned_len.saturating_sub(1);
237            let consumed = Self::process_source(
238                &self.buffer,
239                scan_start,
240                &mut self.bom_checked,
241                &mut self.current,
242                &mut self.has_data,
243                &mut emit,
244            );
245            if consumed > 0 {
246                self.buffer.drain(..consumed);
247            }
248        }
249        // Whether we drained or not, the entire remaining buffer has been scanned.
250        self.scanned_len = self.buffer.len();
251    }
252
253    /// Feed data to the parser and extract any complete events.
254    ///
255    /// Returns a vector of parsed events. Events are delimited by blank lines.
256    pub fn feed(&mut self, data: &str) -> Vec<SseEvent> {
257        let mut events = Vec::with_capacity(4);
258        self.feed_into(data, |event| events.push(event));
259        events
260    }
261
262    /// Check if the parser has any pending data.
263    pub fn has_pending(&self) -> bool {
264        !self.buffer.is_empty() || self.has_data
265    }
266
267    /// Flush any pending event (called when stream ends).
268    pub fn flush(&mut self) -> Option<SseEvent> {
269        // First, process any remaining buffer content that doesn't end with newline
270        if !self.buffer.is_empty() {
271            let line = std::mem::take(&mut self.buffer);
272            let line = line.trim_end_matches('\r');
273            Self::process_line(line, &mut self.current, &mut self.has_data);
274        }
275
276        if self.has_data {
277            if self.current.data.ends_with('\n') {
278                self.current.data.pop();
279            }
280            if self.current.event.is_empty() {
281                self.current.event = Cow::Borrowed("message");
282            }
283            let event = std::mem::take(&mut self.current);
284            self.current = SseEvent::default();
285            self.has_data = false;
286            Some(event)
287        } else {
288            None
289        }
290    }
291}
292
/// Stream wrapper for SSE events.
///
/// Converts a byte stream into an SSE event stream.
pub struct SseStream<S> {
    /// Underlying byte stream.
    inner: S,
    /// Incremental SSE protocol parser.
    parser: SseParser,
    /// Events parsed but not yet yielded to the consumer.
    pending_events: VecDeque<SseEvent>,
    /// Decode error held back until queued events have been yielded.
    pending_error: Option<std::io::Error>,
    /// Incomplete UTF-8 sequence carried over between chunks.
    utf8_buffer: Vec<u8>,
}
303
304impl<S> SseStream<S> {
305    /// Create a new SSE stream from a byte stream.
306    pub fn new(inner: S) -> Self {
307        Self {
308            inner,
309            parser: SseParser::new(),
310            pending_events: VecDeque::new(),
311            pending_error: None,
312            utf8_buffer: Vec::new(),
313        }
314    }
315}
316
impl<S> SseStream<S>
where
    S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
{
    /// The error reported for bytes that are not valid UTF-8.
    #[inline]
    fn invalid_utf8_error() -> std::io::Error {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Invalid UTF-8 in SSE stream",
        )
    }

    /// Feed a decoded chunk to the parser, queueing completed events.
    /// Takes the fields directly (rather than `&mut self`) so callers that
    /// hold a borrow of another field (e.g. `utf8_buffer`) can still call it.
    fn feed_parsed_chunk(parser: &mut SseParser, pending: &mut VecDeque<SseEvent>, s: &str) {
        parser.feed_into(s, |event| pending.push_back(event));
    }

    /// Convenience wrapper over `feed_parsed_chunk` for plain `&mut self` call sites.
    fn feed_to_pending(&mut self, s: &str) {
        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
    }

    /// Decode and parse `bytes` when no partial UTF-8 sequence is carried
    /// over from the previous chunk.
    ///
    /// Valid prefixes are parsed even when invalid bytes follow; invalid
    /// sequences are skipped (only the first produces an error), and an
    /// incomplete trailing sequence is stashed in `utf8_buffer` for the
    /// next chunk.
    fn process_chunk_without_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
        let mut processed = 0;
        let mut first_error: Option<std::io::Error> = None;
        loop {
            match std::str::from_utf8(&bytes[processed..]) {
                Ok(s) => {
                    if !s.is_empty() {
                        self.feed_to_pending(s);
                    }
                    return first_error.map_or(Ok(()), Err);
                }
                Err(err) => {
                    let valid_len = err.valid_up_to();
                    if valid_len > 0 {
                        // Parse the valid prefix before handling the bad bytes.
                        let s = std::str::from_utf8(&bytes[processed..processed + valid_len])
                            .expect("valid utf8 prefix");
                        self.feed_to_pending(s);
                        processed += valid_len;
                    }

                    if let Some(invalid_len) = err.error_len() {
                        // Definitely-invalid sequence: skip it, remember only
                        // the first error encountered in this chunk.
                        processed += invalid_len;
                        if first_error.is_none() {
                            first_error = Some(Self::invalid_utf8_error());
                        }
                    } else {
                        // Incomplete sequence at the end of the chunk: it may
                        // be completed by the next chunk, so buffer it.
                        self.utf8_buffer.extend_from_slice(&bytes[processed..]);
                        return first_error.map_or(Ok(()), Err);
                    }
                }
            }
        }
    }

    /// Decode and parse `bytes` when `utf8_buffer` holds an incomplete
    /// UTF-8 sequence from the previous chunk. Same policy as
    /// `process_chunk_without_utf8_tail`, but operates on the combined
    /// buffered-tail + new-chunk bytes.
    fn process_chunk_with_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
        self.utf8_buffer.extend_from_slice(bytes);
        let mut processed = 0;
        let mut first_error: Option<std::io::Error> = None;
        loop {
            match std::str::from_utf8(&self.utf8_buffer[processed..]) {
                Ok(s) => {
                    if !s.is_empty() {
                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
                    }
                    self.utf8_buffer.clear();
                    return first_error.map_or(Ok(()), Err);
                }
                Err(err) => {
                    let valid_len = err.valid_up_to();
                    if valid_len > 0 {
                        let s = std::str::from_utf8(
                            &self.utf8_buffer[processed..processed + valid_len],
                        )
                        .expect("valid utf8 prefix");
                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
                        processed += valid_len;
                    }

                    if let Some(invalid_len) = err.error_len() {
                        processed += invalid_len;
                        if first_error.is_none() {
                            first_error = Some(Self::invalid_utf8_error());
                        }
                    } else {
                        // Move remaining bytes to start of utf8_buffer
                        let remaining = self.utf8_buffer.len() - processed;
                        self.utf8_buffer.copy_within(processed.., 0);
                        self.utf8_buffer.truncate(remaining);
                        return first_error.map_or(Ok(()), Err);
                    }
                }
            }
        }
    }

    /// Decode one chunk of bytes, choosing the fast path when no UTF-8
    /// tail is pending from the previous chunk.
    fn process_chunk(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
        if self.utf8_buffer.is_empty() {
            self.process_chunk_without_utf8_tail(bytes)
        } else {
            self.process_chunk_with_utf8_tail(bytes)
        }
    }

    /// Handle end-of-stream: report a truncated UTF-8 tail as a terminal
    /// error, otherwise flush any final unterminated event from the parser.
    fn poll_stream_end(&mut self) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
        if !self.utf8_buffer.is_empty() {
            // EOF with an incomplete UTF-8 tail is a terminal stream error.
            // Clear parser state so repeated polls don't emit the same error forever.
            self.utf8_buffer.clear();
            self.pending_events.clear();
            self.pending_error = None;
            self.parser = SseParser::new();
            return Poll::Ready(Some(Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Stream ended with incomplete UTF-8 sequence",
            ))));
        }

        if let Some(event) = self.parser.flush() {
            return Poll::Ready(Some(Ok(event)));
        }
        Poll::Ready(None)
    }

    /// Poll for the next SSE event.
    ///
    /// Already-parsed events are drained first; a decode error recorded
    /// alongside them (`pending_error`) is surfaced only after those events
    /// have been yielded.
    pub fn poll_next_event(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
        if let Some(event) = self.pending_events.pop_front() {
            return Poll::Ready(Some(Ok(event)));
        }
        if let Some(err) = self.pending_error.take() {
            return Poll::Ready(Some(Err(err)));
        }

        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(bytes))) => {
                    if let Err(err) = self.process_chunk(&bytes) {
                        // Yield events parsed before the error first; stash
                        // the error so the next poll reports it.
                        if let Some(event) = self.pending_events.pop_front() {
                            self.pending_error = Some(err);
                            return Poll::Ready(Some(Ok(event)));
                        }
                        return Poll::Ready(Some(Err(err)));
                    }

                    if let Some(event) = self.pending_events.pop_front() {
                        return Poll::Ready(Some(Ok(event)));
                    }
                    // Chunk completed no event - keep polling the inner stream.
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    return self.poll_stream_end();
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
    }
}
480
impl<S> futures::Stream for SseStream<S>
where
    S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
{
    type Item = Result<SseEvent, std::io::Error>;

    /// Delegates to [`SseStream::poll_next_event`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_next_event(cx)
    }
}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use futures::StreamExt;
496    use futures::stream;
497    use proptest::prelude::*;
498    use serde_json::json;
499    use std::fmt::Write as _;
500    use std::io::ErrorKind;
501
    /// A randomly generated SSE event block used by the property tests,
    /// mirroring the fields `SseParser` understands.
    #[derive(Debug, Clone)]
    struct GeneratedEvent {
        /// Optional "event:" field value.
        event: Option<String>,
        /// Optional "id:" field value.
        id: Option<String>,
        /// Optional "retry:" field value (always valid digits here).
        retry: Option<u32>,
        /// One or more "data:" lines.
        data: Vec<String>,
        /// Optional leading ":comment" line.
        comment: Option<String>,
    }
510
    /// The three newline conventions an SSE parser must accept.
    #[derive(Debug, Clone, Copy)]
    enum LineEnding {
        Lf,
        Cr,
        CrLf,
    }
517
518    impl LineEnding {
519        fn as_str(self) -> &'static str {
520            match self {
521                Self::Lf => "\n",
522                Self::Cr => "\r",
523                Self::CrLf => "\r\n",
524            }
525        }
526    }
527
528    impl GeneratedEvent {
529        fn render(&self) -> String {
530            let mut out = String::new();
531            if let Some(comment) = &self.comment {
532                out.push(':');
533                out.push_str(comment);
534                out.push('\n');
535            }
536            if let Some(event) = &self.event {
537                out.push_str("event: ");
538                out.push_str(event);
539                out.push('\n');
540            }
541            if let Some(id) = &self.id {
542                out.push_str("id: ");
543                out.push_str(id);
544                out.push('\n');
545            }
546            if let Some(retry) = &self.retry {
547                out.push_str("retry: ");
548                out.push_str(&retry.to_string());
549                out.push('\n');
550            }
551            for line in &self.data {
552                out.push_str("data: ");
553                out.push_str(line);
554                out.push('\n');
555            }
556            out.push('\n');
557            out
558        }
559    }
560
    /// Strategy producing short printable-ASCII lines (no CR/LF).
    fn ascii_line() -> impl Strategy<Value = String> {
        // ASCII printable range (no CR/LF), keeps chunking safe with byte splits.
        "[ -~]{0,24}".prop_map(|s| s)
    }
565
    /// Strategy for a well-formed SSE event block: optional event/id/retry/
    /// comment fields plus one to three data lines.
    fn event_strategy() -> impl Strategy<Value = GeneratedEvent> {
        (
            prop::option::of("[a-z_]{1,12}"),
            prop::option::of("[0-9]{1,8}"),
            prop::option::of(0u32..5000),
            prop::collection::vec(ascii_line(), 1..4),
            prop::option::of(ascii_line()),
        )
            .prop_map(|(event, id, retry, data, comment)| GeneratedEvent {
                event,
                id,
                retry,
                data,
                comment,
            })
    }
582
    /// Strategy choosing one of the three supported line endings.
    fn line_ending_strategy() -> impl Strategy<Value = LineEnding> {
        prop_oneof![
            Just(LineEnding::Lf),
            Just(LineEnding::Cr),
            Just(LineEnding::CrLf),
        ]
    }
590
    /// Strategy for arbitrary Unicode lines without CR/LF, exercising
    /// multi-byte UTF-8 handling.
    fn unicode_line() -> impl Strategy<Value = String> {
        prop::collection::vec(
            any::<char>().prop_filter("no CR/LF", |c| *c != '\r' && *c != '\n'),
            0..24,
        )
        .prop_map(|chars| chars.into_iter().collect())
    }
598
    /// Strategy for "id:" values: mostly plain printable ASCII, with a
    /// 1-in-5 weighting for values containing an embedded NUL (which the
    /// parser must ignore).
    fn id_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            4 => "[ -~]{0,24}".prop_map(|s| s),
            1 => ("[ -~]{0,12}", "[ -~]{0,12}").prop_map(|(head, tail)| format!("{head}\0{tail}")),
        ]
    }
605
    /// Strategy for "retry:" values mixing valid millisecond counts with
    /// malformed inputs: near/over `u64::MAX`, alphabetic, negative, empty.
    fn retry_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            6 => (0u64..=50_000u64).prop_map(|n| n.to_string()),
            2 => (u64::MAX - 10..=u64::MAX).prop_map(|n| n.to_string()),
            2 => "[a-zA-Z]{1,16}".prop_map(|s| s),
            1 => "-[0-9]{1,24}".prop_map(|s| s),
            1 => ((u128::from(u64::MAX) + 1)..=(u128::from(u64::MAX) + 50_000))
                .prop_map(|n| n.to_string()),
            1 => Just(String::new()),
        ]
    }
617
    /// Strategy for payload sizes used by large-event tests, weighted
    /// heavily toward smaller sizes.
    fn oversized_data_len_strategy() -> impl Strategy<Value = usize> {
        // Keep average runtime reasonable while still exercising multi-megabyte inputs.
        prop_oneof![
            10 => 1024usize..=65_536usize,
            5 => 65_537usize..=262_144usize,
            2 => 262_145usize..=1_048_576usize,
            1 => 1_048_577usize..=3_145_728usize,
        ]
    }
627
628    fn render_stream(events: &[GeneratedEvent], terminal_delimiter: bool) -> String {
629        let mut out = String::new();
630        for event in events {
631            out.push_str(&event.render());
632        }
633        if !terminal_delimiter && out.ends_with('\n') {
634            out.pop();
635        }
636        out
637    }
638
639    fn render_stream_with_line_endings(
640        events: &[GeneratedEvent],
641        terminal_delimiter: bool,
642        line_ending: LineEnding,
643    ) -> String {
644        let canonical = render_stream(events, terminal_delimiter);
645        if matches!(line_ending, LineEnding::Lf) {
646            canonical
647        } else {
648            canonical.replace('\n', line_ending.as_str())
649        }
650    }
651
652    fn parse_all(input: &str) -> Vec<SseEvent> {
653        let mut parser = SseParser::new();
654        let mut events = parser.feed(input);
655        if let Some(event) = parser.flush() {
656            events.push(event);
657        }
658        events
659    }
660
661    fn parse_chunked(input: &str, chunk_sizes: &[usize]) -> Vec<SseEvent> {
662        let mut parser = SseParser::new();
663        let mut events = Vec::new();
664        let bytes = input.as_bytes();
665        let mut start = 0usize;
666
667        for &size in chunk_sizes {
668            if start >= bytes.len() {
669                break;
670            }
671            let end = (start + size).min(bytes.len());
672            let chunk = std::str::from_utf8(&bytes[start..end]).expect("ascii chunks");
673            events.extend(parser.feed(chunk));
674            start = end;
675        }
676
677        if start < bytes.len() {
678            let chunk = std::str::from_utf8(&bytes[start..]).expect("ascii remainder");
679            events.extend(parser.feed(chunk));
680        }
681
682        if let Some(event) = parser.flush() {
683            events.push(event);
684        }
685
686        events
687    }
688
689    fn split_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
690        let mut chunks = Vec::new();
691        let mut start = 0usize;
692
693        for &size in chunk_sizes {
694            if start >= input.len() {
695                break;
696            }
697            let end = (start + size).min(input.len());
698            chunks.push(input[start..end].to_vec());
699            start = end;
700        }
701
702        if start < input.len() {
703            chunks.push(input[start..].to_vec());
704        }
705
706        chunks
707    }
708
    /// Drive an [`SseStream`] over the given chunks to exhaustion,
    /// collecting yielded events and the kinds of any errors.
    fn parse_stream_chunks(chunks: Vec<Vec<u8>>) -> (Vec<SseEvent>, Vec<ErrorKind>) {
        let mut stream = SseStream::new(stream::iter(
            chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
        ));
        let mut events = Vec::new();
        let mut errors = Vec::new();

        futures::executor::block_on(async {
            while let Some(item) = stream.next().await {
                match item {
                    Ok(event) => events.push(event),
                    Err(err) => errors.push(err.kind()),
                }
            }
        });

        (events, errors)
    }
727
    /// Like [`parse_stream_chunks`], but polls at most `max_items` items so
    /// tests can observe partial progress on very long streams.
    fn parse_stream_chunks_limited(
        chunks: Vec<Vec<u8>>,
        max_items: usize,
    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
        let mut stream = SseStream::new(stream::iter(
            chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
        ));
        let mut events = Vec::new();
        let mut errors = Vec::new();

        futures::executor::block_on(async {
            for _ in 0..max_items {
                let Some(item) = stream.next().await else {
                    break;
                };
                match item {
                    Ok(event) => events.push(event),
                    Err(err) => errors.push(err.kind()),
                }
            }
        });

        (events, errors)
    }
752
753    fn parse_stream_single_chunk(input: &[u8]) -> (Vec<SseEvent>, Vec<ErrorKind>) {
754        parse_stream_chunks(vec![input.to_vec()])
755    }
756
757    fn parse_stream_chunked(
758        input: &[u8],
759        chunk_sizes: &[usize],
760    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
761        let chunks = split_bytes(input, chunk_sizes);
762        parse_stream_chunks(chunks)
763    }
764
765    fn parse_stream_chunked_limited(
766        input: &[u8],
767        chunk_sizes: &[usize],
768        max_items: usize,
769    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
770        let chunks = split_bytes(input, chunk_sizes);
771        parse_stream_chunks_limited(chunks, max_items)
772    }
773
    /// Build a JSON diagnostic string attached to assertion messages so
    /// failures carry the fixture id, environment, parser state, and the
    /// expected/actual values.
    fn diag_json(
        fixture_id: &str,
        parser: &SseParser,
        input: &str,
        expected: &str,
        actual: &str,
    ) -> String {
        json!({
            "fixture_id": fixture_id,
            "seed": "deterministic-static",
            "env": {
                "os": std::env::consts::OS,
                "arch": std::env::consts::ARCH,
                "cwd": std::env::current_dir().ok().map(|path| path.display().to_string()),
            },
            "input_preview": input,
            "parser_state": {
                "has_pending": parser.has_pending(),
            },
            "expected": expected,
            "actual": actual,
        })
        .to_string()
    }
798
799    #[test]
800    fn test_simple_event() {
801        let mut parser = SseParser::new();
802        let events = parser.feed("data: hello\n\n");
803        assert_eq!(events.len(), 1);
804        assert_eq!(events[0].event, "message");
805        assert_eq!(events[0].data, "hello");
806    }
807
808    #[test]
809    fn test_multiline_data() {
810        let mut parser = SseParser::new();
811        let events = parser.feed("data: line1\ndata: line2\n\n");
812        assert_eq!(events.len(), 1);
813        assert_eq!(events[0].data, "line1\nline2");
814    }
815
816    #[test]
817    fn test_named_event() {
818        let mut parser = SseParser::new();
819        let events = parser.feed("event: ping\ndata: {}\n\n");
820        assert_eq!(events.len(), 1);
821        assert_eq!(events[0].event, "ping");
822        assert_eq!(events[0].data, "{}");
823    }
824
825    #[test]
826    fn test_event_with_id() {
827        let mut parser = SseParser::new();
828        let events = parser.feed("id: 123\ndata: test\n\n");
829        assert_eq!(events.len(), 1);
830        assert_eq!(events[0].id, Some("123".to_string()));
831        assert_eq!(events[0].data, "test");
832    }
833
834    #[test]
835    fn test_multiple_events() {
836        let mut parser = SseParser::new();
837        let events = parser.feed("data: first\n\ndata: second\n\n");
838        assert_eq!(events.len(), 2);
839        assert_eq!(events[0].data, "first");
840        assert_eq!(events[1].data, "second");
841    }
842
843    #[test]
844    fn test_incremental_feed() {
845        let mut parser = SseParser::new();
846
847        // Feed partial data
848        let events = parser.feed("data: hel");
849        assert!(events.is_empty());
850
851        // Feed more
852        let events = parser.feed("lo\n");
853        assert!(events.is_empty());
854
855        // Feed blank line to complete event
856        let events = parser.feed("\n");
857        assert_eq!(events.len(), 1);
858        assert_eq!(events[0].data, "hello");
859    }
860
861    #[test]
862    fn test_comment_ignored() {
863        let mut parser = SseParser::new();
864        let events = parser.feed(":this is a comment\ndata: actual\n\n");
865        assert_eq!(events.len(), 1);
866        assert_eq!(events[0].data, "actual");
867    }
868
869    #[test]
870    fn test_retry_field() {
871        let mut parser = SseParser::new();
872        let events = parser.feed("retry: 3000\ndata: test\n\n");
873        assert_eq!(events.len(), 1);
874        assert_eq!(events[0].retry, Some(3000));
875    }
876
877    #[test]
878    fn test_keep_alive_comment_does_not_emit_event() {
879        let mut parser = SseParser::new();
880        let events = parser.feed(": keepalive\n\n");
881        assert!(events.is_empty());
882    }
883
884    #[test]
885    fn test_crlf_handling() {
886        let mut parser = SseParser::new();
887        let events = parser.feed("data: hello\r\n\r\n");
888        assert_eq!(events.len(), 1);
889        assert_eq!(events[0].data, "hello");
890    }
891
892    #[test]
893    fn test_flush_pending() {
894        let mut parser = SseParser::new();
895        let events = parser.feed("data: incomplete");
896        assert!(events.is_empty());
897        assert!(parser.has_pending());
898
899        // Flush at stream end
900        let event = parser.flush();
901        assert!(event.is_some());
902        assert_eq!(event.unwrap().data, "incomplete");
903    }
904
905    #[test]
906    fn test_event_without_data_is_ignored() {
907        let mut parser = SseParser::new();
908        let events = parser.feed("event: ping\n\n");
909        assert!(
910            events.is_empty(),
911            "event block without data should not emit an event"
912        );
913    }
914
915    #[test]
916    fn test_unknown_field_is_ignored() {
917        let mut parser = SseParser::new();
918        let events = parser.feed("foo: bar\ndata: hello\n\n");
919        assert_eq!(events.len(), 1);
920        assert_eq!(events[0].data, "hello");
921        assert_eq!(events[0].event, "message");
922    }
923
924    #[test]
925    fn test_error_event_parsing() {
926        let mut parser = SseParser::new();
927        let events = parser.feed("event: error\ndata: {\"message\":\"boom\"}\n\n");
928        assert_eq!(events.len(), 1);
929        assert_eq!(events[0].event, "error");
930        assert_eq!(events[0].data, "{\"message\":\"boom\"}");
931    }
932
    #[test]
    fn test_empty_event_field_defaults_to_message() {
        // A bare "event" line (no colon, so an empty field value) must not
        // produce an event with an empty type: the default "message" wins.
        let mut parser = SseParser::new();
        let input = "event\ndata: hello\n\n";
        let events = parser.feed(input);
        // Structured diagnostic (fixture id, parser state, input, expected vs
        // actual) attached to every assertion for failure triage.
        let diag = diag_json(
            "sse-empty-event-field-default",
            &parser,
            input,
            r#"{"event":"message","data":"hello"}"#,
            &format!("{events:?}"),
        );

        assert_eq!(events.len(), 1, "{diag}");
        assert_eq!(events[0].event, "message", "{diag}");
        assert_eq!(events[0].data, "hello", "{diag}");
    }
950
951    #[test]
952    fn test_large_payload_event() {
953        let mut parser = SseParser::new();
954        let payload = "x".repeat(128 * 1024);
955        let input = format!("data: {payload}\n\n");
956        let events = parser.feed(&input);
957        assert_eq!(events.len(), 1);
958        assert_eq!(events[0].data.len(), payload.len());
959        assert_eq!(events[0].data, payload);
960    }
961
    #[test]
    fn test_buffer_limit_overflow_resets_parser_state() {
        let mut parser = SseParser::new();
        // Leave a partial event buffered so we can verify it gets discarded.
        assert!(parser.feed("data: stale\n").is_empty());

        // Exceed the internal buffer limit (10 MiB, per the constant used
        // here) by one byte; the parser reports overflow as a synthetic
        // "error" event rather than panicking or growing unboundedly.
        let oversized = "x".repeat(10 * 1024 * 1024 + 1);
        let overflow_events = parser.feed(&oversized);
        assert_eq!(overflow_events.len(), 1);
        assert_eq!(overflow_events[0].event, "error");
        assert_eq!(overflow_events[0].data, "SSE buffer limit exceeded");

        // Overflow must clear all buffered state, including the stale partial.
        assert!(!parser.has_pending());
        assert!(parser.flush().is_none());

        // The parser remains usable after the reset.
        let fresh = parser.feed("data: fresh\n\n");
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[0].data, "fresh");
    }
980
981    #[test]
982    fn test_rapid_sequential_events() {
983        let mut parser = SseParser::new();
984        let mut input = String::new();
985        for i in 0..200 {
986            let _ = write!(&mut input, "event: e{i}\ndata: payload{i}\n\n");
987        }
988        let events = parser.feed(&input);
989        assert_eq!(events.len(), 200);
990        assert_eq!(events[0].event, "e0");
991        assert_eq!(events[0].data, "payload0");
992        assert_eq!(events[199].event, "e199");
993        assert_eq!(events[199].data, "payload199");
994    }
995
996    #[test]
997    fn test_stream_event_name_matrix() {
998        let names = [
999            "message_start",
1000            "content_block_start",
1001            "content_block_delta",
1002            "content_block_stop",
1003            "message_delta",
1004            "message_stop",
1005            "message",
1006            "error",
1007            "ping",
1008            "response.created",
1009            "response.output_text.delta",
1010            "response.completed",
1011        ];
1012
1013        let mut parser = SseParser::new();
1014        let mut input = String::new();
1015        for name in names {
1016            let _ = write!(&mut input, "event: {name}\ndata: {{}}\n\n");
1017        }
1018
1019        let events = parser.feed(&input);
1020        assert_eq!(events.len(), names.len());
1021        for (idx, name) in names.iter().enumerate() {
1022            assert_eq!(events[idx].event, *name);
1023            assert_eq!(events[idx].data, "{}");
1024        }
1025    }
1026
    #[test]
    fn test_anthropic_style_events() {
        // Full Anthropic-style streaming lifecycle parsed from one feed:
        // message_start → content_block_start/delta/stop → message_stop.
        let mut parser = SseParser::new();

        // Simulate Anthropic API response
        let events = parser.feed(
            r#"event: message_start
data: {"type":"message_start","message":{"id":"msg_123"}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_stop
data: {"type":"message_stop"}

"#,
        );

        // One event per block, preserved in stream order with payloads intact.
        assert_eq!(events.len(), 5);
        assert_eq!(events[0].event, "message_start");
        assert!(events[0].data.contains("message_start"));
        assert_eq!(events[1].event, "content_block_start");
        assert_eq!(events[2].event, "content_block_delta");
        assert!(events[2].data.contains("Hello"));
        assert_eq!(events[3].event, "content_block_stop");
        assert_eq!(events[4].event, "message_stop");
    }
1060
1061    #[test]
1062    fn test_stream_yields_multiple_events_from_one_chunk() {
1063        let bytes = b"data: first\n\ndata: second\n\n".to_vec();
1064        let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1065
1066        futures::executor::block_on(async {
1067            let first = stream.next().await.expect("first event").expect("ok");
1068            assert_eq!(first.data, "first");
1069
1070            let second = stream.next().await.expect("second event").expect("ok");
1071            assert_eq!(second.data, "second");
1072
1073            assert!(stream.next().await.is_none());
1074        });
1075    }
1076
1077    #[test]
1078    fn test_stream_handles_utf8_split_across_chunks() {
1079        // Snowman is a 3-byte UTF-8 sequence: E2 98 83. Split it across chunks.
1080        let chunks = vec![Ok(b"data: \xE2".to_vec()), Ok(b"\x98\x83\n\n".to_vec())];
1081        let mut stream = SseStream::new(stream::iter(chunks));
1082
1083        futures::executor::block_on(async {
1084            let event = stream.next().await.expect("event").expect("ok");
1085            assert_eq!(event.data, "☃");
1086            assert!(stream.next().await.is_none());
1087        });
1088    }
1089
    #[test]
    fn test_stream_handles_crlf_split_across_partial_frames() {
        // The two CRLF terminators arrive one byte per chunk ("\r", "\n",
        // "\r", "\n"); a lone "\r" must not be treated as a premature
        // terminator nor leak into the event data.
        let chunks = vec![
            Ok(b"data: first\r".to_vec()),
            Ok(b"\n".to_vec()),
            Ok(b"\r".to_vec()),
            Ok(b"\n".to_vec()),
        ];
        let mut stream = SseStream::new(stream::iter(chunks));

        futures::executor::block_on(async {
            let first = stream.next().await.expect("first event").expect("ok");
            // Structured diagnostic attached to each assertion for triage.
            let diag = json!({
                "fixture_id": "sse-crlf-split-across-chunks",
                "seed": "deterministic-static",
                "expected": {"event": "message", "data": "first"},
                "actual": {"event": first.event, "data": first.data},
            })
            .to_string();
            assert_eq!(first.data, "first", "{diag}");
            assert!(stream.next().await.is_none(), "{diag}");
        });
    }
1113
1114    #[test]
1115    fn test_stream_flushes_pending_event_at_end() {
1116        let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: last".to_vec())]));
1117
1118        futures::executor::block_on(async {
1119            let event = stream.next().await.expect("event").expect("ok");
1120            assert_eq!(event.data, "last");
1121            assert!(stream.next().await.is_none());
1122        });
1123    }
1124
1125    #[test]
1126    fn test_stream_errors_on_incomplete_utf8_at_end() {
1127        let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: \xE2".to_vec())]));
1128
1129        futures::executor::block_on(async {
1130            let err = stream
1131                .next()
1132                .await
1133                .expect("expected a result")
1134                .expect_err("expected utf8 error");
1135            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1136            assert!(
1137                stream.next().await.is_none(),
1138                "incomplete UTF-8 at EOF should produce a terminal error"
1139            );
1140        });
1141    }
1142
    #[test]
    fn test_stream_surfaces_pending_event_before_utf8_error() {
        // Input: "data: ok\n\ndata: \xFF\n\n"
        // The parser feeds valid prefix "data: ok\n\ndata: " → emits event("ok"),
        // then recovers remainder "\n\n" after the 0xFF → completes partial "data: "
        // → emits event(""). All pending events drain before the error.
        let chunks = vec![Ok(b"data: ok\n\ndata: \xFF\n\n".to_vec())];
        let mut stream = SseStream::new(stream::iter(chunks));

        futures::executor::block_on(async {
            let first = stream.next().await.expect("first item").expect("first ok");
            // Structured diagnostic reused by every assertion below.
            let diag = json!({
                "fixture_id": "sse-valid-event-before-invalid-utf8",
                "seed": "deterministic-static",
                "expected_sequence": ["Ok(data=ok)", "Ok(data=)", "Err(invalid utf8)"],
                "actual_first": {"event": first.event, "data": first.data},
            })
            .to_string();
            assert_eq!(first.data, "ok", "{diag}");

            // The recovered remainder "\n\n" completes the partial "data: " line,
            // producing an empty-data event before the error surfaces.
            let second = stream
                .next()
                .await
                .expect("second item")
                .expect("second ok");
            assert_eq!(second.data, "", "{diag}");

            // Only after both events have drained does the UTF-8 error appear.
            let err = stream
                .next()
                .await
                .expect("third item")
                .expect_err("third should be utf8 error");
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData, "{diag}");
        });
    }
1180
    #[test]
    fn test_stream_resumes_parsing_remainder_after_utf8_error() {
        // "data: ok\n\n" (valid) + 0xFF (invalid) + "data: after\n\n" (valid)
        // Sent in one chunk.
        // The recovery code feeds remainder "data: after\n\n" to pending_events
        // before the error is stored, so events drain first:
        // Expect: Ok(ok), Ok(after), Err(invalid)

        // Build the fixture byte-by-byte so the single invalid byte is explicit.
        let mut bytes = b"data: ok\n\n".to_vec();
        bytes.push(0xFF);
        bytes.extend_from_slice(b"data: after\n\n");

        let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));

        futures::executor::block_on(async {
            // 1. "ok" — from valid prefix before 0xFF
            let first = stream.next().await.expect("1").expect("ok");
            assert_eq!(first.data, "ok");

            // 2. "after" — recovered from remainder after 0xFF (pending events drain first)
            let second = stream.next().await.expect("2").expect("after");
            assert_eq!(second.data, "after");

            // 3. Error — surfaces after all pending events are delivered
            let err = stream.next().await.expect("3").expect_err("error");
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
        });
    }
1209
1210    #[test]
1211    fn test_bom_stripping_with_preceding_empty_chunk() {
1212        let mut parser = SseParser::new();
1213        // Feed empty chunk first - should not mark BOM as checked
1214        let events = parser.feed("");
1215        assert!(events.is_empty());
1216
1217        // Feed content with BOM
1218        let events = parser.feed("\u{FEFF}data: hello\n\n");
1219        assert_eq!(events.len(), 1);
1220        assert_eq!(events[0].data, "hello");
1221        // Ensure the BOM didn't end up in the field name (causing it to be ignored)
1222        assert_eq!(events[0].event, "message");
1223    }
1224
    /// Regression test for fuzz crash artifact
    /// (crash-28de6b0685a7989f4c24ea8feea29aaa57d50053).
    ///
    /// Verifies the chunking invariant: feeding data whole, char-by-char, and
    /// split at midpoint must produce identical events and flush results.
    #[test]
    fn test_fuzz_regression_crash_28de6b() {
        // Raw fuzz artifact bytes; contains malformed field names and invalid
        // UTF-8 that from_utf8_lossy maps to replacement characters. Do not
        // "clean up" — the exact byte sequence is what triggered the crash.
        let data: &[u8] = &[
            0x64, 0x3d, 0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x31, 0x0a, 0x5a, 0x61,
            0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x32, 0x0a, 0x64, 0x61, 0x74, 0x61,
            0x3a, 0x20, 0x6c, 0x9f, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x28, 0xcd, 0xcd, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82,
            0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x40, 0x82, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x91,
            0x9a, 0x93, 0x69, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00,
        ];
        let input = String::from_utf8_lossy(data);

        // Whole feed
        let mut parser_whole = SseParser::new();
        let events_whole = parser_whole.feed(&input);
        let flush_whole = parser_whole.flush();

        // Char-by-char
        let mut parser_char = SseParser::new();
        let mut events_char = Vec::new();
        for ch in input.chars() {
            let mut buf = [0u8; 4];
            events_char.extend(parser_char.feed(ch.encode_utf8(&mut buf)));
        }
        let flush_char = parser_char.flush();

        // Split at midpoint
        let mid = input.len() / 2;
        let mut split_at = mid;
        // Nudge the split point forward to the next char boundary so
        // split_at() below cannot panic mid-code-point.
        while !input.is_char_boundary(split_at) && split_at < input.len() {
            split_at += 1;
        }
        let (part1, part2) = input.split_at(split_at);
        let mut parser_split = SseParser::new();
        let mut events_split: Vec<_> = parser_split.feed(part1);
        events_split.extend(parser_split.feed(part2));
        let flush_split = parser_split.flush();

        // All three feeding strategies must agree on events AND flush result.
        assert_eq!(events_whole, events_split, "whole vs split events");
        assert_eq!(flush_whole, flush_split, "whole vs split flush");
        assert_eq!(events_whole, events_char, "whole vs char events");
        assert_eq!(flush_whole, flush_char, "whole vs char flush");
    }
1281
    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 256,
            max_shrink_iters: 200,
            .. ProptestConfig::default()
        })]

        // Chunking invariant: parsing a rendered stream in arbitrary chunk
        // sizes must yield the same events as parsing it in one call.
        #[test]
        fn sse_chunking_invariant(
            events in prop::collection::vec(event_strategy(), 1..10),
            chunk_sizes in prop::collection::vec(1usize..32, 0..20),
            terminal_delimiter in any::<bool>(),
        ) {
            let input = render_stream(&events, terminal_delimiter);
            let expected = parse_all(&input);
            let actual = parse_chunked(&input, &chunk_sizes);
            prop_assert_eq!(actual, expected);
        }

        // Same chunking invariant, but with the line ending drawn from
        // line_ending_strategy() (presumably LF/CR/CRLF variants — confirm).
        #[test]
        fn sse_line_ending_chunking_invariant(
            events in prop::collection::vec(event_strategy(), 1..10),
            chunk_sizes in prop::collection::vec(1usize..32, 0..20),
            terminal_delimiter in any::<bool>(),
            line_ending in line_ending_strategy(),
        ) {
            let input = render_stream_with_line_endings(&events, terminal_delimiter, line_ending);
            let expected = parse_all(&input);
            let actual = parse_chunked(&input, &chunk_sizes);
            prop_assert_eq!(actual, expected);
        }

        // Per the SSE spec, an "id:" value containing U+0000 is ignored;
        // any other value is recorded as the last event ID.
        #[test]
        fn sse_id_with_null_bytes_is_rejected(
            id in id_field_strategy(),
            data in ascii_line(),
        ) {
            let input = format!("id: {id}\ndata: {data}\n\n");
            let events = parse_all(&input);

            prop_assert_eq!(events.len(), 1);
            let expected_id = if id.contains('\0') { None } else { Some(id) };
            prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
        }

        // Multiple "retry:" lines in one block: the last parseable-or-not
        // value wins (an unparseable last value yields None).
        #[test]
        fn sse_retry_field_fuzz_last_assignment_wins(
            retry_values in prop::collection::vec(retry_field_strategy(), 1..6),
            data in ascii_line(),
        ) {
            let mut input = String::new();
            for value in &retry_values {
                let _ = writeln!(&mut input, "retry: {value}");
            }
            let _ = writeln!(&mut input, "data: {data}");
            input.push('\n');

            let events = parse_all(&input);
            prop_assert_eq!(events.len(), 1);

            let expected_retry = retry_values
                .last()
                .and_then(|value| value.parse::<u64>().ok());
            prop_assert_eq!(events[0].retry, expected_retry);
        }

        // Spec semantics for duplicated fields within one block:
        // last "event:" wins, last valid (NUL-free) "id:" wins, and all
        // "data:" lines concatenate joined by '\n'.
        #[test]
        fn sse_duplicate_fields_apply_spec_semantics(
            event_names in prop::collection::vec("[a-z_]{1,12}", 1..6),
            ids in prop::collection::vec(id_field_strategy(), 0..6),
            data_lines in prop::collection::vec(ascii_line(), 1..6),
        ) {
            let mut input = String::new();

            for event_name in &event_names {
                let _ = writeln!(&mut input, "event: {event_name}");
            }
            for id in &ids {
                let _ = writeln!(&mut input, "id: {id}");
            }
            for line in &data_lines {
                let _ = writeln!(&mut input, "data: {line}");
            }
            input.push('\n');

            let events = parse_all(&input);
            prop_assert_eq!(events.len(), 1);
            prop_assert_eq!(events[0].event.as_ref(), event_names.last().expect("non-empty"));

            let expected_id = ids.iter().rfind(|id| !id.contains('\0')).cloned();
            prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());

            let expected_data = data_lines.join("\n");
            prop_assert_eq!(events[0].data.as_str(), expected_data);
        }

        // Oversized (but under-limit) data payloads must round-trip intact.
        #[test]
        fn sse_oversized_data_fields_round_trip(len in oversized_data_len_strategy(),) {
            let payload = "x".repeat(len);
            let input = format!("data: {payload}\n\n");
            let events = parse_all(&input);

            prop_assert_eq!(events.len(), 1);
            prop_assert_eq!(events[0].data.len(), len);
            prop_assert_eq!(events[0].data.as_str(), payload);
        }

        // Stream-level chunking invariant on arbitrary valid Unicode input:
        // byte-level chunking (which may split code points) must match the
        // single-chunk parse, with no UTF-8 errors either way.
        #[test]
        fn sse_stream_utf8_valid_input_chunking_invariant(
            lines in prop::collection::vec(unicode_line(), 1..5),
            chunk_sizes in prop::collection::vec(1usize..16, 0..24),
        ) {
            let mut input = String::new();
            for line in &lines {
                let _ = writeln!(&mut input, "data: {line}");
            }
            input.push('\n');

            let (single_events, single_errors) = parse_stream_single_chunk(input.as_bytes());
            let (chunked_events, chunked_errors) =
                parse_stream_chunked(input.as_bytes(), &chunk_sizes);

            prop_assert!(single_errors.is_empty(), "single-chunk had UTF-8 errors");
            prop_assert!(chunked_errors.is_empty(), "chunked parse had UTF-8 errors");
            prop_assert_eq!(chunked_events, single_events);
        }

        // A BOM at the very start of the stream is stripped; a BOM embedded
        // inside a data value is payload and must be preserved.
        #[test]
        fn sse_stream_bom_start_stripped_embedded_preserved(
            left in unicode_line(),
            right in unicode_line(),
            chunk_sizes in prop::collection::vec(1usize..8, 0..24),
        ) {
            let start_bom = format!("\u{FEFF}data: {left}{right}\n\n");
            let (start_events, start_errors) =
                parse_stream_chunked(start_bom.as_bytes(), &chunk_sizes);
            prop_assert!(start_errors.is_empty(), "start BOM should be valid UTF-8");
            prop_assert_eq!(start_events.len(), 1);
            let expected_start = format!("{left}{right}");
            prop_assert_eq!(start_events[0].data.as_str(), expected_start);

            let embedded_bom = format!("data: {left}\u{FEFF}{right}\n\n");
            let (embedded_events, embedded_errors) =
                parse_stream_chunked(embedded_bom.as_bytes(), &chunk_sizes);
            prop_assert!(embedded_errors.is_empty(), "embedded BOM should be preserved");
            prop_assert_eq!(embedded_events.len(), 1);
            let expected_embedded = format!("{left}\u{FEFF}{right}");
            prop_assert_eq!(embedded_events[0].data.as_str(), expected_embedded);
        }

        // Invalid UTF-8 (1-3 stray 0xFF bytes) between two valid events:
        // the preceding event still surfaces, and every decode error
        // reported by the stream is InvalidData.
        #[test]
        fn sse_stream_invalid_utf8_yields_invalid_data_errors(
            prefix in ascii_line(),
            suffix in ascii_line(),
            invalid_len in 1usize..4,
            chunk_sizes in prop::collection::vec(1usize..8, 0..20),
        ) {
            let mut bytes = format!("data: {prefix}\n\n").into_bytes();
            bytes.extend(std::iter::repeat_n(0xFFu8, invalid_len));
            bytes.extend(format!("data: {suffix}\n\n").as_bytes());

            let (events, errors) = parse_stream_chunked_limited(&bytes, &chunk_sizes, 32);
            prop_assert!(
                events.iter().any(|event| event.data == prefix),
                "event before invalid sequence should still be surfaced"
            );
            prop_assert!(!errors.is_empty(), "invalid UTF-8 should emit at least one error");
            prop_assert!(
                errors.iter().all(|kind| *kind == ErrorKind::InvalidData),
                "all stream decoding errors must be InvalidData"
            );
        }
    }
1455}