Skip to main content

sparrow/provider/
sse_buffer.rs

1//! Reassemble newline-delimited frames across arbitrary network chunks.
2//!
3//! Provider streams (SSE for OpenAI/Anthropic, NDJSON for Ollama) deliver
4//! "one frame per line". A single TCP/HTTP chunk can split a frame at any byte,
5//! which the previous code did not handle — it ran `text.lines()` on each
6//! chunk in isolation and silently dropped the trailing partial line, eating
//! characters mid-stream (e.g. "à rebours" arrived as "àours" because the
//! " reb" segment straddled a chunk boundary).
9//!
10//! `LineBuffer` is the minimal fix: feed it chunk bytes, get back the complete
11//! lines that have accumulated. The unfinished tail stays inside the buffer
12//! until the next chunk completes it.
13
/// Accumulates raw bytes and yields complete `\n`-terminated lines.
///
/// Data after the last `\n` is kept across calls until a later chunk completes
/// it, or until [`LineBuffer::take_remaining`] flushes it. One trailing `\r` is
/// stripped from every returned line, so both `\n` and `\r\n` endings work.
/// Bytes are accumulated before any decoding, so a UTF-8 character split
/// across chunks is reassembled before it is converted to a `String`.
#[derive(Debug, Default)]
pub struct LineBuffer {
    /// Bytes received but not yet emitted. Between calls this never contains
    /// a `\n`: `push` consumes every complete line it finds.
    buf: Vec<u8>,
}

impl LineBuffer {
    /// Create an empty buffer (same as [`LineBuffer::default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Append `bytes` and return every complete line that is now available.
    ///
    /// The returned strings have one trailing `\r` stripped and do NOT include
    /// the terminating `\n`. Anything after the last `\n` stays buffered.
    ///
    /// Lines are decoded with [`String::from_utf8_lossy`] only once complete.
    /// UTF-8 multi-byte sequences never contain `0x0A`, so a `\n` can never
    /// fall inside a character. A character split across chunks is therefore
    /// whole by the time its line is decoded, and valid UTF-8 round-trips
    /// losslessly. Only genuinely invalid bytes become U+FFFD.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<String> {
        // The retained tail holds no `\n` (see the `buf` invariant), so only
        // the new bytes need scanning. Rescanning the tail on every call would
        // make a long line delivered in many small chunks cost O(n^2).
        let mut scan_from = self.buf.len();
        self.buf.extend_from_slice(bytes);

        let mut out = Vec::new();
        let mut start = 0;
        while let Some(offset) = self.buf[scan_from..].iter().position(|&b| b == b'\n') {
            let newline = scan_from + offset;
            let line = Self::strip_cr(&self.buf[start..newline]);
            out.push(String::from_utf8_lossy(line).into_owned());
            start = newline + 1;
            scan_from = start;
        }
        if start > 0 {
            // Drop emitted bytes, keep the unfinished tail.
            self.buf.drain(..start);
        }
        out
    }

    /// Drain anything left without requiring a final newline. Useful when the
    /// upstream closes without flushing a trailing `\n`.
    ///
    /// Like [`push`](Self::push), one trailing `\r` is stripped, because the
    /// stream may close between the `\r` and `\n` of a `\r\n`. Returns `None`
    /// when nothing non-empty remains. The buffer is empty afterwards.
    pub fn take_remaining(&mut self) -> Option<String> {
        let line = Self::strip_cr(&self.buf);
        let rest = (!line.is_empty()).then(|| String::from_utf8_lossy(line).into_owned());
        // `clear` (rather than `mem::take`) keeps the allocation for reuse.
        self.buf.clear();
        rest
    }

    /// Remove a single trailing `\r`, if present.
    fn strip_cr(line: &[u8]) -> &[u8] {
        match line {
            [body @ .., b'\r'] => body,
            _ => line,
        }
    }
}
71
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn full_line_on_one_push_returns_it() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"hello\n");
        assert_eq!(lines, vec!["hello".to_string()]);
        assert!(b.take_remaining().is_none());
    }

    #[test]
    fn split_line_is_reassembled_across_pushes() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"hel").is_empty());
        assert!(b.push(b"lo wor").is_empty());
        let lines = b.push(b"ld\n");
        assert_eq!(lines, vec!["hello world".to_string()]);
    }

    #[test]
    fn multiple_lines_in_one_push() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"a\nb\nc\n");
        assert_eq!(lines, vec!["a", "b", "c"]);
    }

    #[test]
    fn empty_push_yields_nothing() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"").is_empty());
        assert!(b.take_remaining().is_none());
    }

    #[test]
    fn crlf_is_stripped() {
        let mut b = LineBuffer::new();
        let lines = b.push(b"data: foo\r\n");
        assert_eq!(lines, vec!["data: foo".to_string()]);
    }

    #[test]
    fn crlf_split_between_cr_and_lf_is_stripped() {
        // The `\r` arrives in one chunk and its `\n` in the next.
        let mut b = LineBuffer::new();
        assert!(b.push(b"data: foo\r").is_empty());
        assert_eq!(b.push(b"\n"), vec!["data: foo".to_string()]);
    }

    #[test]
    fn blank_lines_are_preserved() {
        // SSE uses an empty line to dispatch an event, so blank lines (with
        // either ending) must come through as empty strings, not be skipped.
        let mut b = LineBuffer::new();
        assert_eq!(b.push(b"\n\r\n"), vec![String::new(), String::new()]);
    }

    #[test]
    fn tail_after_last_newline_is_kept() {
        let mut b = LineBuffer::new();
        assert_eq!(b.push(b"a\nb"), vec!["a".to_string()]);
        assert_eq!(b.push(b"c\n"), vec!["bc".to_string()]);
    }

    #[test]
    fn utf8_word_split_across_chunks_survives() {
        // "à rebours" — accent is two bytes (0xC3 0xA0); the cut is between
        // valid char boundaries here, between "à reb" and "ours\n".
        let mut b = LineBuffer::new();
        assert!(b.push("à reb".as_bytes()).is_empty());
        let lines = b.push(b"ours\n");
        assert_eq!(lines, vec!["à rebours".to_string()]);
    }

    #[test]
    fn utf8_char_split_inside_multibyte_sequence_survives() {
        // Cut between 0xC3 and 0xA0 of "à": neither chunk is valid UTF-8 on
        // its own, so decoding per chunk would produce U+FFFD.
        let bytes = "à rebours\n".as_bytes();
        let mut b = LineBuffer::new();
        assert!(b.push(&bytes[..1]).is_empty());
        assert_eq!(b.push(&bytes[1..]), vec!["à rebours".to_string()]);
    }

    #[test]
    fn trailing_partial_stays_until_completed() {
        let mut b = LineBuffer::new();
        assert!(b.push(b"partial").is_empty());
        assert_eq!(b.take_remaining(), Some("partial".to_string()));
        assert!(b.take_remaining().is_none());
    }
}