// gproxy_protocol/stream.rs
stream.rs1pub fn sse_to_ndjson_stream(sse: &str) -> String {
6 let mut rewriter = SseToNdjsonRewriter::default();
7 let mut out = Vec::new();
8 out.extend_from_slice(rewriter.push_chunk(sse.as_bytes()).as_slice());
9 out.extend_from_slice(rewriter.finish().as_slice());
10 String::from_utf8_lossy(out.as_slice()).into_owned()
11}
12
13#[derive(Debug, Default, Clone, PartialEq, Eq)]
18pub struct SseToNdjsonRewriter {
19 pending: Vec<u8>,
20}
21
22impl SseToNdjsonRewriter {
23 pub fn push_chunk(&mut self, chunk: &[u8]) -> Vec<u8> {
26 self.pending.extend_from_slice(chunk);
27 let mut out = Vec::new();
28
29 while let Some(newline_index) = self.pending.iter().position(|byte| *byte == b'\n') {
30 let mut line = self.pending.drain(..=newline_index).collect::<Vec<u8>>();
31 if line.last().copied() == Some(b'\n') {
32 line.pop();
33 }
34 self.process_line(line.as_slice(), &mut out);
35 }
36
37 out
38 }
39
40 pub fn finish(&mut self) -> Vec<u8> {
42 if self.pending.is_empty() {
43 return Vec::new();
44 }
45 let line = std::mem::take(&mut self.pending);
46 let mut out = Vec::new();
47 self.process_line(line.as_slice(), &mut out);
48 out
49 }
50
51 fn process_line(&self, raw_line: &[u8], out: &mut Vec<u8>) {
52 let line = raw_line.strip_suffix(b"\r").unwrap_or(raw_line);
53 let Some(payload) = line.strip_prefix(b"data:") else {
54 return;
55 };
56
57 let payload = trim_ascii(payload);
58 if payload.is_empty() || payload == b"[DONE]" {
59 return;
60 }
61
62 out.extend_from_slice(payload);
63 out.push(b'\n');
64 }
65}
66
/// Returns `input` with leading and trailing ASCII whitespace removed.
///
/// Whitespace is whatever `u8::is_ascii_whitespace` accepts. An input that is
/// entirely whitespace yields an empty slice.
fn trim_ascii(input: &[u8]) -> &[u8] {
    let mut rest = input;

    // Peel whitespace off the front.
    while let [first, tail @ ..] = rest {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }

    // Peel whitespace off the back.
    while let [head @ .., last] = rest {
        if !last.is_ascii_whitespace() {
            break;
        }
        rest = head;
    }

    rest
}
79
/// Moves every complete (`\n`-terminated) line out of `pending` into `out`.
///
/// Each line has its `\n` and at most one preceding `\r` removed; lines that
/// are empty after that are skipped. Bytes after the last newline stay in
/// `pending`. If `pending` contains no newline, nothing changes.
pub fn drain_lines(pending: &mut Vec<u8>, out: &mut Vec<Vec<u8>>) {
    let Some(last_newline) = pending.iter().rposition(|byte| *byte == b'\n') else {
        return;
    };

    // Split the terminated region in place and remove it with a single drain,
    // rather than shifting the remaining buffer once per extracted line.
    out.extend(
        pending[..last_newline]
            .split(|byte| *byte == b'\n')
            .map(|line| line.strip_suffix(b"\r").unwrap_or(line))
            .filter(|line| !line.is_empty())
            .map(|line| line.to_vec()),
    );
    pending.drain(..=last_newline);
}
100
/// Splits `bytes` into lines and appends them to `out`.
///
/// Each `\n`-terminated line has its `\n` and at most one preceding `\r`
/// removed, and is skipped if empty afterwards. A non-empty unterminated tail
/// is appended as-is, so a trailing `\r` on it is kept.
pub fn split_lines(bytes: &[u8], out: &mut Vec<Vec<u8>>) {
    // Walk the borrowed input directly; no scratch copy of the whole buffer
    // is needed just to carve lines out of it.
    let mut rest = bytes;
    while let Some(newline_index) = rest.iter().position(|byte| *byte == b'\n') {
        let line = &rest[..newline_index];
        let line = line.strip_suffix(b"\r").unwrap_or(line);
        if !line.is_empty() {
            out.push(line.to_vec());
        }
        rest = &rest[newline_index + 1..];
    }

    if !rest.is_empty() {
        out.push(rest.to_vec());
    }
}
114
115pub fn split_lines_owned(bytes: &[u8]) -> Vec<Vec<u8>> {
117 let mut out = Vec::new();
118 split_lines(bytes, &mut out);
119 out
120}