Skip to main content

chat_core/transport/
sse.rs

1/// Incremental SSE (Server-Sent Events) parser.
2///
3/// Feed raw byte chunks via [`push`](SseParser::push) and pull parsed
4/// events via [`next_event`](SseParser::next_event). Each event is
5/// returned as an `(event_type, data)` pair matching the
6/// [`Event`](super::types::Event) type alias used by transports.
7#[derive(Default)]
8pub struct SseParser {
9    buffer: String,
10    current_data: String,
11    current_event_type: Option<String>,
12}
13
14impl SseParser {
15    pub fn push(&mut self, chunk: &[u8]) {
16        self.buffer.push_str(&String::from_utf8_lossy(chunk));
17    }
18
19    /// Returns the next SSE event as `(event_type, data)`.
20    ///
21    /// - `event_type` comes from the `event:` line (empty string if absent).
22    /// - `data` is the concatenated `data:` line(s).
23    ///
24    /// An event is emitted when a `data:` line is followed by either a blank
25    /// line or a new `event:` line, making the parser robust for streams that
26    /// may not always send blank-line separators between events.
27    pub fn next_event(&mut self) -> Option<(String, String)> {
28        loop {
29            let newline_pos = self.buffer.find('\n')?;
30            let line = self.buffer[..newline_pos]
31                .trim_end_matches('\r')
32                .to_string();
33            self.buffer = self.buffer[newline_pos + 1..].to_string();
34
35            if line.is_empty() {
36                if !self.current_data.is_empty() {
37                    let data = self.current_data.trim().to_string();
38                    self.current_data.clear();
39                    let event_type = self.current_event_type.take().unwrap_or_default();
40                    if data != "[DONE]" {
41                        return Some((event_type, data));
42                    }
43                }
44                continue;
45            }
46
47            if let Some(event_type) = line.strip_prefix("event: ") {
48                // A new event: line while we still have buffered data means the
49                // previous event ended without a blank line — flush it first.
50                if !self.current_data.is_empty() {
51                    let data = self.current_data.trim().to_string();
52                    self.current_data.clear();
53                    let prev_type = self.current_event_type.take().unwrap_or_default();
54                    self.current_event_type = Some(event_type.to_string());
55                    if data != "[DONE]" {
56                        return Some((prev_type, data));
57                    }
58                } else {
59                    self.current_event_type = Some(event_type.to_string());
60                }
61            } else if let Some(data) = line.strip_prefix("data: ") {
62                self.current_data.push_str(data);
63                self.current_data.push('\n');
64            } else if let Some(data) = line.strip_prefix("data:") {
65                self.current_data.push_str(data);
66                self.current_data.push('\n');
67            }
68        }
69    }
70}