Skip to main content

agent_sdk_providers/
streaming.rs

1//! Streaming types for LLM responses.
2//!
3//! This module provides types for handling streaming responses from LLM providers.
4//! The [`StreamDelta`] enum represents individual events in a streaming response,
5//! and [`StreamAccumulator`] helps collect these events into a final response.
6
7use agent_sdk_foundation::llm::{ContentBlock, StopReason, Usage};
8#[cfg(any(feature = "openai", feature = "openai-codex"))]
9use bytes::BytesMut;
10use futures::Stream;
11use std::collections::HashMap;
12use std::pin::Pin;
13
/// Largest block index that [`StreamAccumulator`] is willing to materialize.
///
/// The `block_index` of a delta is copied straight from provider wire data
/// (the SSE `index` field), and the provider `base_url` is user-configurable
/// (any OpenAI-compatible endpoint). Without a cap, a single corrupted or
/// hostile event carrying an enormous index would force an equally enormous
/// `Vec` allocation and could exhaust host memory. Text and thinking deltas
/// whose index is above this limit are therefore discarded with a warning
/// instead of being grown into.
const MAX_BLOCK_INDEX: usize = 4_096;
23
/// Byte-level line splitter for newline-delimited SSE streams.
///
/// Network chunks produced by `reqwest`'s `bytes_stream` end at arbitrary byte
/// offsets, so one multi-byte UTF-8 character may be spread over two chunks.
/// If every chunk were decoded on its own with `String::from_utf8_lossy`, such
/// a character would be replaced by `U+FFFD` for good and show up corrupted in
/// user-visible text deltas.
///
/// To avoid that, this type keeps the raw bytes and decodes only *complete*
/// lines, i.e. everything up to a `\n`. The newline byte (`0x0A`) never occurs
/// inside a multi-byte UTF-8 sequence, so a complete line always ends on a
/// character boundary and decodes without loss.
///
/// Splitting is also cheaper than the naive
/// `buffer = buffer[pos + 1..].to_string()` approach, which copies the whole
/// remaining buffer for every line: [`BytesMut::split_to`] only moves the read
/// cursor forward and leaves the unconsumed tail where it is.
#[cfg(any(feature = "openai", feature = "openai-codex"))]
#[derive(Debug, Default)]
pub(crate) struct SseLineBuffer {
    /// Bytes received so far that have not yet been returned as a line.
    buf: BytesMut,
}
44
#[cfg(any(feature = "openai", feature = "openai-codex"))]
impl SseLineBuffer {
    /// Create an empty buffer.
    #[must_use]
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Append a freshly received network chunk to the pending bytes.
    ///
    /// The chunk is copied as-is; nothing is decoded until a complete line is
    /// requested through [`SseLineBuffer::next_line`].
    pub(crate) fn extend(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }

    /// Pop the next complete line, or return `None` when no full line is
    /// buffered yet.
    ///
    /// The returned line excludes its terminator: the trailing `\n`, and — for
    /// servers that emit CRLF line endings — the `\r` directly in front of it.
    /// Incomplete trailing bytes, including a multi-byte character split across
    /// a chunk boundary, stay buffered for the next call. Bytes that are not
    /// valid UTF-8 even within a complete line are replaced with `U+FFFD`.
    pub(crate) fn next_line(&mut self) -> Option<String> {
        let newline = self.buf.iter().position(|&b| b == b'\n')?;
        let mut line = self.buf.split_to(newline + 1);
        line.truncate(newline);
        // SSE allows CRLF as a line terminator. Without this, a CRLF stream
        // would yield "\r" for the blank event-delimiter line and leave a stray
        // carriage return at the end of every payload. `\r` is ASCII, so
        // removing it cannot cut through a multi-byte character.
        if line.last() == Some(&b'\r') {
            line.truncate(line.len() - 1);
        }
        Some(String::from_utf8_lossy(&line).into_owned())
    }
}
69
70/// Events yielded during streaming LLM responses.
71///
72/// Each variant represents a different type of event that can occur
73/// during a streaming response from an LLM provider.
74#[derive(Debug, Clone)]
75#[non_exhaustive]
76pub enum StreamDelta {
77    /// A text delta for streaming text content.
78    TextDelta {
79        /// The text fragment to append
80        delta: String,
81        /// Index of the content block being streamed
82        block_index: usize,
83    },
84
85    /// A thinking delta for streaming thinking/reasoning content.
86    ThinkingDelta {
87        /// The thinking fragment to append
88        delta: String,
89        /// Index of the content block being streamed
90        block_index: usize,
91    },
92
93    /// Start of a tool use block (name and id are known).
94    ToolUseStart {
95        /// Unique identifier for this tool call
96        id: String,
97        /// Name of the tool being called
98        name: String,
99        /// Index of the content block
100        block_index: usize,
101        /// Optional thought signature (used by Gemini 3.x models)
102        thought_signature: Option<String>,
103    },
104
105    /// Incremental JSON for tool input (partial/incomplete JSON).
106    ToolInputDelta {
107        /// Tool call ID this delta belongs to
108        id: String,
109        /// JSON fragment to append
110        delta: String,
111        /// Index of the content block
112        block_index: usize,
113    },
114
115    /// Usage information (typically at stream end).
116    Usage(Usage),
117
118    /// Stream completed with stop reason.
119    Done {
120        /// Why the stream ended
121        stop_reason: Option<StopReason>,
122    },
123
124    /// A signature delta for a thinking block.
125    SignatureDelta {
126        /// The signature fragment to append
127        delta: String,
128        /// Index of the content block being streamed
129        block_index: usize,
130    },
131
132    /// A redacted thinking block received at `content_block_start`.
133    RedactedThinking {
134        /// Opaque data payload
135        data: String,
136        /// Index of the content block
137        block_index: usize,
138    },
139
140    /// Error during streaming.
141    Error {
142        /// Error message
143        message: String,
144        /// Categorization of the error so downstream consumers can map
145        /// it back to the correct [`agent_sdk_foundation::llm::ChatOutcome`]
146        /// variant or audit-record `TurnAttemptOutcome` without losing
147        /// the rate-limit / server-error / invalid-request distinction.
148        kind: StreamErrorKind,
149    },
150}
151
/// Category of a [`StreamDelta::Error`] event.
///
/// The categories mirror the error variants of
/// [`ChatOutcome`](agent_sdk_foundation::llm::ChatOutcome), so a provider that
/// reports failures through the stream keeps the same precision callers of the
/// non-streaming `chat()` get. Each supported provider can translate its
/// underlying signal (HTTP status, validation failure, mid-stream disconnect)
/// into one of these categories at the point where the error is constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamErrorKind {
    /// The provider answered with HTTP 429 or an explicit rate-limit signal.
    RateLimited,
    /// The provider answered with HTTP 5xx, the connection dropped
    /// mid-stream, or the provider reported a transient runtime failure.
    ServerError,
    /// Caller-side error: validation failed before dispatch, HTTP 4xx other
    /// than 429, or a non-retriable rejection by the provider.
    InvalidRequest,
    /// Fallback for a streaming error the provider could not place in any of
    /// the categories above.
    ///
    /// Producers should pick a specific variant whenever the underlying
    /// signal (HTTP status, validation failure, mid-stream disconnect) makes
    /// that possible; `Unknown` exists so that new error sources and
    /// providers can be added without a breaking change.
    /// [`StreamErrorKind::is_recoverable`] treats it as non-recoverable
    /// (callers should not blindly retry an unclassified failure).
    Unknown,
}

impl StreamErrorKind {
    /// Whether the error may be transient, so that retrying is reasonable.
    ///
    /// Returns `true` for rate-limit and server errors and `false` for
    /// invalid-request and unclassified errors.
    #[must_use]
    pub const fn is_recoverable(self) -> bool {
        // Listed exhaustively so that adding a variant forces a decision here.
        match self {
            Self::RateLimited | Self::ServerError => true,
            Self::InvalidRequest | Self::Unknown => false,
        }
    }
}
192
193/// Type alias for a boxed stream of stream deltas.
194pub type StreamBox<'a> = Pin<Box<dyn Stream<Item = anyhow::Result<StreamDelta>> + Send + 'a>>;
195
196/// Helper to accumulate streamed content into a final response.
197///
198/// This struct collects [`StreamDelta`] events and can convert them
199/// into the final content blocks once the stream is complete.
200#[derive(Debug, Default)]
201pub struct StreamAccumulator {
202    /// Accumulated text for each block index
203    text_blocks: Vec<String>,
204    /// Accumulated thinking blocks for each block index
205    thinking_blocks: Vec<String>,
206    /// Accumulated signatures keyed by block index
207    thinking_signatures: HashMap<usize, String>,
208    /// Redacted thinking blocks: (`block_index`, data)
209    redacted_thinking_blocks: Vec<(usize, String)>,
210    /// Accumulated tool use calls
211    tool_uses: Vec<ToolUseAccumulator>,
212    /// Usage information from the stream
213    usage: Option<Usage>,
214    /// Stop reason from the stream
215    stop_reason: Option<StopReason>,
216}
217
/// State gathered for one tool call while its stream is in progress.
#[derive(Debug, Default)]
pub struct ToolUseAccumulator {
    /// Identifier of the tool call
    pub id: String,
    /// Name of the tool
    pub name: String,
    /// JSON input received so far (may be incomplete while streaming)
    pub input_json: String,
    /// Index of the content block, used for ordering
    pub block_index: usize,
    /// Optional thought signature (used by Gemini 3.x models)
    pub thought_signature: Option<String>,
}
232
233impl StreamAccumulator {
234    /// Create a new empty accumulator.
235    #[must_use]
236    pub fn new() -> Self {
237        Self::default()
238    }
239
240    /// Apply a stream delta to the accumulator.
241    pub fn apply(&mut self, delta: &StreamDelta) {
242        match delta {
243            StreamDelta::TextDelta { delta, block_index } => {
244                if *block_index > MAX_BLOCK_INDEX {
245                    log::warn!(
246                        "dropping TextDelta with out-of-range block_index {block_index} (max {MAX_BLOCK_INDEX})"
247                    );
248                    return;
249                }
250                while self.text_blocks.len() <= *block_index {
251                    self.text_blocks.push(String::new());
252                }
253                self.text_blocks[*block_index].push_str(delta);
254            }
255            StreamDelta::ThinkingDelta { delta, block_index } => {
256                if *block_index > MAX_BLOCK_INDEX {
257                    log::warn!(
258                        "dropping ThinkingDelta with out-of-range block_index {block_index} (max {MAX_BLOCK_INDEX})"
259                    );
260                    return;
261                }
262                while self.thinking_blocks.len() <= *block_index {
263                    self.thinking_blocks.push(String::new());
264                }
265                self.thinking_blocks[*block_index].push_str(delta);
266            }
267            StreamDelta::ToolUseStart {
268                id,
269                name,
270                block_index,
271                thought_signature,
272            } => {
273                self.tool_uses.push(ToolUseAccumulator {
274                    id: id.clone(),
275                    name: name.clone(),
276                    input_json: String::new(),
277                    block_index: *block_index,
278                    thought_signature: thought_signature.clone(),
279                });
280            }
281            StreamDelta::ToolInputDelta { id, delta, .. } => {
282                if let Some(tool) = self.tool_uses.iter_mut().find(|t| t.id == *id) {
283                    tool.input_json.push_str(delta);
284                }
285            }
286            StreamDelta::SignatureDelta { delta, block_index } => {
287                self.thinking_signatures
288                    .entry(*block_index)
289                    .or_default()
290                    .push_str(delta);
291            }
292            StreamDelta::RedactedThinking { data, block_index } => {
293                self.redacted_thinking_blocks
294                    .push((*block_index, data.clone()));
295            }
296            StreamDelta::Usage(u) => {
297                self.usage = Some(u.clone());
298            }
299            StreamDelta::Done { stop_reason } => {
300                self.stop_reason = *stop_reason;
301            }
302            StreamDelta::Error { .. } => {}
303        }
304    }
305
306    /// Get the accumulated usage information.
307    #[must_use]
308    pub const fn usage(&self) -> Option<&Usage> {
309        self.usage.as_ref()
310    }
311
312    /// Get the stop reason.
313    #[must_use]
314    pub const fn stop_reason(&self) -> Option<&StopReason> {
315        self.stop_reason.as_ref()
316    }
317
318    /// Convert accumulated content to `ContentBlock`s.
319    ///
320    /// This consumes the accumulator and returns the final content blocks.
321    /// Tool use JSON is parsed at this point; invalid JSON results in a null input.
322    #[must_use]
323    pub fn into_content_blocks(self) -> Vec<ContentBlock> {
324        let mut blocks: Vec<(usize, ContentBlock)> = Vec::new();
325
326        // Add thinking blocks with their indices, attaching signatures
327        let mut signatures = self.thinking_signatures;
328        for (idx, thinking) in self.thinking_blocks.into_iter().enumerate() {
329            if !thinking.is_empty() {
330                let signature = signatures.remove(&idx).filter(|s| !s.is_empty());
331                blocks.push((
332                    idx,
333                    ContentBlock::Thinking {
334                        thinking,
335                        signature,
336                    },
337                ));
338            }
339        }
340
341        // Add redacted thinking blocks
342        for (idx, data) in self.redacted_thinking_blocks {
343            blocks.push((idx, ContentBlock::RedactedThinking { data }));
344        }
345
346        // Add text blocks with their indices
347        for (idx, text) in self.text_blocks.into_iter().enumerate() {
348            if !text.is_empty() {
349                blocks.push((idx, ContentBlock::Text { text }));
350            }
351        }
352
353        // Add tool uses with their indices
354        for tool in self.tool_uses {
355            let input: serde_json::Value =
356                serde_json::from_str(&tool.input_json).unwrap_or_else(|e| {
357                    log::warn!(
358                        "Failed to parse streamed tool input JSON for tool '{}' (id={}): {} — \
359                         input_json ({} bytes): '{}'",
360                        tool.name,
361                        tool.id,
362                        e,
363                        tool.input_json.len(),
364                        tool.input_json.chars().take(500).collect::<String>(),
365                    );
366                    serde_json::json!({})
367                });
368            blocks.push((
369                tool.block_index,
370                ContentBlock::ToolUse {
371                    id: tool.id,
372                    name: tool.name,
373                    input,
374                    thought_signature: tool.thought_signature,
375                },
376            ));
377        }
378
379        // Sort by block index to maintain order
380        blocks.sort_by_key(|(idx, _)| *idx);
381
382        blocks.into_iter().map(|(_, block)| block).collect()
383    }
384
385    /// Take ownership of accumulated usage.
386    pub const fn take_usage(&mut self) -> Option<Usage> {
387        self.usage.take()
388    }
389
390    /// Take ownership of stop reason.
391    pub const fn take_stop_reason(&mut self) -> Option<StopReason> {
392        self.stop_reason.take()
393    }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a text delta for the given block.
    fn text_delta(delta: &str, block_index: usize) -> StreamDelta {
        StreamDelta::TextDelta {
            delta: delta.to_string(),
            block_index,
        }
    }

    /// Build a thinking delta for the given block.
    fn thinking_delta(delta: &str, block_index: usize) -> StreamDelta {
        StreamDelta::ThinkingDelta {
            delta: delta.to_string(),
            block_index,
        }
    }

    /// Build a tool-use start event without a thought signature.
    fn tool_start(id: &str, name: &str, block_index: usize) -> StreamDelta {
        StreamDelta::ToolUseStart {
            id: id.to_string(),
            name: name.to_string(),
            block_index,
            thought_signature: None,
        }
    }

    /// Build a tool-input fragment for the call with the given id.
    fn tool_input(id: &str, delta: &str, block_index: usize) -> StreamDelta {
        StreamDelta::ToolInputDelta {
            id: id.to_string(),
            delta: delta.to_string(),
            block_index,
        }
    }

    /// Apply `deltas` in order to a fresh accumulator and return it.
    fn accumulate(deltas: &[StreamDelta]) -> StreamAccumulator {
        let mut acc = StreamAccumulator::new();
        for delta in deltas {
            acc.apply(delta);
        }
        acc
    }

    #[test]
    fn test_accumulator_text_deltas() {
        let blocks =
            accumulate(&[text_delta("Hello", 0), text_delta(" world", 0)]).into_content_blocks();

        assert_eq!(blocks.len(), 1);
        assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "Hello world"));
    }

    #[test]
    fn test_accumulator_multiple_text_blocks() {
        let blocks =
            accumulate(&[text_delta("First", 0), text_delta("Second", 1)]).into_content_blocks();

        assert_eq!(blocks.len(), 2);
        assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "First"));
        assert!(matches!(&blocks[1], ContentBlock::Text { text } if text == "Second"));
    }

    #[test]
    fn test_accumulator_thinking_signature() {
        let blocks = accumulate(&[
            thinking_delta("Reasoning", 0),
            StreamDelta::SignatureDelta {
                delta: "sig_123".to_string(),
                block_index: 0,
            },
        ])
        .into_content_blocks();

        assert_eq!(blocks.len(), 1);
        assert!(matches!(
            &blocks[0],
            ContentBlock::Thinking { thinking, signature }
            if thinking == "Reasoning" && signature.as_deref() == Some("sig_123")
        ));
    }

    #[test]
    fn test_accumulator_tool_use() {
        let blocks = accumulate(&[
            tool_start("call_123", "read_file", 0),
            tool_input("call_123", r#"{"path":"#, 0),
            tool_input("call_123", r#""test.txt"}"#, 0),
        ])
        .into_content_blocks();

        assert_eq!(blocks.len(), 1);
        let ContentBlock::ToolUse {
            id, name, input, ..
        } = &blocks[0]
        else {
            panic!("Expected ToolUse block")
        };
        assert_eq!(id, "call_123");
        assert_eq!(name, "read_file");
        assert_eq!(input["path"], "test.txt");
    }

    #[test]
    fn test_accumulator_mixed_content() {
        let acc = accumulate(&[
            text_delta("Let me read that file.", 0),
            tool_start("call_456", "read_file", 1),
            tool_input("call_456", r#"{"path":"file.txt"}"#, 1),
            StreamDelta::Usage(Usage {
                input_tokens: 100,
                output_tokens: 50,
                cached_input_tokens: 0,
                cache_creation_input_tokens: 0,
            }),
            StreamDelta::Done {
                stop_reason: Some(StopReason::ToolUse),
            },
        ]);

        assert!(acc.usage().is_some());
        assert_eq!(acc.usage().map(|u| u.input_tokens), Some(100));
        assert!(matches!(acc.stop_reason(), Some(StopReason::ToolUse)));

        let blocks = acc.into_content_blocks();
        assert_eq!(blocks.len(), 2);
        assert!(matches!(&blocks[0], ContentBlock::Text { .. }));
        assert!(matches!(&blocks[1], ContentBlock::ToolUse { .. }));
    }

    #[test]
    fn test_accumulator_invalid_tool_json() {
        let blocks = accumulate(&[
            tool_start("call_789", "test_tool", 0),
            tool_input("call_789", "invalid json {", 0),
        ])
        .into_content_blocks();

        assert_eq!(blocks.len(), 1);
        let ContentBlock::ToolUse { input, .. } = &blocks[0] else {
            panic!("Expected ToolUse block")
        };
        assert!(input.is_object());
    }

    #[test]
    fn test_accumulator_empty_tool_input_falls_back_to_empty_object() {
        // When no ToolInputDelta arrives at all (for example because the
        // stream was interrupted or the deltas carried mismatched IDs), the
        // tool use block must still be emitted, with an empty object as input,
        // so the failure can be attributed to the tool instead of vanishing.
        let blocks = accumulate(&[tool_start("call_empty", "read", 0)]).into_content_blocks();

        assert_eq!(blocks.len(), 1);
        let ContentBlock::ToolUse { input, name, .. } = &blocks[0] else {
            panic!("Expected ToolUse block")
        };
        assert_eq!(name, "read");
        assert_eq!(input, &serde_json::json!({}));
    }

    #[test]
    fn test_accumulator_mismatched_delta_id_drops_input() {
        // A ToolInputDelta whose ID matches no ToolUseStart is silently
        // dropped, so the registered tool ends up with an empty {} input.
        let blocks = accumulate(&[
            tool_start("call_A", "bash", 0),
            // Fragment addressed to a call that was never started.
            tool_input("call_B", r#"{"command":"ls"}"#, 0),
        ])
        .into_content_blocks();

        assert_eq!(blocks.len(), 1);
        let ContentBlock::ToolUse { input, .. } = &blocks[0] else {
            panic!("Expected ToolUse block")
        };
        // Nothing was appended because the delta carried a different ID.
        assert_eq!(input, &serde_json::json!({}));
    }

    #[test]
    fn test_accumulator_empty() {
        let blocks = StreamAccumulator::new().into_content_blocks();
        assert!(blocks.is_empty());
    }

    #[test]
    fn test_accumulator_skips_empty_text() {
        let blocks = accumulate(&[text_delta("", 0), text_delta("Hello", 1)]).into_content_blocks();

        assert_eq!(blocks.len(), 1);
        assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "Hello"));
    }

    #[test]
    fn test_accumulator_ignores_out_of_range_block_index() {
        // An event with a huge block_index (hostile or corrupted) must not
        // trigger an unbounded Vec allocation. Such deltas are dropped, so the
        // accumulator stays tiny instead of holding billions of empty Strings.
        let blocks = accumulate(&[
            text_delta("ok", 0),
            text_delta("boom", usize::MAX),
            thinking_delta("boom", usize::MAX),
        ])
        .into_content_blocks();

        assert_eq!(blocks.len(), 1);
        assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "ok"));
    }

    #[cfg(any(feature = "openai", feature = "openai-codex"))]
    #[test]
    fn test_sse_line_buffer_splits_multiple_lines() {
        let mut lines = SseLineBuffer::new();
        lines.extend(b"data: one\ndata: two\n");

        assert_eq!(lines.next_line().as_deref(), Some("data: one"));
        assert_eq!(lines.next_line().as_deref(), Some("data: two"));
        assert_eq!(lines.next_line(), None);
    }

    #[cfg(any(feature = "openai", feature = "openai-codex"))]
    #[test]
    fn test_sse_line_buffer_buffers_partial_line_until_newline() {
        let mut lines = SseLineBuffer::new();

        lines.extend(b"data: par");
        assert_eq!(lines.next_line(), None);

        lines.extend(b"tial\n");
        assert_eq!(lines.next_line().as_deref(), Some("data: partial"));
    }

    #[cfg(any(feature = "openai", feature = "openai-codex"))]
    #[test]
    fn test_sse_line_buffer_handles_utf8_split_across_chunks() {
        // In "café" the 'é' is encoded as the two bytes 0xC3 0xA9. The chunk
        // boundary is placed *inside* that character: decoding each chunk
        // separately with from_utf8_lossy would produce a U+FFFD replacement
        // char, whereas the line buffer decodes only the complete line and
        // therefore must return the character intact.
        let bytes = "data: café\n".as_bytes();
        let (head, tail) = bytes.split_at(bytes.len() - 2); // between 0xC3 and 0xA9

        let mut lines = SseLineBuffer::new();
        lines.extend(head);
        assert_eq!(lines.next_line(), None);

        lines.extend(tail);
        assert_eq!(lines.next_line().as_deref(), Some("data: café"));
    }
}