Skip to main content

durable_streams_server/protocol/
sse.rs

1// SSE (Server-Sent Events) protocol types and helpers.
2//
3// Defines the control event payload structure and event builders
4// for the SSE read mode (PROTOCOL.md §5.8).
5//
6// We format SSE frames manually (without axum's Event::data) to
7// produce `data:<value>` without the optional space after the colon,
8// which the conformance test suite's raw-text assertions require.
9
10use base64::Engine;
11use bytes::Bytes;
12use serde::Serialize;
13
/// JSON payload for `event: control` SSE events.
///
/// Field names use camelCase per PROTOCOL.md §5.8 (applied by the
/// `rename_all` attribute, e.g. `stream_next_offset` → `streamNextOffset`).
/// Every `Option` field is left out of the JSON object entirely when it is
/// `None`, rather than being serialized as `null`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ControlPayload {
    /// Current tail position (always present).
    pub stream_next_offset: String,
    /// Cursor for CDN collapsing (present when stream is open).
    /// Omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_cursor: Option<String>,
    /// True when client has caught up with all available data.
    /// Omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub up_to_date: Option<bool>,
    /// True when stream is closed and all data has been sent.
    /// Omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_closed: Option<bool>,
}
32
33/// Format an `event: control` SSE frame from a typed payload.
34///
35/// Returns a raw SSE frame string ready to be written to the wire.
36///
37/// # Panics
38///
39/// Panics if `ControlPayload` fails to serialize, which should never happen
40/// since all fields are simple strings/booleans.
41#[must_use]
42pub fn format_control_frame(payload: &ControlPayload) -> String {
43    let json =
44        serde_json::to_string(payload).expect("ControlPayload serialization should not fail");
45    format!("event: control\ndata:{json}\n\n")
46}
47
48/// Format an `event: data` SSE frame for a single stored message.
49///
50/// If the stream content type is binary (per `is_binary_content_type`),
51/// the data is base64-encoded. For JSON content types, the message is
52/// wrapped in a JSON array. Otherwise it's sent as UTF-8 text.
53///
54/// For JSON streams with multiple messages from a single read, prefer
55/// `format_data_frames` which batches them into a single SSE event.
56///
57/// Multi-line data is split across multiple `data:` lines per the SSE spec.
58/// All line ending forms (`\r\n`, `\r`, `\n`) are treated as boundaries to
59/// prevent CRLF injection attacks — each segment gets its own `data:` prefix.
60#[must_use]
61pub fn format_data_frame(data: &Bytes, is_binary: bool, is_json: bool) -> String {
62    let text = if is_binary {
63        base64::engine::general_purpose::STANDARD.encode(data)
64    } else if is_json {
65        // Wrap JSON data in an array per conformance spec
66        let raw = String::from_utf8_lossy(data);
67        format!("[{raw}]")
68    } else {
69        // Safety: for text/* content types the data is expected to be
70        // valid UTF-8. If it's not, we use lossy conversion which
71        // replaces invalid sequences with the replacement character.
72        String::from_utf8_lossy(data).into_owned()
73    };
74
75    format_raw_data_frame(&text)
76}
77
78/// Format `event: data` SSE frames for a batch of messages from a single read.
79///
80/// For JSON streams, all messages are batched into a single `event: data` frame
81/// containing one JSON array — maintaining the 1:1 data-to-control relationship
82/// the spec requires (PROTOCOL.md §5.8). Without batching, the client would
83/// concatenate separate `[...][...]` arrays, producing invalid JSON.
84///
85/// For binary/text streams, each message gets its own `event: data` frame.
86///
87/// Returns an empty string if `messages` is empty.
88#[must_use]
89pub fn format_data_frames(messages: &[Bytes], is_binary: bool, is_json: bool) -> String {
90    if messages.is_empty() {
91        return String::new();
92    }
93
94    if is_json {
95        // Batch all messages into a single JSON array in one event: data frame
96        let mut array_content = String::new();
97        for (i, msg) in messages.iter().enumerate() {
98            if i > 0 {
99                array_content.push(',');
100            }
101            let raw = String::from_utf8_lossy(msg);
102            array_content.push_str(&raw);
103        }
104        let text = format!("[{array_content}]");
105        format_raw_data_frame(&text)
106    } else {
107        let mut result = String::new();
108        for msg in messages {
109            result.push_str(&format_data_frame(msg, is_binary, false));
110        }
111        result
112    }
113}
114
115/// Format a raw text payload as an `event: data` SSE frame.
116///
117/// Splits on all line-ending forms (`\r\n`, `\r`, `\n`) to prevent CRLF
118/// injection — each segment gets its own `data:` prefix.
119fn format_raw_data_frame(text: &str) -> String {
120    let mut frame = String::from("event: data\n");
121    for line in split_lines(text) {
122        frame.push_str("data:");
123        frame.push_str(line);
124        frame.push('\n');
125    }
126    frame.push('\n');
127    frame
128}
129
/// Break `text` into segments at every line ending: `\r\n`, `\r`, or `\n`.
///
/// A `\r\n` pair counts as one line ending. The text after the final line
/// ending is always returned as the last segment, even when it is empty,
/// so the result is never empty (`""` yields `[""]`).
///
/// This is stricter than Rust's `str::lines()` which doesn't handle
/// bare `\r` as a line terminator.
fn split_lines(text: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    let mut segment_start = 0;
    // True when the previous byte was `\r`, so a following `\n` belongs to
    // the same line ending and must not open a new (empty) segment.
    let mut after_cr = false;

    for (pos, &byte) in text.as_bytes().iter().enumerate() {
        match byte {
            b'\n' if after_cr => {
                // Second half of `\r\n`: the segment was already emitted at
                // the `\r`; just move the start past the `\n`.
                segment_start = pos + 1;
                after_cr = false;
            }
            b'\r' | b'\n' => {
                segments.push(&text[segment_start..pos]);
                segment_start = pos + 1;
                after_cr = byte == b'\r';
            }
            _ => after_cr = false,
        }
    }

    // Remaining text after last line ending
    segments.push(&text[segment_start..]);
    segments
}
161
/// Return the SSE keep-alive frame: a comment line (a lone `:`) followed
/// by the blank line that ends the frame.
#[must_use]
pub fn format_keepalive_frame() -> &'static str {
    const KEEPALIVE_FRAME: &str = ":\n\n";
    KEEPALIVE_FRAME
}
167
/// Determine if a content type requires base64 encoding in SSE mode.
///
/// Returns `false` for `text/*` and `application/json` (UTF-8 safe).
/// Returns `true` for everything else (binary), including the empty string.
///
/// Only the media-type essence is examined: any parameters after the first
/// `;` (e.g. `; charset=utf-8`) and surrounding whitespace are ignored, and
/// the comparison is ASCII case-insensitive, because media type names are
/// case-insensitive. So `application/json; charset=utf-8` and `Text/Plain`
/// are both treated as text rather than being base64-encoded by mistake.
///
/// Per PROTOCOL.md §5.8: "For streams with content-type: text/* or
/// application/json, data events carry UTF-8 text directly. For streams
/// with any other content-type (binary streams), servers MUST automatically
/// base64-encode data events."
#[must_use]
pub fn is_binary_content_type(ct: &str) -> bool {
    // `split` always yields at least one item; the fallback is never taken.
    let essence = ct.split(';').next().unwrap_or(ct).trim();

    // `get(..5)` is `None` for short strings or a non-char-boundary cut, so
    // this cannot panic on arbitrary input.
    let is_text = matches!(
        essence.get(..5),
        Some(prefix) if prefix.eq_ignore_ascii_case("text/")
    );
    let is_json = essence.eq_ignore_ascii_case("application/json");

    !(is_text || is_json)
}
187
// Unit tests for the SSE helpers. The frame tests pin the exact wire text
// (`data:` with no space after the colon, frames terminated by a blank line).
#[cfg(test)]
mod tests {
    use super::*;

    // `text/*` subtypes are UTF-8 safe and must not be classified as binary.
    #[test]
    fn test_is_binary_text_types() {
        assert!(!is_binary_content_type("text/plain"));
        assert!(!is_binary_content_type("text/html"));
        assert!(!is_binary_content_type("text/csv"));
    }

    // `application/json` is the one non-`text/*` type treated as text.
    #[test]
    fn test_is_binary_json() {
        assert!(!is_binary_content_type("application/json"));
    }

    // Generic binary type is classified as binary.
    #[test]
    fn test_is_binary_octet_stream() {
        assert!(is_binary_content_type("application/octet-stream"));
    }

    // Any other `application/*` subtype is classified as binary.
    #[test]
    fn test_is_binary_protobuf() {
        assert!(is_binary_content_type("application/x-protobuf"));
    }

    // JSON-like subtypes other than exactly `application/json` stay binary.
    #[test]
    fn test_is_binary_ndjson() {
        // application/ndjson is not text/* or application/json, so it's binary
        assert!(is_binary_content_type("application/ndjson"));
    }

    // Keys are emitted in camelCase and a `None` field is left out.
    #[test]
    fn test_control_payload_serializes_camel_case() {
        let payload = ControlPayload {
            stream_next_offset: "abc_123".to_string(),
            stream_cursor: Some("cursor1".to_string()),
            up_to_date: Some(true),
            stream_closed: None,
        };
        let json = serde_json::to_string(&payload).unwrap();
        assert!(json.contains("\"streamNextOffset\""));
        assert!(json.contains("\"streamCursor\""));
        assert!(json.contains("\"upToDate\""));
        assert!(!json.contains("\"streamClosed\""));
    }

    // Every optional field set to `None` is absent; `Some` values are present.
    #[test]
    fn test_control_payload_skips_none_fields() {
        let payload = ControlPayload {
            stream_next_offset: "offset1".to_string(),
            stream_cursor: None,
            up_to_date: None,
            stream_closed: Some(true),
        };
        let json = serde_json::to_string(&payload).unwrap();
        assert!(json.contains("\"streamNextOffset\""));
        assert!(!json.contains("\"streamCursor\""));
        assert!(!json.contains("\"upToDate\""));
        assert!(json.contains("\"streamClosed\":true"));
    }

    // Single-line text: exact frame bytes, including the blank-line terminator.
    #[test]
    fn test_format_data_frame_text() {
        let data = Bytes::from("hello world");
        let frame = format_data_frame(&data, false, false);
        assert_eq!(frame, "event: data\ndata:hello world\n\n");
    }

    // LF-separated payload lines each get their own `data:` line.
    #[test]
    fn test_format_data_frame_multiline_lf() {
        let data = Bytes::from("line1\nline2\nline3");
        let frame = format_data_frame(&data, false, false);
        assert!(frame.contains("data:line1\n"));
        assert!(frame.contains("data:line2\n"));
        assert!(frame.contains("data:line3\n"));
    }

    // CRLF-separated payload lines are split the same way, and the CR is dropped.
    #[test]
    fn test_format_data_frame_multiline_crlf() {
        let data = Bytes::from("line1\r\nline2\r\nline3");
        let frame = format_data_frame(&data, false, false);
        assert!(frame.contains("data:line1\n"));
        assert!(frame.contains("data:line2\n"));
        assert!(frame.contains("data:line3\n"));
        // No \r should appear in the output
        assert!(!frame.contains('\r'));
    }

    // A bare CR is also a line boundary and never reaches the output.
    #[test]
    fn test_format_data_frame_multiline_cr() {
        let data = Bytes::from("line1\rline2\rline3");
        let frame = format_data_frame(&data, false, false);
        assert!(frame.contains("data:line1\n"));
        assert!(frame.contains("data:line2\n"));
        assert!(frame.contains("data:line3\n"));
        assert!(!frame.contains('\r'));
    }

    // A single JSON message is wrapped in a one-element array.
    #[test]
    fn test_format_data_frame_json() {
        let data = Bytes::from(r#"{"id":1}"#);
        let frame = format_data_frame(&data, false, true);
        assert!(frame.contains(r#"data:[{"id":1}]"#));
    }

    // Binary payloads are base64-encoded (bytes 01 02 03 encode to "AQID").
    #[test]
    fn test_format_data_frame_binary() {
        let data = Bytes::from(vec![0x01, 0x02, 0x03]);
        let frame = format_data_frame(&data, true, false);
        assert!(frame.contains("data:AQID"));
    }

    // Control frame shape: event line, a `data:` line, blank-line terminator.
    #[test]
    fn test_format_control_frame() {
        let payload = ControlPayload {
            stream_next_offset: "test".to_string(),
            stream_cursor: None,
            up_to_date: Some(true),
            stream_closed: None,
        };
        let frame = format_control_frame(&payload);
        assert!(frame.starts_with("event: control\n"));
        assert!(frame.contains("data:"));
        assert!(frame.ends_with("\n\n"));
    }

    // `split_lines` handles LF.
    #[test]
    fn test_split_lines_lf() {
        assert_eq!(split_lines("a\nb\nc"), vec!["a", "b", "c"]);
    }

    // `split_lines` treats CRLF as a single line ending.
    #[test]
    fn test_split_lines_crlf() {
        assert_eq!(split_lines("a\r\nb\r\nc"), vec!["a", "b", "c"]);
    }

    // `split_lines` handles a bare CR.
    #[test]
    fn test_split_lines_cr() {
        assert_eq!(split_lines("a\rb\rc"), vec!["a", "b", "c"]);
    }

    // All three line-ending forms may appear in the same input.
    #[test]
    fn test_split_lines_mixed() {
        assert_eq!(split_lines("a\nb\rc\r\nd"), vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn test_split_lines_empty_segments() {
        // Consecutive line endings produce empty segments
        assert_eq!(split_lines("a\n\nb"), vec!["a", "", "b"]);
        assert_eq!(split_lines("a\r\rb"), vec!["a", "", "b"]);
    }

    // JSON batches collapse into one event carrying one comma-joined array.
    #[test]
    fn test_format_data_frames_json_batches_into_single_event() {
        let messages = vec![
            Bytes::from(r#"{"id":1}"#),
            Bytes::from(r#"{"id":2}"#),
            Bytes::from(r#"{"id":3}"#),
        ];
        let frame = format_data_frames(&messages, false, true);
        // Should produce exactly one `event: data` with all messages in one array
        assert_eq!(
            frame.matches("event: data").count(),
            1,
            "JSON batch should produce exactly one SSE data event"
        );
        assert!(
            frame.contains(r#"data:[{"id":1},{"id":2},{"id":3}]"#),
            "JSON batch should contain all messages in one array"
        );
    }

    // A one-message JSON batch matches the single-message wrapping.
    #[test]
    fn test_format_data_frames_json_single_message() {
        let messages = vec![Bytes::from(r#"{"id":1}"#)];
        let frame = format_data_frames(&messages, false, true);
        assert_eq!(frame.matches("event: data").count(), 1);
        assert!(frame.contains(r#"data:[{"id":1}]"#));
    }

    // Text batches are not merged: one event per message.
    #[test]
    fn test_format_data_frames_text_emits_per_message() {
        let messages = vec![Bytes::from("hello"), Bytes::from("world")];
        let frame = format_data_frames(&messages, false, false);
        // Non-JSON should produce one event: data per message
        assert_eq!(
            frame.matches("event: data").count(),
            2,
            "Text batch should produce one SSE data event per message"
        );
    }

    // An empty batch produces no output at all (not an empty `[]` event).
    #[test]
    fn test_format_data_frames_empty() {
        let result = format_data_frames(&[], false, true);
        assert!(result.is_empty());
    }
}