chat_core/transport/
sse.rs1#[derive(Default)]
8pub struct SseParser {
9 buffer: String,
10 current_data: String,
11 current_event_type: Option<String>,
12}
13
14impl SseParser {
15 pub fn push(&mut self, chunk: &[u8]) {
16 self.buffer.push_str(&String::from_utf8_lossy(chunk));
17 }
18
19 pub fn next_event(&mut self) -> Option<(String, String)> {
28 loop {
29 let newline_pos = self.buffer.find('\n')?;
30 let line = self.buffer[..newline_pos]
31 .trim_end_matches('\r')
32 .to_string();
33 self.buffer = self.buffer[newline_pos + 1..].to_string();
34
35 if line.is_empty() {
36 if !self.current_data.is_empty() {
37 let data = self.current_data.trim().to_string();
38 self.current_data.clear();
39 let event_type = self.current_event_type.take().unwrap_or_default();
40 if data != "[DONE]" {
41 return Some((event_type, data));
42 }
43 }
44 continue;
45 }
46
47 if let Some(event_type) = line.strip_prefix("event: ") {
48 if !self.current_data.is_empty() {
51 let data = self.current_data.trim().to_string();
52 self.current_data.clear();
53 let prev_type = self.current_event_type.take().unwrap_or_default();
54 self.current_event_type = Some(event_type.to_string());
55 if data != "[DONE]" {
56 return Some((prev_type, data));
57 }
58 } else {
59 self.current_event_type = Some(event_type.to_string());
60 }
61 } else if let Some(data) = line.strip_prefix("data: ") {
62 self.current_data.push_str(data);
63 self.current_data.push('\n');
64 } else if let Some(data) = line.strip_prefix("data:") {
65 self.current_data.push_str(data);
66 self.current_data.push('\n');
67 }
68 }
69 }
70}