// pi/sse.rs
1//! Server-Sent Events (SSE) parser for asupersync HTTP client.
2//!
3//! Implements the SSE protocol (text/event-stream) on top of asupersync's
4//! HTTP client for streaming LLM responses.
5
6use std::borrow::Cow;
7use std::collections::VecDeque;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
/// Default cap on bytes accumulated into a single event's `data` (100 MiB).
const MAX_EVENT_DATA_BYTES: usize = 100 * 1024 * 1024;
12
/// A parsed SSE event.
///
/// Produced by the parser once a blank line terminates an event block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Event type (from "event:" field, defaults to "message").
    pub event: Cow<'static, str>,
    /// Event data (from "data:" field(s), joined with newlines).
    pub data: String,
    /// Last event ID (from "id:" field).
    pub id: Option<String>,
    /// Retry interval hint in milliseconds (from "retry:" field).
    pub retry: Option<u64>,
}
25
26impl Default for SseEvent {
27    fn default() -> Self {
28        Self {
29            event: Cow::Borrowed("message"),
30            data: String::new(),
31            id: None,
32            retry: None,
33        }
34    }
35}
36
/// Parser state for SSE stream.
///
/// Incremental: feed arbitrary chunks, flush at end of stream.
#[derive(Debug)]
pub struct SseParser {
    /// Unconsumed input tail (an incomplete final line awaiting more bytes).
    buffer: String,
    /// Event currently being accumulated; dispatched on a blank line.
    current: SseEvent,
    /// True once at least one "data:" line was recorded for `current`.
    has_data: bool,
    /// Whether we've already stripped the BOM from the first feed.
    bom_checked: bool,
    /// Number of bytes in `buffer` that have already been scanned for newlines.
    scanned_len: usize,
    /// Per-event data accumulation cap in bytes.
    max_event_data_bytes: usize,
}
50
51impl Default for SseParser {
52    fn default() -> Self {
53        Self {
54            buffer: String::new(),
55            current: SseEvent::default(),
56            has_data: false,
57            bom_checked: false,
58            scanned_len: 0,
59            max_event_data_bytes: MAX_EVENT_DATA_BYTES,
60        }
61    }
62}
63
64impl SseParser {
65    /// Create a new SSE parser.
66    pub fn new() -> Self {
67        Self::default()
68    }
69
70    /// Create a parser with a custom per-event data cap (for testing).
71    #[cfg(test)]
72    fn with_max_event_data_bytes(limit: usize) -> Self {
73        Self {
74            max_event_data_bytes: limit,
75            ..Self::default()
76        }
77    }
78
79    /// Intern common SSE event type names to avoid per-event String allocation.
80    /// LLM streaming APIs use a fixed set of event types; matching them to
81    /// `Cow::Borrowed` static strings eliminates one allocation per event.
82    #[inline]
83    fn intern_event_type(value: &str) -> Cow<'static, str> {
84        match value {
85            // Anthropic streaming events
86            "message" => Cow::Borrowed("message"),
87            "message_start" => Cow::Borrowed("message_start"),
88            "message_stop" => Cow::Borrowed("message_stop"),
89            "message_delta" => Cow::Borrowed("message_delta"),
90            "content_block_start" => Cow::Borrowed("content_block_start"),
91            "content_block_delta" => Cow::Borrowed("content_block_delta"),
92            "content_block_stop" => Cow::Borrowed("content_block_stop"),
93            // OpenAI Responses API streaming events
94            "response.completed" => Cow::Borrowed("response.completed"),
95            "response.done" => Cow::Borrowed("response.done"),
96            "response.failed" => Cow::Borrowed("response.failed"),
97            "response.incomplete" => Cow::Borrowed("response.incomplete"),
98            "response.output_text.delta" => Cow::Borrowed("response.output_text.delta"),
99            "response.output_text.done" => Cow::Borrowed("response.output_text.done"),
100            "response.output_item.added" => Cow::Borrowed("response.output_item.added"),
101            "response.output_item.done" => Cow::Borrowed("response.output_item.done"),
102            "response.content_part.done" => Cow::Borrowed("response.content_part.done"),
103            "response.function_call_arguments.delta" => {
104                Cow::Borrowed("response.function_call_arguments.delta")
105            }
106            "response.reasoning_text.delta" => Cow::Borrowed("response.reasoning_text.delta"),
107            "response.reasoning_text.done" => Cow::Borrowed("response.reasoning_text.done"),
108            "response.reasoning_summary_text.delta" => {
109                Cow::Borrowed("response.reasoning_summary_text.delta")
110            }
111            "response.reasoning_summary_text.done" => {
112                Cow::Borrowed("response.reasoning_summary_text.done")
113            }
114            "response.reasoning_summary_part.done" => {
115                Cow::Borrowed("response.reasoning_summary_part.done")
116            }
117            "response.created" => Cow::Borrowed("response.created"),
118            // Common shared events
119            "ping" => Cow::Borrowed("ping"),
120            "error" => Cow::Borrowed("error"),
121            _ => Cow::Owned(value.to_string()),
122        }
123    }
124
125    #[inline]
126    fn append_data_line(
127        current: &mut SseEvent,
128        value: &str,
129        has_data: &mut bool,
130        max_event_data_bytes: usize,
131    ) {
132        let projected_len = current
133            .data
134            .len()
135            .saturating_add(value.len())
136            .saturating_add(1);
137        if projected_len > max_event_data_bytes {
138            // Preserve the event boundary even when we drop this data line.
139            // This avoids silently skipping oversized events, which can cause
140            // downstream state machines to hang waiting for a completion event.
141            *has_data = true;
142            return;
143        }
144        current.data.push_str(value);
145        current.data.push('\n');
146        *has_data = true;
147    }
148
149    /// Process a single line of SSE data.
150    fn process_line(
151        line: &str,
152        current: &mut SseEvent,
153        has_data: &mut bool,
154        max_event_data_bytes: usize,
155    ) {
156        if let Some(rest) = line.strip_prefix(':') {
157            // Comment line - ignore (but could be used for keep-alive)
158            let _ = rest;
159        } else if let Some((field, value)) = line.split_once(':') {
160            // Field: value
161            let value = value.strip_prefix(' ').unwrap_or(value);
162            match field {
163                "event" => current.event = Self::intern_event_type(value),
164                "data" => Self::append_data_line(current, value, has_data, max_event_data_bytes),
165                "id" if !value.contains('\0') => {
166                    current.id = Some(value.to_string());
167                }
168                "retry" => current.retry = value.parse().ok(),
169                _ => {} // Unknown field - ignore
170            }
171        } else {
172            // Field with no value
173            match line {
174                "event" => current.event = Cow::Borrowed(""),
175                "data" => Self::append_data_line(current, "", has_data, max_event_data_bytes),
176                "id" => current.id = Some(String::new()),
177                _ => {}
178            }
179        }
180    }
181
182    #[inline]
183    fn reset_current_for_next_event(current: &mut SseEvent) {
184        current.event = Cow::Borrowed("message");
185        current.data.clear();
186    }
187
188    #[inline]
189    fn carry_forward_event_state(current: &SseEvent) -> SseEvent {
190        SseEvent {
191            id: current.id.clone(),
192            retry: current.retry,
193            ..Default::default()
194        }
195    }
196
197    #[inline]
198    fn reset_after_buffer_limit<F>(&mut self, emit: &mut F)
199    where
200        F: FnMut(SseEvent),
201    {
202        self.buffer = String::new();
203        self.current = SseEvent::default();
204        self.has_data = false;
205        self.bom_checked = false;
206        self.scanned_len = 0;
207        emit(SseEvent {
208            event: Cow::Borrowed("error"),
209            data: "SSE buffer limit exceeded".to_string(),
210            ..Default::default()
211        });
212    }
213
214    /// Process complete lines from `source`, dispatching events via `emit`.
215    /// Returns the byte offset of the first unconsumed byte.
216    #[inline]
217    fn process_source<F>(
218        source: &str,
219        scan_start: usize,
220        bom_checked: &mut bool,
221        current: &mut SseEvent,
222        has_data: &mut bool,
223        max_event_data_bytes: usize,
224        emit: &mut F,
225    ) -> usize
226    where
227        F: FnMut(SseEvent),
228    {
229        let bytes = source.as_bytes();
230        let mut start = 0usize;
231        let mut search_pos = scan_start;
232
233        // Strip UTF-8 BOM from the beginning of the stream (SSE spec compliance).
234        if !*bom_checked && !source.is_empty() {
235            *bom_checked = true;
236            if source.starts_with('\u{FEFF}') {
237                start = 3;
238                if search_pos < 3 {
239                    search_pos = 3;
240                }
241            }
242        }
243
244        // Use memchr2 to find either \r or \n
245        while let Some(rel_pos) = memchr::memchr2(b'\r', b'\n', &bytes[search_pos..]) {
246            let pos = search_pos + rel_pos;
247            let b = bytes[pos];
248
249            let line_end;
250            let next_start;
251
252            if b == b'\n' {
253                // Bare LF
254                line_end = pos;
255                next_start = pos + 1;
256            } else {
257                // Found \r
258                if pos + 1 < source.len() {
259                    line_end = pos;
260                    next_start = if bytes[pos + 1] == b'\n' {
261                        // CRLF
262                        pos + 2
263                    } else {
264                        // Bare CR
265                        pos + 1
266                    };
267                } else {
268                    // CR at end of buffer - wait for more data to check for \n
269                    break;
270                }
271            }
272
273            let line = &source[start..line_end];
274            start = next_start;
275            search_pos = next_start;
276
277            if line.is_empty() {
278                // Blank line = event boundary
279                if *has_data {
280                    // Trim trailing newline from data
281                    if current.data.ends_with('\n') {
282                        current.data.pop();
283                    }
284                    // Per SSE spec, an empty event name dispatches as "message".
285                    if current.event.is_empty() {
286                        current.event = Cow::Borrowed("message");
287                    }
288                    let next_event = Self::carry_forward_event_state(current);
289                    emit(std::mem::take(current));
290                    *current = next_event;
291                    *has_data = false;
292                } else {
293                    Self::reset_current_for_next_event(current);
294                }
295            } else {
296                Self::process_line(line, current, has_data, max_event_data_bytes);
297            }
298        }
299
300        start
301    }
302
303    /// Feed data to the parser and emit any complete events to `emit`.
304    fn feed_into<F>(&mut self, data: &str, mut emit: F)
305    where
306        F: FnMut(SseEvent),
307    {
308        const MAX_BUFFER_SIZE: usize = 10 * 1024 * 1024;
309        if self.buffer.is_empty() {
310            // Fast path: process data directly without copying to buffer.
311            let consumed = Self::process_source(
312                data,
313                0,
314                &mut self.bom_checked,
315                &mut self.current,
316                &mut self.has_data,
317                self.max_event_data_bytes,
318                &mut emit,
319            );
320            if consumed < data.len() {
321                self.buffer.push_str(&data[consumed..]);
322                if self.buffer.len() > MAX_BUFFER_SIZE {
323                    self.reset_after_buffer_limit(&mut emit);
324                    return;
325                }
326            }
327        } else {
328            // Slow path: parse against a temporary combined source so we only
329            // retain the truly unconsumed tail instead of a giant drained buffer.
330            let mut combined = std::mem::take(&mut self.buffer);
331            combined.push_str(data);
332            // Re-scan from the last safe point (minus 1 to handle split CRLF).
333            let scan_start = self.scanned_len.saturating_sub(1);
334            let consumed = Self::process_source(
335                &combined,
336                scan_start,
337                &mut self.bom_checked,
338                &mut self.current,
339                &mut self.has_data,
340                self.max_event_data_bytes,
341                &mut emit,
342            );
343            if consumed < combined.len() {
344                // Build buffer fresh from truly unprocessed tail only (no duplication).
345                self.buffer = combined[consumed..].to_string();
346            }
347            if self.buffer.len() > MAX_BUFFER_SIZE {
348                self.reset_after_buffer_limit(&mut emit);
349                return;
350            }
351        }
352        // Whether we drained or not, the entire remaining buffer has been scanned.
353        self.scanned_len = self.buffer.len();
354    }
355
356    /// Feed data to the parser and extract any complete events.
357    ///
358    /// Returns a vector of parsed events. Events are delimited by blank lines.
359    pub fn feed(&mut self, data: &str) -> Vec<SseEvent> {
360        let mut events = Vec::with_capacity(4);
361        self.feed_into(data, |event| events.push(event));
362        events
363    }
364
365    /// Check if the parser has any pending data.
366    pub fn has_pending(&self) -> bool {
367        !self.buffer.is_empty() || self.has_data
368    }
369
370    /// Flush any pending event (called when stream ends).
371    pub fn flush(&mut self) -> Option<SseEvent> {
372        // First, process any remaining buffer content that doesn't end with newline
373        if !self.buffer.is_empty() {
374            let line = std::mem::take(&mut self.buffer);
375            let line = line.trim_end_matches('\r');
376            Self::process_line(
377                line,
378                &mut self.current,
379                &mut self.has_data,
380                self.max_event_data_bytes,
381            );
382        }
383
384        if self.has_data {
385            if self.current.data.ends_with('\n') {
386                self.current.data.pop();
387            }
388            if self.current.event.is_empty() {
389                self.current.event = Cow::Borrowed("message");
390            }
391            let event = std::mem::take(&mut self.current);
392            self.current = SseEvent::default();
393            self.has_data = false;
394            Some(event)
395        } else {
396            None
397        }
398    }
399}
400
/// Stream wrapper for SSE events.
///
/// Converts a byte stream into an SSE event stream.
pub struct SseStream<S> {
    /// Underlying byte stream.
    inner: S,
    /// Incremental SSE protocol parser.
    parser: SseParser,
    /// Events already parsed but not yet yielded to the consumer.
    pending_events: VecDeque<SseEvent>,
    /// Error to yield after `pending_events` drains.
    pending_error: Option<std::io::Error>,
    /// If true, yielding `pending_error` also terminates the stream.
    pending_error_is_terminal: bool,
    /// Set once the stream has ended; later polls return `None`.
    terminated: bool,
    /// Partial UTF-8 sequence carried across chunk boundaries.
    utf8_buffer: Vec<u8>,
}
413
414impl<S> SseStream<S> {
415    /// Create a new SSE stream from a byte stream.
416    pub fn new(inner: S) -> Self {
417        Self {
418            inner,
419            parser: SseParser::new(),
420            pending_events: VecDeque::new(),
421            pending_error: None,
422            pending_error_is_terminal: false,
423            terminated: false,
424            utf8_buffer: Vec::new(),
425        }
426    }
427}
428
429impl<S> SseStream<S>
430where
431    S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
432{
433    #[inline]
434    fn invalid_utf8_error() -> std::io::Error {
435        std::io::Error::new(
436            std::io::ErrorKind::InvalidData,
437            "Invalid UTF-8 in SSE stream",
438        )
439    }
440
441    fn feed_parsed_chunk(parser: &mut SseParser, pending: &mut VecDeque<SseEvent>, s: &str) {
442        parser.feed_into(s, |event| pending.push_back(event));
443    }
444
445    fn feed_to_pending(&mut self, s: &str) {
446        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
447    }
448
449    fn process_chunk_without_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
450        let mut processed = 0;
451        let mut first_error: Option<std::io::Error> = None;
452        loop {
453            match std::str::from_utf8(&bytes[processed..]) {
454                Ok(s) => {
455                    if !s.is_empty() {
456                        self.feed_to_pending(s);
457                    }
458                    return first_error.map_or(Ok(()), Err);
459                }
460                Err(err) => {
461                    let valid_len = err.valid_up_to();
462                    if valid_len > 0 {
463                        let s = std::str::from_utf8(&bytes[processed..processed + valid_len])
464                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
465                        self.feed_to_pending(s);
466                        processed += valid_len;
467                    }
468
469                    if let Some(invalid_len) = err.error_len() {
470                        processed += invalid_len;
471                        if first_error.is_none() {
472                            first_error = Some(Self::invalid_utf8_error());
473                        }
474                    } else {
475                        self.utf8_buffer.extend_from_slice(&bytes[processed..]);
476                        return first_error.map_or(Ok(()), Err);
477                    }
478                }
479            }
480        }
481    }
482
483    fn process_chunk_with_utf8_tail(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
484        self.utf8_buffer.extend_from_slice(bytes);
485        let mut processed = 0;
486        let mut first_error: Option<std::io::Error> = None;
487        loop {
488            match std::str::from_utf8(&self.utf8_buffer[processed..]) {
489                Ok(s) => {
490                    if !s.is_empty() {
491                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
492                    }
493                    self.utf8_buffer.clear();
494                    return first_error.map_or(Ok(()), Err);
495                }
496                Err(err) => {
497                    let valid_len = err.valid_up_to();
498                    if valid_len > 0 {
499                        let s = std::str::from_utf8(
500                            &self.utf8_buffer[processed..processed + valid_len],
501                        )
502                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
503                        Self::feed_parsed_chunk(&mut self.parser, &mut self.pending_events, s);
504                        processed += valid_len;
505                    }
506
507                    if let Some(invalid_len) = err.error_len() {
508                        processed += invalid_len;
509                        if first_error.is_none() {
510                            first_error = Some(Self::invalid_utf8_error());
511                        }
512                    } else {
513                        // Move remaining bytes to start of utf8_buffer
514                        let remaining = self.utf8_buffer.len() - processed;
515                        self.utf8_buffer.copy_within(processed.., 0);
516                        self.utf8_buffer.truncate(remaining);
517                        return first_error.map_or(Ok(()), Err);
518                    }
519                }
520            }
521        }
522    }
523
524    fn process_chunk(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
525        if self.utf8_buffer.is_empty() {
526            self.process_chunk_without_utf8_tail(bytes)
527        } else {
528            self.process_chunk_with_utf8_tail(bytes)
529        }
530    }
531
532    fn poll_stream_end(&mut self) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
533        if !self.utf8_buffer.is_empty() {
534            // EOF with an incomplete UTF-8 tail is a terminal stream error.
535            // Clear parser state so repeated polls don't emit the same error forever.
536            self.utf8_buffer.clear();
537            self.pending_events.clear();
538            self.pending_error = None;
539            self.parser = SseParser::new();
540            return Poll::Ready(Some(Err(std::io::Error::new(
541                std::io::ErrorKind::InvalidData,
542                "Stream ended with incomplete UTF-8 sequence",
543            ))));
544        }
545
546        if let Some(event) = self.parser.flush() {
547            return Poll::Ready(Some(Ok(event)));
548        }
549        Poll::Ready(None)
550    }
551
552    /// Poll for the next SSE event.
553    pub fn poll_next_event(
554        mut self: Pin<&mut Self>,
555        cx: &mut Context<'_>,
556    ) -> Poll<Option<Result<SseEvent, std::io::Error>>> {
557        if let Some(event) = self.pending_events.pop_front() {
558            return Poll::Ready(Some(Ok(event)));
559        }
560        if let Some(err) = self.pending_error.take() {
561            if self.pending_error_is_terminal {
562                self.pending_error_is_terminal = false;
563                self.pending_events.clear();
564                self.utf8_buffer.clear();
565                self.parser = SseParser::new();
566                self.terminated = true;
567            }
568            return Poll::Ready(Some(Err(err)));
569        }
570        if self.terminated {
571            return Poll::Ready(None);
572        }
573
574        loop {
575            match Pin::new(&mut self.inner).poll_next(cx) {
576                Poll::Ready(Some(Ok(bytes))) => {
577                    if let Err(err) = self.process_chunk(&bytes) {
578                        if let Some(event) = self.pending_events.pop_front() {
579                            self.pending_error = Some(err);
580                            self.pending_error_is_terminal = true;
581                            return Poll::Ready(Some(Ok(event)));
582                        }
583                        self.pending_events.clear();
584                        self.utf8_buffer.clear();
585                        self.parser = SseParser::new();
586                        self.terminated = true;
587                        return Poll::Ready(Some(Err(err)));
588                    }
589
590                    if let Some(event) = self.pending_events.pop_front() {
591                        return Poll::Ready(Some(Ok(event)));
592                    }
593                }
594                Poll::Ready(Some(Err(e))) => {
595                    return Poll::Ready(Some(Err(e)));
596                }
597                Poll::Ready(None) => {
598                    return self.poll_stream_end();
599                }
600                Poll::Pending => {
601                    return Poll::Pending;
602                }
603            }
604        }
605    }
606}
607
608impl<S> futures::Stream for SseStream<S>
609where
610    S: futures::Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin,
611{
612    type Item = Result<SseEvent, std::io::Error>;
613
614    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
615        self.poll_next_event(cx)
616    }
617}
618
619#[cfg(test)]
620mod tests {
621    use super::*;
622    use futures::StreamExt;
623    use futures::stream;
624    use proptest::prelude::*;
625    use serde_json::json;
626    use std::fmt::Write as _;
627    use std::io::ErrorKind;
628
    /// A randomly generated SSE event used to render fixture streams.
    #[derive(Debug, Clone)]
    struct GeneratedEvent {
        /// Optional "event:" field value.
        event: Option<String>,
        /// Optional "id:" field value.
        id: Option<String>,
        /// Optional "retry:" field value.
        retry: Option<u32>,
        /// One or more "data:" lines.
        data: Vec<String>,
        /// Optional ":comment" line rendered before the fields.
        comment: Option<String>,
    }
637
    /// Newline convention used when rendering a fixture stream.
    #[derive(Debug, Clone, Copy)]
    enum LineEnding {
        Lf,
        Cr,
        CrLf,
    }
644
645    impl LineEnding {
646        fn as_str(self) -> &'static str {
647            match self {
648                Self::Lf => "\n",
649                Self::Cr => "\r",
650                Self::CrLf => "\r\n",
651            }
652        }
653    }
654
655    impl GeneratedEvent {
656        fn render(&self) -> String {
657            let mut out = String::new();
658            if let Some(comment) = &self.comment {
659                out.push(':');
660                out.push_str(comment);
661                out.push('\n');
662            }
663            if let Some(event) = &self.event {
664                out.push_str("event: ");
665                out.push_str(event);
666                out.push('\n');
667            }
668            if let Some(id) = &self.id {
669                out.push_str("id: ");
670                out.push_str(id);
671                out.push('\n');
672            }
673            if let Some(retry) = &self.retry {
674                out.push_str("retry: ");
675                out.push_str(&retry.to_string());
676                out.push('\n');
677            }
678            for line in &self.data {
679                out.push_str("data: ");
680                out.push_str(line);
681                out.push('\n');
682            }
683            out.push('\n');
684            out
685        }
686    }
687
688    fn ascii_line() -> impl Strategy<Value = String> {
689        // ASCII printable range (no CR/LF), keeps chunking safe with byte splits.
690        "[ -~]{0,24}".prop_map(|s| s)
691    }
692
    /// Strategy producing a well-formed random SSE event: optional event
    /// name, numeric id, and retry, 1-3 data lines, and an optional comment.
    fn event_strategy() -> impl Strategy<Value = GeneratedEvent> {
        (
            prop::option::of("[a-z_]{1,12}"),
            prop::option::of("[0-9]{1,8}"),
            prop::option::of(0u32..5000),
            prop::collection::vec(ascii_line(), 1..4),
            prop::option::of(ascii_line()),
        )
            .prop_map(|(event, id, retry, data, comment)| GeneratedEvent {
                event,
                id,
                retry,
                data,
                comment,
            })
    }
709
    /// Uniformly choose one of the three newline conventions the parser accepts.
    fn line_ending_strategy() -> impl Strategy<Value = LineEnding> {
        prop_oneof![
            Just(LineEnding::Lf),
            Just(LineEnding::Cr),
            Just(LineEnding::CrLf),
        ]
    }
717
    /// Strategy for an arbitrary Unicode line with CR/LF excluded, so the
    /// generated text never introduces accidental line terminators.
    fn unicode_line() -> impl Strategy<Value = String> {
        prop::collection::vec(
            any::<char>().prop_filter("no CR/LF", |c| *c != '\r' && *c != '\n'),
            0..24,
        )
        .prop_map(|chars| chars.into_iter().collect())
    }
725
    /// Mostly valid ids, with a 1-in-5 chance of embedding a NUL byte,
    /// which the parser must ignore per the SSE spec.
    fn id_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            4 => "[ -~]{0,24}".prop_map(|s| s),
            1 => ("[ -~]{0,12}", "[ -~]{0,12}").prop_map(|(head, tail)| format!("{head}\0{tail}")),
        ]
    }
732
    /// Retry values mixing in-range numbers, u64 boundary values, alphabetic
    /// garbage, negatives, u64-overflowing digit strings, and the empty string.
    fn retry_field_strategy() -> impl Strategy<Value = String> {
        prop_oneof![
            6 => (0u64..=50_000u64).prop_map(|n| n.to_string()),
            2 => (u64::MAX - 10..=u64::MAX).prop_map(|n| n.to_string()),
            2 => "[a-zA-Z]{1,16}".prop_map(|s| s),
            1 => "-[0-9]{1,24}".prop_map(|s| s),
            1 => ((u128::from(u64::MAX) + 1)..=(u128::from(u64::MAX) + 50_000))
                .prop_map(|n| n.to_string()),
            1 => Just(String::new()),
        ]
    }
744
    /// Payload lengths for data-cap tests, weighted toward small sizes.
    fn oversized_data_len_strategy() -> impl Strategy<Value = usize> {
        // Keep average runtime reasonable while still exercising multi-megabyte inputs.
        prop_oneof![
            10 => 1024usize..=65_536usize,
            5 => 65_537usize..=262_144usize,
            2 => 262_145usize..=1_048_576usize,
            1 => 1_048_577usize..=3_145_728usize,
        ]
    }
754
755    fn render_stream(events: &[GeneratedEvent], terminal_delimiter: bool) -> String {
756        let mut out = String::new();
757        for event in events {
758            out.push_str(&event.render());
759        }
760        if !terminal_delimiter && out.ends_with('\n') {
761            out.pop();
762        }
763        out
764    }
765
766    fn render_stream_with_line_endings(
767        events: &[GeneratedEvent],
768        terminal_delimiter: bool,
769        line_ending: LineEnding,
770    ) -> String {
771        let canonical = render_stream(events, terminal_delimiter);
772        if matches!(line_ending, LineEnding::Lf) {
773            canonical
774        } else {
775            canonical.replace('\n', line_ending.as_str())
776        }
777    }
778
779    fn parse_all(input: &str) -> Vec<SseEvent> {
780        let mut parser = SseParser::new();
781        let mut events = parser.feed(input);
782        if let Some(event) = parser.flush() {
783            events.push(event);
784        }
785        events
786    }
787
788    fn parse_chunked(input: &str, chunk_sizes: &[usize]) -> Vec<SseEvent> {
789        let mut parser = SseParser::new();
790        let mut events = Vec::new();
791        let bytes = input.as_bytes();
792        let mut start = 0usize;
793
794        for &size in chunk_sizes {
795            if start >= bytes.len() {
796                break;
797            }
798            let end = (start + size).min(bytes.len());
799            let chunk = std::str::from_utf8(&bytes[start..end]).expect("ascii chunks");
800            events.extend(parser.feed(chunk));
801            start = end;
802        }
803
804        if start < bytes.len() {
805            let chunk = std::str::from_utf8(&bytes[start..]).expect("ascii remainder");
806            events.extend(parser.feed(chunk));
807        }
808
809        if let Some(event) = parser.flush() {
810            events.push(event);
811        }
812
813        events
814    }
815
816    fn split_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
817        let mut chunks = Vec::new();
818        let mut start = 0usize;
819
820        for &size in chunk_sizes {
821            if start >= input.len() {
822                break;
823            }
824            let end = (start + size).min(input.len());
825            chunks.push(input[start..end].to_vec());
826            start = end;
827        }
828
829        if start < input.len() {
830            chunks.push(input[start..].to_vec());
831        }
832
833        chunks
834    }
835
    /// Drive an `SseStream` over the given chunks to completion, collecting
    /// all events and the kinds of any errors in arrival order.
    fn parse_stream_chunks(chunks: Vec<Vec<u8>>) -> (Vec<SseEvent>, Vec<ErrorKind>) {
        let mut stream = SseStream::new(stream::iter(
            chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
        ));
        let mut events = Vec::new();
        let mut errors = Vec::new();

        futures::executor::block_on(async {
            while let Some(item) = stream.next().await {
                match item {
                    Ok(event) => events.push(event),
                    Err(err) => errors.push(err.kind()),
                }
            }
        });

        (events, errors)
    }
854
855    fn parse_stream_chunks_limited(
856        chunks: Vec<Vec<u8>>,
857        max_items: usize,
858    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
859        let mut stream = SseStream::new(stream::iter(
860            chunks.into_iter().map(Ok::<Vec<u8>, std::io::Error>),
861        ));
862        let mut events = Vec::new();
863        let mut errors = Vec::new();
864
865        futures::executor::block_on(async {
866            for _ in 0..max_items {
867                let Some(item) = stream.next().await else {
868                    break;
869                };
870                match item {
871                    Ok(event) => events.push(event),
872                    Err(err) => errors.push(err.kind()),
873                }
874            }
875        });
876
877        (events, errors)
878    }
879
880    fn parse_stream_single_chunk(input: &[u8]) -> (Vec<SseEvent>, Vec<ErrorKind>) {
881        parse_stream_chunks(vec![input.to_vec()])
882    }
883
884    fn parse_stream_chunked(
885        input: &[u8],
886        chunk_sizes: &[usize],
887    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
888        let chunks = split_bytes(input, chunk_sizes);
889        parse_stream_chunks(chunks)
890    }
891
892    fn parse_stream_chunked_limited(
893        input: &[u8],
894        chunk_sizes: &[usize],
895        max_items: usize,
896    ) -> (Vec<SseEvent>, Vec<ErrorKind>) {
897        let chunks = split_bytes(input, chunk_sizes);
898        parse_stream_chunks_limited(chunks, max_items)
899    }
900
901    fn diag_json(
902        fixture_id: &str,
903        parser: &SseParser,
904        input: &str,
905        expected: &str,
906        actual: &str,
907    ) -> String {
908        json!({
909            "fixture_id": fixture_id,
910            "seed": "deterministic-static",
911            "env": {
912                "os": std::env::consts::OS,
913                "arch": std::env::consts::ARCH,
914                "cwd": std::env::current_dir().ok().map(|path| path.display().to_string()),
915            },
916            "input_preview": input,
917            "parser_state": {
918                "has_pending": parser.has_pending(),
919            },
920            "expected": expected,
921            "actual": actual,
922        })
923        .to_string()
924    }
925
926    #[test]
927    fn test_simple_event() {
928        let mut parser = SseParser::new();
929        let events = parser.feed("data: hello\n\n");
930        assert_eq!(events.len(), 1);
931        assert_eq!(events[0].event, "message");
932        assert_eq!(events[0].data, "hello");
933    }
934
935    #[test]
936    fn test_multiline_data() {
937        let mut parser = SseParser::new();
938        let events = parser.feed("data: line1\ndata: line2\n\n");
939        assert_eq!(events.len(), 1);
940        assert_eq!(events[0].data, "line1\nline2");
941    }
942
943    #[test]
944    fn test_named_event() {
945        let mut parser = SseParser::new();
946        let events = parser.feed("event: ping\ndata: {}\n\n");
947        assert_eq!(events.len(), 1);
948        assert_eq!(events[0].event, "ping");
949        assert_eq!(events[0].data, "{}");
950    }
951
952    #[test]
953    fn test_event_with_id() {
954        let mut parser = SseParser::new();
955        let events = parser.feed("id: 123\ndata: test\n\n");
956        assert_eq!(events.len(), 1);
957        assert_eq!(events[0].id, Some("123".to_string()));
958        assert_eq!(events[0].data, "test");
959    }
960
961    #[test]
962    fn test_last_event_id_persists_across_dispatched_events() {
963        let mut parser = SseParser::new();
964        let events = parser.feed("id: 123\ndata: first\n\ndata: second\n\n");
965        assert_eq!(events.len(), 2);
966        assert_eq!(events[0].id.as_deref(), Some("123"));
967        assert_eq!(events[1].id.as_deref(), Some("123"));
968        assert_eq!(events[1].data, "second");
969    }
970
971    #[test]
972    fn test_multiple_events() {
973        let mut parser = SseParser::new();
974        let events = parser.feed("data: first\n\ndata: second\n\n");
975        assert_eq!(events.len(), 2);
976        assert_eq!(events[0].data, "first");
977        assert_eq!(events[1].data, "second");
978    }
979
980    #[test]
981    fn test_incremental_feed() {
982        let mut parser = SseParser::new();
983
984        // Feed partial data
985        let events = parser.feed("data: hel");
986        assert!(events.is_empty());
987
988        // Feed more
989        let events = parser.feed("lo\n");
990        assert!(events.is_empty());
991
992        // Feed blank line to complete event
993        let events = parser.feed("\n");
994        assert_eq!(events.len(), 1);
995        assert_eq!(events[0].data, "hello");
996    }
997
998    #[test]
999    fn test_buffer_no_duplication_on_straddle() {
1000        // Test that buffer content is not duplicated when data straddles event boundaries.
1001        // This exercises the slow-path scenario that previously had a buffer duplication bug.
1002        let mut parser = SseParser::new();
1003
1004        // Feed partial event data that fills buffer but doesn't complete event
1005        let events = parser.feed("data: start");
1006        assert!(events.is_empty());
1007        assert!(parser.has_pending()); // Buffer should have partial data
1008
1009        // Feed more data that triggers slow path processing and leaves partial data
1010        let events = parser.feed("_middle_incomplete");
1011        assert!(events.is_empty());
1012        assert!(parser.has_pending()); // Should still have buffered data
1013
1014        // Complete the event - this should work without duplicated data
1015        let events = parser.feed("\n\n");
1016        assert_eq!(events.len(), 1);
1017        assert_eq!(events[0].data, "start_middle_incomplete");
1018
1019        // Verify buffer is clean for next event
1020        let events = parser.feed("data: second\n\n");
1021        assert_eq!(events.len(), 1);
1022        assert_eq!(events[0].data, "second");
1023    }
1024
1025    #[test]
1026    fn test_comment_ignored() {
1027        let mut parser = SseParser::new();
1028        let events = parser.feed(":this is a comment\ndata: actual\n\n");
1029        assert_eq!(events.len(), 1);
1030        assert_eq!(events[0].data, "actual");
1031    }
1032
1033    #[test]
1034    fn test_retry_field() {
1035        let mut parser = SseParser::new();
1036        let events = parser.feed("retry: 3000\ndata: test\n\n");
1037        assert_eq!(events.len(), 1);
1038        assert_eq!(events[0].retry, Some(3000));
1039    }
1040
1041    #[test]
1042    fn test_retry_hint_persists_across_dispatched_events() {
1043        let mut parser = SseParser::new();
1044        let events = parser.feed("retry: 3000\ndata: first\n\ndata: second\n\n");
1045        assert_eq!(events.len(), 2);
1046        assert_eq!(events[0].retry, Some(3000));
1047        assert_eq!(events[1].retry, Some(3000));
1048    }
1049
1050    #[test]
1051    fn test_append_data_line_enforces_projected_limit() {
1052        let mut current = SseEvent::default();
1053        let mut has_data = false;
1054
1055        SseParser::append_data_line(&mut current, "ab", &mut has_data, 3);
1056        assert_eq!(current.data, "ab\n");
1057        assert!(has_data);
1058
1059        SseParser::append_data_line(&mut current, "c", &mut has_data, 3);
1060        assert_eq!(current.data, "ab\n");
1061    }
1062
1063    #[test]
1064    fn test_data_cap_single_oversized_line_via_feed() {
1065        // A single oversized data line should still emit an event boundary
1066        // (with empty data) rather than being silently dropped.
1067        let mut parser = SseParser::with_max_event_data_bytes(10);
1068        let events = parser.feed("data: this-is-longer-than-ten-bytes\n\n");
1069        assert_eq!(events.len(), 1);
1070        assert_eq!(events[0].data, "");
1071    }
1072
1073    #[test]
1074    fn test_data_cap_accumulation_via_feed() {
1075        // Multiple small data lines that collectively exceed the cap:
1076        // accepted lines are kept, the line that would push past the cap is rejected.
1077        let mut parser = SseParser::with_max_event_data_bytes(10);
1078        // "abc\n" = 4 bytes after first append
1079        let events = parser.feed("data: abc\ndata: def\ndata: ghi\n\n");
1080        assert_eq!(events.len(), 1);
1081        // "abc\n" (4) + "def\n" (4) = 8 bytes; "ghi\n" (4) would make 12 > 10, rejected.
1082        // Trailing newline stripped on emit → "abc\ndef"
1083        assert_eq!(events[0].data, "abc\ndef");
1084    }
1085
1086    #[test]
1087    fn test_data_cap_exact_boundary_via_feed() {
1088        // Data that exactly reaches the cap should be accepted.
1089        let mut parser = SseParser::with_max_event_data_bytes(4);
1090        // "abc\n" = 4 bytes, exactly at cap
1091        let events = parser.feed("data: abc\n\n");
1092        assert_eq!(events.len(), 1);
1093        assert_eq!(events[0].data, "abc");
1094    }
1095
1096    #[test]
1097    fn test_data_cap_next_event_resets() {
1098        // After a capped event, the next event should start fresh.
1099        let mut parser = SseParser::with_max_event_data_bytes(6);
1100        let events = parser.feed("data: abcde\ndata: rejected\n\ndata: ok\n\n");
1101        assert_eq!(events.len(), 2);
1102        // First event: "abcde\n" = 6 bytes at cap; "rejected\n" would exceed → dropped.
1103        assert_eq!(events[0].data, "abcde");
1104        // Second event starts fresh with a clean data buffer.
1105        assert_eq!(events[1].data, "ok");
1106    }
1107
1108    #[test]
1109    fn test_data_cap_chunked_delivery() {
1110        // Cap enforcement must work even when data arrives in small chunks.
1111        let mut parser = SseParser::with_max_event_data_bytes(10);
1112        parser.feed("data: abc\n");
1113        parser.feed("data: def\n");
1114        // "abc\n" (4) + "def\n" (4) = 8; "toolong\n" (8) would make 16 > 10
1115        let events = parser.feed("data: toolong\n\n");
1116        assert_eq!(events.len(), 1);
1117        assert_eq!(events[0].data, "abc\ndef");
1118    }
1119
1120    #[test]
1121    fn test_data_cap_flush_path() {
1122        // Cap must also be enforced when the stream ends without a trailing blank line.
1123        let mut parser = SseParser::with_max_event_data_bytes(6);
1124        parser.feed("data: abcde\n");
1125        parser.feed("data: no\n");
1126        // "abcde\n" = 6 bytes at cap; "no\n" (3) would make 9 > 6 → rejected.
1127        let event = parser.flush().expect("should flush pending event");
1128        assert_eq!(event.data, "abcde");
1129    }
1130
1131    #[test]
1132    fn test_keep_alive_comment_does_not_emit_event() {
1133        let mut parser = SseParser::new();
1134        let events = parser.feed(": keepalive\n\n");
1135        assert!(events.is_empty());
1136    }
1137
1138    #[test]
1139    fn test_crlf_handling() {
1140        let mut parser = SseParser::new();
1141        let events = parser.feed("data: hello\r\n\r\n");
1142        assert_eq!(events.len(), 1);
1143        assert_eq!(events[0].data, "hello");
1144    }
1145
1146    #[test]
1147    fn test_flush_pending() {
1148        let mut parser = SseParser::new();
1149        let events = parser.feed("data: incomplete");
1150        assert!(events.is_empty());
1151        assert!(parser.has_pending());
1152
1153        // Flush at stream end
1154        let event = parser.flush();
1155        assert!(event.is_some());
1156        assert_eq!(event.unwrap().data, "incomplete");
1157    }
1158
1159    #[test]
1160    fn test_event_without_data_is_ignored() {
1161        let mut parser = SseParser::new();
1162        let events = parser.feed("event: ping\n\n");
1163        assert!(
1164            events.is_empty(),
1165            "event block without data should not emit an event"
1166        );
1167    }
1168
1169    #[test]
1170    fn test_unknown_field_is_ignored() {
1171        let mut parser = SseParser::new();
1172        let events = parser.feed("foo: bar\ndata: hello\n\n");
1173        assert_eq!(events.len(), 1);
1174        assert_eq!(events[0].data, "hello");
1175        assert_eq!(events[0].event, "message");
1176    }
1177
1178    #[test]
1179    fn test_error_event_parsing() {
1180        let mut parser = SseParser::new();
1181        let events = parser.feed("event: error\ndata: {\"message\":\"boom\"}\n\n");
1182        assert_eq!(events.len(), 1);
1183        assert_eq!(events[0].event, "error");
1184        assert_eq!(events[0].data, "{\"message\":\"boom\"}");
1185    }
1186
1187    #[test]
1188    fn test_empty_event_field_defaults_to_message() {
1189        let mut parser = SseParser::new();
1190        let input = "event\ndata: hello\n\n";
1191        let events = parser.feed(input);
1192        let diag = diag_json(
1193            "sse-empty-event-field-default",
1194            &parser,
1195            input,
1196            r#"{"event":"message","data":"hello"}"#,
1197            &format!("{events:?}"),
1198        );
1199
1200        assert_eq!(events.len(), 1, "{diag}");
1201        assert_eq!(events[0].event, "message", "{diag}");
1202        assert_eq!(events[0].data, "hello", "{diag}");
1203    }
1204
1205    #[test]
1206    fn test_large_payload_event() {
1207        let mut parser = SseParser::new();
1208        let payload = "x".repeat(128 * 1024);
1209        let input = format!("data: {payload}\n\n");
1210        let events = parser.feed(&input);
1211        assert_eq!(events.len(), 1);
1212        assert_eq!(events[0].data.len(), payload.len());
1213        assert_eq!(events[0].data, payload);
1214    }
1215
1216    #[test]
1217    fn test_buffer_limit_overflow_resets_parser_state() {
1218        let mut parser = SseParser::new();
1219        assert!(parser.feed("data: stale\n").is_empty());
1220
1221        let oversized = "x".repeat(10 * 1024 * 1024 + 1);
1222        let overflow_events = parser.feed(&oversized);
1223        assert_eq!(overflow_events.len(), 1);
1224        assert_eq!(overflow_events[0].event, "error");
1225        assert_eq!(overflow_events[0].data, "SSE buffer limit exceeded");
1226
1227        assert!(!parser.has_pending());
1228        assert!(parser.buffer.capacity() < 1024);
1229        assert!(parser.flush().is_none());
1230
1231        let fresh = parser.feed("data: fresh\n\n");
1232        assert_eq!(fresh.len(), 1);
1233        assert_eq!(fresh[0].data, "fresh");
1234    }
1235
1236    #[test]
1237    fn test_large_complete_chunk_does_not_trip_buffer_limit_fast_path() {
1238        let mut parser = SseParser::new();
1239        let payload = "x".repeat(10 * 1024 * 1024 + 1);
1240        let events = parser.feed(&format!("data: {payload}\n\n"));
1241
1242        assert_eq!(events.len(), 1);
1243        assert_eq!(events[0].event, "message");
1244        assert_eq!(events[0].data.len(), payload.len());
1245        assert_eq!(events[0].data, payload);
1246        assert!(!parser.has_pending());
1247        assert!(parser.buffer.capacity() < 1024);
1248        assert!(parser.flush().is_none());
1249    }
1250
1251    #[test]
1252    fn test_large_complete_chunk_does_not_trip_buffer_limit_with_buffered_prefix() {
1253        let mut parser = SseParser::new();
1254        assert!(parser.feed("data: ").is_empty());
1255
1256        let payload = "x".repeat(10 * 1024 * 1024 + 1);
1257        let events = parser.feed(&format!("{payload}\n\n"));
1258
1259        assert_eq!(events.len(), 1);
1260        assert_eq!(events[0].event, "message");
1261        assert_eq!(events[0].data.len(), payload.len());
1262        assert_eq!(events[0].data, payload);
1263        assert!(!parser.has_pending());
1264        assert!(parser.buffer.capacity() < 1024);
1265        assert!(parser.flush().is_none());
1266    }
1267
1268    #[test]
1269    fn test_rapid_sequential_events() {
1270        let mut parser = SseParser::new();
1271        let mut input = String::new();
1272        for i in 0..200 {
1273            let _ = write!(&mut input, "event: e{i}\ndata: payload{i}\n\n");
1274        }
1275        let events = parser.feed(&input);
1276        assert_eq!(events.len(), 200);
1277        assert_eq!(events[0].event, "e0");
1278        assert_eq!(events[0].data, "payload0");
1279        assert_eq!(events[199].event, "e199");
1280        assert_eq!(events[199].data, "payload199");
1281    }
1282
1283    #[test]
1284    fn test_stream_event_name_matrix() {
1285        let names = [
1286            "message_start",
1287            "content_block_start",
1288            "content_block_delta",
1289            "content_block_stop",
1290            "message_delta",
1291            "message_stop",
1292            "message",
1293            "error",
1294            "ping",
1295            "response.created",
1296            "response.output_text.delta",
1297            "response.completed",
1298        ];
1299
1300        let mut parser = SseParser::new();
1301        let mut input = String::new();
1302        for name in names {
1303            let _ = write!(&mut input, "event: {name}\ndata: {{}}\n\n");
1304        }
1305
1306        let events = parser.feed(&input);
1307        assert_eq!(events.len(), names.len());
1308        for (idx, name) in names.iter().enumerate() {
1309            assert_eq!(events[idx].event, *name);
1310            assert_eq!(events[idx].data, "{}");
1311        }
1312    }
1313
    /// End-to-end shape check against an Anthropic-style event stream: five
    /// named events, each with a JSON data payload, fed in one chunk.
    #[test]
    fn test_anthropic_style_events() {
        let mut parser = SseParser::new();

        // Simulate Anthropic API response
        // NOTE: the raw string below is the exact wire format, including the
        // trailing blank line that terminates the final event.
        let events = parser.feed(
            r#"event: message_start
data: {"type":"message_start","message":{"id":"msg_123"}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_stop
data: {"type":"message_stop"}

"#,
        );

        // One event per "event:"/"data:" block, names and payloads preserved.
        assert_eq!(events.len(), 5);
        assert_eq!(events[0].event, "message_start");
        assert!(events[0].data.contains("message_start"));
        assert_eq!(events[1].event, "content_block_start");
        assert_eq!(events[2].event, "content_block_delta");
        assert!(events[2].data.contains("Hello"));
        assert_eq!(events[3].event, "content_block_stop");
        assert_eq!(events[4].event, "message_stop");
    }
1347
1348    #[test]
1349    fn test_stream_yields_multiple_events_from_one_chunk() {
1350        let bytes = b"data: first\n\ndata: second\n\n".to_vec();
1351        let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));
1352
1353        futures::executor::block_on(async {
1354            let first = stream.next().await.expect("first event").expect("ok");
1355            assert_eq!(first.data, "first");
1356
1357            let second = stream.next().await.expect("second event").expect("ok");
1358            assert_eq!(second.data, "second");
1359
1360            assert!(stream.next().await.is_none());
1361        });
1362    }
1363
1364    #[test]
1365    fn test_stream_handles_utf8_split_across_chunks() {
1366        // Snowman is a 3-byte UTF-8 sequence: E2 98 83. Split it across chunks.
1367        let chunks = vec![Ok(b"data: \xE2".to_vec()), Ok(b"\x98\x83\n\n".to_vec())];
1368        let mut stream = SseStream::new(stream::iter(chunks));
1369
1370        futures::executor::block_on(async {
1371            let event = stream.next().await.expect("event").expect("ok");
1372            assert_eq!(event.data, "☃");
1373            assert!(stream.next().await.is_none());
1374        });
1375    }
1376
1377    #[test]
1378    fn test_stream_handles_crlf_split_across_partial_frames() {
1379        let chunks = vec![
1380            Ok(b"data: first\r".to_vec()),
1381            Ok(b"\n".to_vec()),
1382            Ok(b"\r".to_vec()),
1383            Ok(b"\n".to_vec()),
1384        ];
1385        let mut stream = SseStream::new(stream::iter(chunks));
1386
1387        futures::executor::block_on(async {
1388            let first = stream.next().await.expect("first event").expect("ok");
1389            let diag = json!({
1390                "fixture_id": "sse-crlf-split-across-chunks",
1391                "seed": "deterministic-static",
1392                "expected": {"event": "message", "data": "first"},
1393                "actual": {"event": first.event, "data": first.data},
1394            })
1395            .to_string();
1396            assert_eq!(first.data, "first", "{diag}");
1397            assert!(stream.next().await.is_none(), "{diag}");
1398        });
1399    }
1400
1401    #[test]
1402    fn test_stream_flushes_pending_event_at_end() {
1403        let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: last".to_vec())]));
1404
1405        futures::executor::block_on(async {
1406            let event = stream.next().await.expect("event").expect("ok");
1407            assert_eq!(event.data, "last");
1408            assert!(stream.next().await.is_none());
1409        });
1410    }
1411
1412    #[test]
1413    fn test_stream_errors_on_incomplete_utf8_at_end() {
1414        let mut stream = SseStream::new(stream::iter(vec![Ok(b"data: \xE2".to_vec())]));
1415
1416        futures::executor::block_on(async {
1417            let err = stream
1418                .next()
1419                .await
1420                .expect("expected a result")
1421                .expect_err("expected utf8 error");
1422            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1423            assert!(
1424                stream.next().await.is_none(),
1425                "incomplete UTF-8 at EOF should produce a terminal error"
1426            );
1427        });
1428    }
1429
    /// Ordering contract: every event parsed from the valid prefix (and from
    /// any remainder recovered after the invalid byte) is delivered before
    /// the UTF-8 error itself surfaces.
    #[test]
    fn test_stream_surfaces_pending_event_before_utf8_error() {
        // Input: "data: ok\n\ndata: \xFF\n\n"
        // The parser feeds valid prefix "data: ok\n\ndata: " → emits event("ok"),
        // then recovers remainder "\n\n" after the 0xFF → completes partial "data: "
        // → emits event(""). All pending events drain before the error.
        let chunks = vec![Ok(b"data: ok\n\ndata: \xFF\n\n".to_vec())];
        let mut stream = SseStream::new(stream::iter(chunks));

        futures::executor::block_on(async {
            let first = stream.next().await.expect("first item").expect("first ok");
            let diag = json!({
                "fixture_id": "sse-valid-event-before-invalid-utf8",
                "seed": "deterministic-static",
                "expected_sequence": ["Ok(data=ok)", "Ok(data=)", "Err(invalid utf8)"],
                "actual_first": {"event": first.event, "data": first.data},
            })
            .to_string();
            assert_eq!(first.data, "ok", "{diag}");

            // The recovered remainder "\n\n" completes the partial "data: " line,
            // producing an empty-data event before the error surfaces.
            let second = stream
                .next()
                .await
                .expect("second item")
                .expect("second ok");
            assert_eq!(second.data, "", "{diag}");

            // Only after all pending events drain does the error appear.
            let err = stream
                .next()
                .await
                .expect("third item")
                .expect_err("third should be utf8 error");
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData, "{diag}");
        });
    }

    /// Valid data *after* an invalid byte is recovered and parsed; the error
    /// still surfaces, but only after those recovered events are delivered.
    #[test]
    fn test_stream_resumes_parsing_remainder_after_utf8_error() {
        // "data: ok\n\n" (valid) + 0xFF (invalid) + "data: after\n\n" (valid)
        // Sent in one chunk.
        // The recovery code feeds remainder "data: after\n\n" to pending_events
        // before the error is stored, so events drain first:
        // Expect: Ok(ok), Ok(after), Err(invalid)

        let mut bytes = b"data: ok\n\n".to_vec();
        bytes.push(0xFF);
        bytes.extend_from_slice(b"data: after\n\n");

        let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));

        futures::executor::block_on(async {
            // 1. "ok" — from valid prefix before 0xFF
            let first = stream.next().await.expect("1").expect("ok");
            assert_eq!(first.data, "ok");

            // 2. "after" — recovered from remainder after 0xFF (pending events drain first)
            let second = stream.next().await.expect("2").expect("after");
            assert_eq!(second.data, "after");

            // 3. Error — surfaces after all pending events are delivered
            let err = stream.next().await.expect("3").expect_err("error");
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
        });
    }

    /// After a UTF-8 error, an unterminated tail event must NOT be flushed:
    /// the error is terminal and the partial data is discarded.
    #[test]
    fn test_stream_does_not_flush_partial_tail_after_utf8_error() {
        let mut bytes = b"data: ok\n\n".to_vec();
        bytes.push(0xFF);
        bytes.extend_from_slice(b"data: partial");

        let mut stream = SseStream::new(stream::iter(vec![Ok(bytes)]));

        futures::executor::block_on(async {
            let first = stream.next().await.expect("1").expect("ok");
            assert_eq!(first.data, "ok");

            let err = stream.next().await.expect("2").expect_err("error");
            assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
            assert!(
                stream.next().await.is_none(),
                "utf-8 parse errors should terminate the stream without flushing a partial tail"
            );
        });
    }
1517
1518    #[test]
1519    fn test_bom_stripping_with_preceding_empty_chunk() {
1520        let mut parser = SseParser::new();
1521        // Feed empty chunk first - should not mark BOM as checked
1522        let events = parser.feed("");
1523        assert!(events.is_empty());
1524
1525        // Feed content with BOM
1526        let events = parser.feed("\u{FEFF}data: hello\n\n");
1527        assert_eq!(events.len(), 1);
1528        assert_eq!(events[0].data, "hello");
1529        // Ensure the BOM didn't end up in the field name (causing it to be ignored)
1530        assert_eq!(events[0].event, "message");
1531    }
1532
    /// Regression test for fuzz crash artifact
    /// (crash-28de6b0685a7989f4c24ea8feea29aaa57d50053).
    ///
    /// Verifies the chunking invariant: feeding data whole, char-by-char, and
    /// split at midpoint must produce identical events and flush results.
    #[test]
    fn test_fuzz_regression_crash_28de6b() {
        // Raw crash-artifact bytes, kept verbatim. Contains invalid UTF-8
        // sequences interleaved with SSE-like field prefixes.
        let data: &[u8] = &[
            0x64, 0x3d, 0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x31, 0x0a, 0x5a, 0x61,
            0x74, 0x61, 0x3a, 0x20, 0x6c, 0x69, 0x6e, 0x65, 0x32, 0x0a, 0x64, 0x61, 0x74, 0x61,
            0x3a, 0x20, 0x6c, 0x9f, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x28, 0xcd, 0xcd, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1,
            0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82,
            0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x82, 0x40, 0x82, 0xcd,
            0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0xcd, 0x91,
            0x9a, 0x93, 0x69, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00,
        ];
        // Lossy decoding maps each invalid sequence to U+FFFD so all three
        // feeding strategies below operate on the same `str` input.
        let input = String::from_utf8_lossy(data);

        // Whole feed
        let mut parser_whole = SseParser::new();
        let events_whole = parser_whole.feed(&input);
        let flush_whole = parser_whole.flush();

        // Char-by-char
        let mut parser_char = SseParser::new();
        let mut events_char = Vec::new();
        for ch in input.chars() {
            let mut buf = [0u8; 4];
            events_char.extend(parser_char.feed(ch.encode_utf8(&mut buf)));
        }
        let flush_char = parser_char.flush();

        // Split at midpoint
        // The raw midpoint may fall inside a multi-byte char; advance to the
        // next char boundary so split_at() does not panic.
        let mid = input.len() / 2;
        let mut split_at = mid;
        while !input.is_char_boundary(split_at) && split_at < input.len() {
            split_at += 1;
        }
        let (part1, part2) = input.split_at(split_at);
        let mut parser_split = SseParser::new();
        let mut events_split: Vec<_> = parser_split.feed(part1);
        events_split.extend(parser_split.feed(part2));
        let flush_split = parser_split.flush();

        // All three strategies must agree on both events and flush result.
        assert_eq!(events_whole, events_split, "whole vs split events");
        assert_eq!(flush_whole, flush_split, "whole vs split flush");
        assert_eq!(events_whole, events_char, "whole vs char events");
        assert_eq!(flush_whole, flush_char, "whole vs char flush");
    }
1589
1590    proptest! {
1591        #![proptest_config(ProptestConfig {
1592            cases: 256,
1593            max_shrink_iters: 200,
1594            .. ProptestConfig::default()
1595        })]
1596
1597        #[test]
1598        fn sse_chunking_invariant(
1599            events in prop::collection::vec(event_strategy(), 1..10),
1600            chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1601            terminal_delimiter in any::<bool>(),
1602        ) {
1603            let input = render_stream(&events, terminal_delimiter);
1604            let expected = parse_all(&input);
1605            let actual = parse_chunked(&input, &chunk_sizes);
1606            prop_assert_eq!(actual, expected);
1607        }
1608
1609        #[test]
1610        fn sse_line_ending_chunking_invariant(
1611            events in prop::collection::vec(event_strategy(), 1..10),
1612            chunk_sizes in prop::collection::vec(1usize..32, 0..20),
1613            terminal_delimiter in any::<bool>(),
1614            line_ending in line_ending_strategy(),
1615        ) {
1616            let input = render_stream_with_line_endings(&events, terminal_delimiter, line_ending);
1617            let expected = parse_all(&input);
1618            let actual = parse_chunked(&input, &chunk_sizes);
1619            prop_assert_eq!(actual, expected);
1620        }
1621
1622        #[test]
1623        fn sse_id_with_null_bytes_is_rejected(
1624            id in id_field_strategy(),
1625            data in ascii_line(),
1626        ) {
1627            let input = format!("id: {id}\ndata: {data}\n\n");
1628            let events = parse_all(&input);
1629
1630            prop_assert_eq!(events.len(), 1);
1631            let expected_id = if id.contains('\0') { None } else { Some(id) };
1632            prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1633        }
1634
1635        #[test]
1636        fn sse_retry_field_fuzz_last_assignment_wins(
1637            retry_values in prop::collection::vec(retry_field_strategy(), 1..6),
1638            data in ascii_line(),
1639        ) {
1640            let mut input = String::new();
1641            for value in &retry_values {
1642                let _ = writeln!(&mut input, "retry: {value}");
1643            }
1644            let _ = writeln!(&mut input, "data: {data}");
1645            input.push('\n');
1646
1647            let events = parse_all(&input);
1648            prop_assert_eq!(events.len(), 1);
1649
1650            let expected_retry = retry_values
1651                .last()
1652                .and_then(|value| value.parse::<u64>().ok());
1653            prop_assert_eq!(events[0].retry, expected_retry);
1654        }
1655
1656        #[test]
1657        fn sse_duplicate_fields_apply_spec_semantics(
1658            event_names in prop::collection::vec("[a-z_]{1,12}", 1..6),
1659            ids in prop::collection::vec(id_field_strategy(), 0..6),
1660            data_lines in prop::collection::vec(ascii_line(), 1..6),
1661        ) {
1662            let mut input = String::new();
1663
1664            for event_name in &event_names {
1665                let _ = writeln!(&mut input, "event: {event_name}");
1666            }
1667            for id in &ids {
1668                let _ = writeln!(&mut input, "id: {id}");
1669            }
1670            for line in &data_lines {
1671                let _ = writeln!(&mut input, "data: {line}");
1672            }
1673            input.push('\n');
1674
1675            let events = parse_all(&input);
1676            prop_assert_eq!(events.len(), 1);
1677            prop_assert_eq!(events[0].event.as_ref(), event_names.last().expect("non-empty"));
1678
1679            let expected_id = ids.iter().rfind(|id| !id.contains('\0')).cloned();
1680            prop_assert_eq!(events[0].id.as_ref(), expected_id.as_ref());
1681
1682            let expected_data = data_lines.join("\n");
1683            prop_assert_eq!(events[0].data.as_str(), expected_data);
1684        }
1685
1686        #[test]
1687        fn sse_oversized_data_fields_round_trip(len in oversized_data_len_strategy(),) {
1688            let payload = "x".repeat(len);
1689            let input = format!("data: {payload}\n\n");
1690            let events = parse_all(&input);
1691
1692            prop_assert_eq!(events.len(), 1);
1693            prop_assert_eq!(events[0].data.len(), len);
1694            prop_assert_eq!(events[0].data.as_str(), payload);
1695        }
1696
1697        #[test]
1698        fn sse_stream_utf8_valid_input_chunking_invariant(
1699            lines in prop::collection::vec(unicode_line(), 1..5),
1700            chunk_sizes in prop::collection::vec(1usize..16, 0..24),
1701        ) {
1702            let mut input = String::new();
1703            for line in &lines {
1704                let _ = writeln!(&mut input, "data: {line}");
1705            }
1706            input.push('\n');
1707
1708            let (single_events, single_errors) = parse_stream_single_chunk(input.as_bytes());
1709            let (chunked_events, chunked_errors) =
1710                parse_stream_chunked(input.as_bytes(), &chunk_sizes);
1711
1712            prop_assert!(single_errors.is_empty(), "single-chunk had UTF-8 errors");
1713            prop_assert!(chunked_errors.is_empty(), "chunked parse had UTF-8 errors");
1714            prop_assert_eq!(chunked_events, single_events);
1715        }
1716
1717        #[test]
1718        fn sse_stream_bom_start_stripped_embedded_preserved(
1719            left in unicode_line(),
1720            right in unicode_line(),
1721            chunk_sizes in prop::collection::vec(1usize..8, 0..24),
1722        ) {
1723            let start_bom = format!("\u{FEFF}data: {left}{right}\n\n");
1724            let (start_events, start_errors) =
1725                parse_stream_chunked(start_bom.as_bytes(), &chunk_sizes);
1726            prop_assert!(start_errors.is_empty(), "start BOM should be valid UTF-8");
1727            prop_assert_eq!(start_events.len(), 1);
1728            let expected_start = format!("{left}{right}");
1729            prop_assert_eq!(start_events[0].data.as_str(), expected_start);
1730
1731            let embedded_bom = format!("data: {left}\u{FEFF}{right}\n\n");
1732            let (embedded_events, embedded_errors) =
1733                parse_stream_chunked(embedded_bom.as_bytes(), &chunk_sizes);
1734            prop_assert!(embedded_errors.is_empty(), "embedded BOM should be preserved");
1735            prop_assert_eq!(embedded_events.len(), 1);
1736            let expected_embedded = format!("{left}\u{FEFF}{right}");
1737            prop_assert_eq!(embedded_events[0].data.as_str(), expected_embedded);
1738        }
1739
1740        #[test]
1741        fn sse_stream_invalid_utf8_yields_invalid_data_errors(
1742            prefix in ascii_line(),
1743            suffix in ascii_line(),
1744            invalid_len in 1usize..4,
1745            chunk_sizes in prop::collection::vec(1usize..8, 0..20),
1746        ) {
1747            let mut bytes = format!("data: {prefix}\n\n").into_bytes();
1748            bytes.extend(std::iter::repeat_n(0xFFu8, invalid_len));
1749            bytes.extend(format!("data: {suffix}\n\n").as_bytes());
1750
1751            let (events, errors) = parse_stream_chunked_limited(&bytes, &chunk_sizes, 32);
1752            prop_assert!(
1753                events.iter().any(|event| event.data == prefix),
1754                "event before invalid sequence should still be surfaced"
1755            );
1756            prop_assert!(!errors.is_empty(), "invalid UTF-8 should emit at least one error");
1757            prop_assert!(
1758                errors.iter().all(|kind| *kind == ErrorKind::InvalidData),
1759                "all stream decoding errors must be InvalidData"
1760            );
1761        }
1762    }
1763}