sparrow/provider/sse_buffer.rs
//! Reassemble newline-delimited frames across arbitrary network chunks.
//!
//! Provider streams (SSE for OpenAI/Anthropic, NDJSON for Ollama) deliver
//! "one frame per line", but a single TCP/HTTP chunk can split a frame at any
//! byte. The previous code did not handle this: it ran `text.lines()` on each
//! chunk in isolation and silently dropped the trailing partial line, eating
//! characters mid-stream (e.g. "à rebours" arrived as "àours" because the
//! "reb" segment sat on a chunk boundary).
//!
//! `LineBuffer` is the minimal fix: feed it chunk bytes and get back the
//! complete lines that have accumulated. The unfinished tail stays inside the
//! buffer until the next chunk completes it.
13
/// Accumulates raw bytes and yields complete `\n`-terminated lines.
///
/// Partial trailing data is preserved across calls. A frame split at any byte
/// (including in the middle of a multi-byte UTF-8 sequence) is therefore
/// reassembled before it is decoded. Bytes are converted to `String` only once
/// a line is complete, using lossy UTF-8 decoding: invalid sequences become
/// U+FFFD.
///
/// A single `\r` immediately before the terminating `\n` is stripped, so
/// `\r\n` line endings are tolerated. A bare `\r` on its own is *not* treated
/// as a line terminator.
#[derive(Debug, Default)]
pub struct LineBuffer {
    /// Bytes received but not yet emitted as a line.
    ///
    /// Invariant: between calls this never contains `\n`, because
    /// [`LineBuffer::push`] drains every complete line and
    /// [`LineBuffer::take_remaining`] clears it.
    buf: Vec<u8>,
}

impl LineBuffer {
    /// Create an empty buffer.
    pub fn new() -> Self {
        Self::default()
    }

    /// Append `bytes` and return every complete line that is now available.
    ///
    /// The returned strings do NOT include the terminating `\n`. A single `\r`
    /// directly before it is stripped as well. Empty lines are returned as
    /// empty strings. Any unterminated tail stays buffered for the next call.
    ///
    /// Only the newly appended bytes are scanned for `\n`. The retained tail
    /// is known to contain none, so a long line arriving in many small chunks
    /// costs linear rather than quadratic time.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<String> {
        // Everything before `scan_from` is the previous tail, which holds no
        // `\n` (see the invariant on `buf`).
        let scan_from = self.buf.len();
        self.buf.extend_from_slice(bytes);

        let mut out = Vec::new();
        let mut start = 0;
        let newlines = self.buf[scan_from..]
            .iter()
            .enumerate()
            .filter(|&(_, &byte)| byte == b'\n')
            .map(|(offset, _)| scan_from + offset);
        for nl in newlines {
            let line = &self.buf[start..nl];
            // The `\r` of a `\r\n` pair may have arrived in an earlier chunk;
            // it is still part of this line slice, so stripping works either way.
            let line = line.strip_suffix(b"\r").unwrap_or(line);
            // `\n` (0x0A) never occurs inside a UTF-8 multi-byte sequence, so a
            // complete line holds only whole characters. Lossy decoding only
            // substitutes U+FFFD for bytes that are genuinely invalid UTF-8.
            out.push(String::from_utf8_lossy(line).into_owned());
            start = nl + 1;
        }
        if start > 0 {
            // Drop the emitted bytes and keep the unfinished tail.
            self.buf.drain(..start);
        }
        out
    }

    /// Drain anything left without requiring a final newline.
    ///
    /// Useful when the upstream closes without flushing a trailing `\n`. A
    /// trailing `\r` is stripped, consistent with [`LineBuffer::push`]. Returns
    /// `None` when nothing (or only that `\r`) remains. The buffer is left
    /// empty either way.
    pub fn take_remaining(&mut self) -> Option<String> {
        let tail = self.buf.strip_suffix(b"\r").unwrap_or(&self.buf[..]);
        let out = (!tail.is_empty()).then(|| String::from_utf8_lossy(tail).into_owned());
        self.buf.clear();
        out
    }
}
71
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn full_line_on_one_push_returns_it() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"hello\n");
        assert_eq!(lines, vec!["hello".to_string()]);
        assert!(b.take_remaining().is_none());
    }

    #[test]
    fn split_line_is_reassembled_across_pushes() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"hel").is_empty());
        assert!(b.push(b"lo wor").is_empty());
        let lines = b.push(b"ld\n");
        assert_eq!(lines, vec!["hello world".to_string()]);
    }

    #[test]
    fn multiple_lines_in_one_push() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"a\nb\nc\n");
        assert_eq!(lines, vec!["a", "b", "c"]);
    }

    #[test]
    fn crlf_is_stripped() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"data: foo\r\n");
        assert_eq!(lines, vec!["data: foo".to_string()]);
    }

    #[test]
    fn crlf_split_across_pushes_is_stripped() {
        // The `\r` ends one chunk and the `\n` starts the next; the `\r` must
        // still be removed from the emitted line.
        let mut b = LineBuffer::new();
        assert!(b.push(b"data: foo\r").is_empty());
        assert_eq!(b.push(b"\ndata: bar\r\n"), vec!["data: foo", "data: bar"]);
    }

    #[test]
    fn empty_lines_are_preserved() {
        // Event streams such as SSE use a blank line to separate events, so
        // blank lines must be emitted rather than swallowed.
        let mut b = LineBuffer::new();
        assert_eq!(b.push(b"data: x\n\n"), vec!["data: x", ""]);
        assert_eq!(b.push(b"\r\n"), vec![""]);
    }

    #[test]
    fn utf8_word_split_across_chunks_survives() {
        // "à rebours": the cut falls on a character boundary, between
        // "à reb" and "ours\n".
        let mut b = LineBuffer::new();
        assert!(b.push("à reb".as_bytes()).is_empty());
        let lines = b.push(b"ours\n");
        assert_eq!(lines, vec!["à rebours".to_string()]);
    }

    #[test]
    fn utf8_codepoint_split_mid_sequence_survives() {
        // Cut between the two bytes of "à" (0xC3 0xA0). Neither half is valid
        // UTF-8 on its own, so decoding must wait until the line is complete.
        let bytes = "à rebours\n".as_bytes();
        let mut b = LineBuffer::new();
        assert!(b.push(&bytes[..1]).is_empty());
        assert_eq!(b.push(&bytes[1..]), vec!["à rebours"]);
    }

    #[test]
    fn byte_at_a_time_stream_is_reassembled() {
        let text = "première\r\nseconde\n";
        let mut b = LineBuffer::new();
        let mut lines = Vec::new();
        for byte in text.as_bytes() {
            lines.extend(b.push(std::slice::from_ref(byte)));
        }
        assert_eq!(lines, vec!["première", "seconde"]);
        assert!(b.take_remaining().is_none());
    }

    #[test]
    fn partial_line_is_held_until_newline_arrives() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"partial").is_empty());
        assert_eq!(b.push(b" line\n"), vec!["partial line"]);
        assert!(b.take_remaining().is_none());
    }

    #[test]
    fn take_remaining_flushes_unterminated_tail() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"partial").is_empty());
        assert_eq!(b.take_remaining(), Some("partial".to_string()));
        assert!(b.take_remaining().is_none());
    }
}