durable_streams_server/protocol/
sse.rs1use base64::Engine;
11use bytes::Bytes;
12use serde::Serialize;
13
14#[derive(Debug, Serialize)]
18#[serde(rename_all = "camelCase")]
19pub struct ControlPayload {
20 pub stream_next_offset: String,
22 #[serde(skip_serializing_if = "Option::is_none")]
24 pub stream_cursor: Option<String>,
25 #[serde(skip_serializing_if = "Option::is_none")]
27 pub up_to_date: Option<bool>,
28 #[serde(skip_serializing_if = "Option::is_none")]
30 pub stream_closed: Option<bool>,
31}
32
33#[must_use]
42pub fn format_control_frame(payload: &ControlPayload) -> String {
43 let json =
44 serde_json::to_string(payload).expect("ControlPayload serialization should not fail");
45 format!("event: control\ndata:{json}\n\n")
46}
47
48#[must_use]
61pub fn format_data_frame(data: &Bytes, is_binary: bool, is_json: bool) -> String {
62 let text = if is_binary {
63 base64::engine::general_purpose::STANDARD.encode(data)
64 } else if is_json {
65 let raw = String::from_utf8_lossy(data);
67 format!("[{raw}]")
68 } else {
69 String::from_utf8_lossy(data).into_owned()
73 };
74
75 format_raw_data_frame(&text)
76}
77
78#[must_use]
89pub fn format_data_frames(messages: &[Bytes], is_binary: bool, is_json: bool) -> String {
90 if messages.is_empty() {
91 return String::new();
92 }
93
94 if is_json {
95 let mut array_content = String::new();
97 for (i, msg) in messages.iter().enumerate() {
98 if i > 0 {
99 array_content.push(',');
100 }
101 let raw = String::from_utf8_lossy(msg);
102 array_content.push_str(&raw);
103 }
104 let text = format!("[{array_content}]");
105 format_raw_data_frame(&text)
106 } else {
107 let mut result = String::new();
108 for msg in messages {
109 result.push_str(&format_data_frame(msg, is_binary, false));
110 }
111 result
112 }
113}
114
/// Wraps already-encoded `text` in an SSE `data` event.
///
/// The text is split on every line terminator (see [`split_lines`]) and each
/// resulting line is emitted on its own `data:` line, so the frame never
/// contains a raw `\r` and the payload cannot terminate the event early.
/// The frame ends with the blank line that closes the event.
///
/// An SSE consumer removes a single leading U+0020 SPACE from a field value.
/// A payload line that itself begins with a space would therefore lose that
/// space on the client. For exactly those lines an extra delimiter space is
/// written after `data:` so the consumer strips the delimiter rather than
/// payload. Lines that do not begin with a space are emitted as
/// `data:<line>` with no delimiter.
fn format_raw_data_frame(text: &str) -> String {
    let mut frame = String::from("event: data\n");
    for line in split_lines(text) {
        frame.push_str("data:");
        if line.starts_with(' ') {
            // Sacrificial space: the parser strips this one, not the payload's.
            frame.push(' ');
        }
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n');
    frame
}

/// Splits `text` on `\r\n`, lone `\r`, and lone `\n`.
///
/// Terminators are not included in the returned slices. Consecutive
/// terminators produce empty segments, and the text after the last
/// terminator (possibly empty) is always returned as the final segment, so
/// the result is never empty.
fn split_lines(text: &str) -> Vec<&str> {
    let raw = text.as_bytes();
    let mut segments = Vec::new();
    let mut segment_start = 0;
    let mut cursor = 0;

    while let Some(&byte) = raw.get(cursor) {
        let terminator_len = match byte {
            // CRLF counts as one terminator.
            b'\r' if raw.get(cursor + 1) == Some(&b'\n') => 2,
            b'\r' | b'\n' => 1,
            _ => {
                cursor += 1;
                continue;
            }
        };

        // `\r` and `\n` are ASCII, so both indices are on char boundaries.
        segments.push(&text[segment_start..cursor]);
        cursor += terminator_len;
        segment_start = cursor;
    }

    segments.push(&text[segment_start..]);
    segments
}
161
/// Returns the keep-alive frame: a lone `:` line followed by a blank line.
#[must_use]
pub fn format_keepalive_frame() -> &'static str {
    const KEEPALIVE: &str = ":\n\n";
    KEEPALIVE
}
167
/// Reports whether a stream with content type `ct` is treated as binary.
///
/// Only `text/*` and `application/json` are textual; every other media type
/// (including an empty or malformed one) is binary.
///
/// The check is made on the media type alone: any parameters after the
/// first `;` (for example `; charset=utf-8`) and surrounding whitespace are
/// ignored, and the comparison is ASCII case-insensitive, since media types
/// are case-insensitive.
#[must_use]
pub fn is_binary_content_type(ct: &str) -> bool {
    const TEXT_PREFIX: &[u8] = b"text/";

    // `split` always yields at least one item, so the fallback is never hit.
    let media_type = ct.split(';').next().unwrap_or(ct).trim();

    // Compare bytes so a multi-byte character near the prefix cannot cause
    // a slicing panic.
    let is_text = media_type
        .as_bytes()
        .get(..TEXT_PREFIX.len())
        .map_or(false, |prefix| prefix.eq_ignore_ascii_case(TEXT_PREFIX));
    let is_json = media_type.eq_ignore_ascii_case("application/json");

    !(is_text || is_json)
}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `frame` contains a `data:` line for each expected line.
    fn assert_has_data_lines(frame: &str, expected: &[&str]) {
        for line in expected {
            let needle = format!("data:{line}\n");
            assert!(frame.contains(&needle));
        }
    }

    /// Serializes `payload` and checks which JSON fragments are present and absent.
    fn assert_json_fragments(payload: &ControlPayload, present: &[&str], absent: &[&str]) {
        let encoded = serde_json::to_string(payload).unwrap();
        for fragment in present {
            assert!(encoded.contains(fragment));
        }
        for fragment in absent {
            assert!(!encoded.contains(fragment));
        }
    }

    // --- is_binary_content_type -------------------------------------------

    #[test]
    fn test_is_binary_text_types() {
        for content_type in ["text/plain", "text/html", "text/csv"] {
            assert!(!is_binary_content_type(content_type));
        }
    }

    #[test]
    fn test_is_binary_json() {
        let binary = is_binary_content_type("application/json");
        assert!(!binary);
    }

    #[test]
    fn test_is_binary_octet_stream() {
        let binary = is_binary_content_type("application/octet-stream");
        assert!(binary);
    }

    #[test]
    fn test_is_binary_protobuf() {
        let binary = is_binary_content_type("application/x-protobuf");
        assert!(binary);
    }

    #[test]
    fn test_is_binary_ndjson() {
        let binary = is_binary_content_type("application/ndjson");
        assert!(binary);
    }

    // --- ControlPayload serialization ---------------------------------------

    #[test]
    fn test_control_payload_serializes_camel_case() {
        let control = ControlPayload {
            stream_next_offset: "abc_123".to_string(),
            stream_cursor: Some("cursor1".to_string()),
            up_to_date: Some(true),
            stream_closed: None,
        };
        assert_json_fragments(
            &control,
            &["\"streamNextOffset\"", "\"streamCursor\"", "\"upToDate\""],
            &["\"streamClosed\""],
        );
    }

    #[test]
    fn test_control_payload_skips_none_fields() {
        let control = ControlPayload {
            stream_next_offset: "offset1".to_string(),
            stream_cursor: None,
            up_to_date: None,
            stream_closed: Some(true),
        };
        assert_json_fragments(
            &control,
            &["\"streamNextOffset\"", "\"streamClosed\":true"],
            &["\"streamCursor\"", "\"upToDate\""],
        );
    }

    // --- format_data_frame ---------------------------------------------------

    #[test]
    fn test_format_data_frame_text() {
        let rendered = format_data_frame(&Bytes::from("hello world"), false, false);
        assert_eq!(rendered, "event: data\ndata:hello world\n\n");
    }

    #[test]
    fn test_format_data_frame_multiline_lf() {
        let rendered = format_data_frame(&Bytes::from("line1\nline2\nline3"), false, false);
        assert_has_data_lines(&rendered, &["line1", "line2", "line3"]);
    }

    #[test]
    fn test_format_data_frame_multiline_crlf() {
        let rendered = format_data_frame(&Bytes::from("line1\r\nline2\r\nline3"), false, false);
        assert_has_data_lines(&rendered, &["line1", "line2", "line3"]);
        assert!(!rendered.contains('\r'));
    }

    #[test]
    fn test_format_data_frame_multiline_cr() {
        let rendered = format_data_frame(&Bytes::from("line1\rline2\rline3"), false, false);
        assert_has_data_lines(&rendered, &["line1", "line2", "line3"]);
        assert!(!rendered.contains('\r'));
    }

    #[test]
    fn test_format_data_frame_json() {
        let rendered = format_data_frame(&Bytes::from(r#"{"id":1}"#), false, true);
        assert!(rendered.contains(r#"data:[{"id":1}]"#));
    }

    #[test]
    fn test_format_data_frame_binary() {
        let raw = Bytes::from(vec![0x01, 0x02, 0x03]);
        let rendered = format_data_frame(&raw, true, false);
        assert!(rendered.contains("data:AQID"));
    }

    // --- format_control_frame ------------------------------------------------

    #[test]
    fn test_format_control_frame() {
        let control = ControlPayload {
            stream_next_offset: "test".to_string(),
            stream_cursor: None,
            up_to_date: Some(true),
            stream_closed: None,
        };
        let rendered = format_control_frame(&control);
        assert!(rendered.starts_with("event: control\n"));
        assert!(rendered.contains("data:"));
        assert!(rendered.ends_with("\n\n"));
    }

    // --- split_lines -----------------------------------------------------------

    #[test]
    fn test_split_lines_lf() {
        let parts = split_lines("a\nb\nc");
        assert_eq!(parts, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_split_lines_crlf() {
        let parts = split_lines("a\r\nb\r\nc");
        assert_eq!(parts, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_split_lines_cr() {
        let parts = split_lines("a\rb\rc");
        assert_eq!(parts, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_split_lines_mixed() {
        let parts = split_lines("a\nb\rc\r\nd");
        assert_eq!(parts, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn test_split_lines_empty_segments() {
        for input in ["a\n\nb", "a\r\rb"] {
            assert_eq!(split_lines(input), vec!["a", "", "b"]);
        }
    }

    // --- format_data_frames ----------------------------------------------------

    #[test]
    fn test_format_data_frames_json_batches_into_single_event() {
        let batch: Vec<Bytes> = [r#"{"id":1}"#, r#"{"id":2}"#, r#"{"id":3}"#]
            .into_iter()
            .map(Bytes::from)
            .collect();
        let rendered = format_data_frames(&batch, false, true);

        let event_count = rendered.matches("event: data").count();
        assert_eq!(
            event_count, 1,
            "JSON batch should produce exactly one SSE data event"
        );
        assert!(
            rendered.contains(r#"data:[{"id":1},{"id":2},{"id":3}]"#),
            "JSON batch should contain all messages in one array"
        );
    }

    #[test]
    fn test_format_data_frames_json_single_message() {
        let batch = vec![Bytes::from(r#"{"id":1}"#)];
        let rendered = format_data_frames(&batch, false, true);
        assert_eq!(rendered.matches("event: data").count(), 1);
        assert!(rendered.contains(r#"data:[{"id":1}]"#));
    }

    #[test]
    fn test_format_data_frames_text_emits_per_message() {
        let batch = vec![Bytes::from("hello"), Bytes::from("world")];
        let rendered = format_data_frames(&batch, false, false);

        let event_count = rendered.matches("event: data").count();
        assert_eq!(
            event_count, 2,
            "Text batch should produce one SSE data event per message"
        );
    }

    #[test]
    fn test_format_data_frames_empty() {
        let rendered = format_data_frames(&[], false, true);
        assert!(rendered.is_empty());
    }
}