Skip to main content

gproxy_protocol/
stream.rs

1/// Converts internal standard SSE payload (`data: ...\n\n`) into NDJSON.
2///
3/// Gemini transport can consume NDJSON for final streaming output while
4/// internal stream handling remains standard SSE.
5pub fn sse_to_ndjson_stream(sse: &str) -> String {
6    let mut rewriter = SseToNdjsonRewriter::default();
7    let mut out = Vec::new();
8    out.extend_from_slice(rewriter.push_chunk(sse.as_bytes()).as_slice());
9    out.extend_from_slice(rewriter.finish().as_slice());
10    String::from_utf8_lossy(out.as_slice()).into_owned()
11}
12
13/// Incremental SSE -> NDJSON converter.
14///
15/// Feed bytes via [`SseToNdjsonRewriter::push_chunk`], then call
16/// [`SseToNdjsonRewriter::finish`] when upstream ends.
17#[derive(Debug, Default, Clone, PartialEq, Eq)]
18pub struct SseToNdjsonRewriter {
19    pending: Vec<u8>,
20}
21
22impl SseToNdjsonRewriter {
23    /// Pushes one upstream chunk and returns converted NDJSON bytes ready
24    /// for downstream emission.
25    pub fn push_chunk(&mut self, chunk: &[u8]) -> Vec<u8> {
26        self.pending.extend_from_slice(chunk);
27        let mut out = Vec::new();
28
29        while let Some(newline_index) = self.pending.iter().position(|byte| *byte == b'\n') {
30            let mut line = self.pending.drain(..=newline_index).collect::<Vec<u8>>();
31            if line.last().copied() == Some(b'\n') {
32                line.pop();
33            }
34            self.process_line(line.as_slice(), &mut out);
35        }
36
37        out
38    }
39
40    /// Flushes trailing buffered bytes (if any) at stream end.
41    pub fn finish(&mut self) -> Vec<u8> {
42        if self.pending.is_empty() {
43            return Vec::new();
44        }
45        let line = std::mem::take(&mut self.pending);
46        let mut out = Vec::new();
47        self.process_line(line.as_slice(), &mut out);
48        out
49    }
50
51    fn process_line(&self, raw_line: &[u8], out: &mut Vec<u8>) {
52        let line = raw_line.strip_suffix(b"\r").unwrap_or(raw_line);
53        let Some(payload) = line.strip_prefix(b"data:") else {
54            return;
55        };
56
57        let payload = trim_ascii(payload);
58        if payload.is_empty() || payload == b"[DONE]" {
59            return;
60        }
61
62        out.extend_from_slice(payload);
63        out.push(b'\n');
64    }
65}
66
67fn trim_ascii(input: &[u8]) -> &[u8] {
68    let start = input
69        .iter()
70        .position(|byte| !byte.is_ascii_whitespace())
71        .unwrap_or(input.len());
72    let end = input
73        .iter()
74        .rposition(|byte| !byte.is_ascii_whitespace())
75        .map(|index| index + 1)
76        .unwrap_or(start);
77    &input[start..end]
78}
79
80// ---------------------------------------------------------------------------
81// Generic newline-delimited splitters
82// ---------------------------------------------------------------------------
83
84/// Drains complete `\n`-terminated lines from `pending` into `out`,
85/// stripping trailing `\r\n`.  Empty lines are skipped.
86pub fn drain_lines(pending: &mut Vec<u8>, out: &mut Vec<Vec<u8>>) {
87    while let Some(pos) = pending.iter().position(|byte| *byte == b'\n') {
88        let mut line = pending.drain(..=pos).collect::<Vec<u8>>();
89        if line.last().copied() == Some(b'\n') {
90            line.pop();
91        }
92        if line.last().copied() == Some(b'\r') {
93            line.pop();
94        }
95        if !line.is_empty() {
96            out.push(line);
97        }
98    }
99}
100
101/// Splits `bytes` into newline-delimited lines.  Complete lines go through
102/// [`drain_lines`]; any trailing bytes without a terminating `\n` are also
103/// emitted as a final element.
104pub fn split_lines(bytes: &[u8], out: &mut Vec<Vec<u8>>) {
105    if bytes.is_empty() {
106        return;
107    }
108    let mut pending = bytes.to_vec();
109    drain_lines(&mut pending, out);
110    if !pending.is_empty() {
111        out.push(pending);
112    }
113}
114
115/// Convenience wrapper that returns owned lines instead of appending to a vec.
116pub fn split_lines_owned(bytes: &[u8]) -> Vec<Vec<u8>> {
117    let mut out = Vec::new();
118    split_lines(bytes, &mut out);
119    out
120}