Skip to main content

rust_web_server/sse/
mod.rs

1//! Server-Sent Events (SSE) response builder.
2//!
3//! [`Sse`] assembles a complete `text/event-stream` response body from a
4//! sequence of [`SseEvent`] values. The full body is buffered before any bytes
5//! are written to the socket, which suits pre-known event sequences (batch
6//! updates, enumerable progress steps, static push payloads).
7//!
8//! For live streaming where events arrive over time (AI token output, real-time
9//! sensor data), write `text/event-stream` headers and the raw event lines
10//! directly to the TCP stream in a custom accept loop — the same approach used
11//! for WebSocket connections.
12//!
13//! # Wire format
14//!
15//! Each event is a block of `field: value\n` lines terminated by a blank line:
16//!
17//! ```text
18//! id: 1
19//! event: update
20//! data: {"count":42}
21//!
22//! ```
23//!
24//! Lines with no field name (starting with `:`) are comments and are ignored
25//! by clients; they are used here as keep-alive pings.
26//!
27//! # Example
28//!
29//! ```rust,no_run
30//! use rust_web_server::sse::{Sse, SseEvent};
31//!
32//! let response = Sse::new()
33//!     .event("connected", "ready")
34//!     .push(SseEvent::data(r#"{"count":1}"#).id("1").event_type("update"))
35//!     .push(SseEvent::data(r#"{"count":2}"#).id("2").event_type("update"))
36//!     .comment("keep-alive")
37//!     .into_response();
38//! ```
39
40#[cfg(test)]
41mod tests;
42
43use crate::core::New;
44use crate::header::Header;
45use crate::range::Range;
46use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
47
48// ── SseEvent ─────────────────────────────────────────────────────────────────
49
50/// A single Server-Sent Event.
51///
52/// Build with [`SseEvent::data`] and chain the optional setter methods.
53/// Use [`Sse::push`] to add it to a response.
54pub struct SseEvent {
55    id: Option<String>,
56    event: Option<String>,
57    data: String,
58    retry_ms: Option<u32>,
59}
60
61impl SseEvent {
62    /// Create an event with the given data payload. Multi-line strings produce
63    /// multiple `data:` lines, which the client concatenates with `\n`.
64    pub fn data(data: impl Into<String>) -> Self {
65        SseEvent { id: None, event: None, data: data.into(), retry_ms: None }
66    }
67
68    /// Set the `id` field. Clients use this to resume from the last event
69    /// seen after a reconnection (`Last-Event-ID` request header).
70    pub fn id(mut self, id: impl Into<String>) -> Self {
71        self.id = Some(id.into());
72        self
73    }
74
75    /// Set the event type (`event` field). Clients listen for specific types
76    /// with `addEventListener("type", handler)`. Defaults to `"message"`.
77    pub fn event_type(mut self, event: impl Into<String>) -> Self {
78        self.event = Some(event.into());
79        self
80    }
81
82    /// Set the `retry` reconnection delay in milliseconds. Overrides the
83    /// client's default reconnect interval.
84    pub fn retry(mut self, ms: u32) -> Self {
85        self.retry_ms = Some(ms);
86        self
87    }
88
89    /// Encode this event as SSE wire-format bytes.
90    pub fn encode(&self) -> Vec<u8> {
91        let mut out = String::new();
92        if let Some(id) = &self.id {
93            out.push_str("id: ");
94            out.push_str(id);
95            out.push('\n');
96        }
97        if let Some(event) = &self.event {
98            out.push_str("event: ");
99            out.push_str(event);
100            out.push('\n');
101        }
102        for line in self.data.lines() {
103            out.push_str("data: ");
104            out.push_str(line);
105            out.push('\n');
106        }
107        if self.data.is_empty() {
108            out.push_str("data: \n");
109        }
110        if let Some(ms) = self.retry_ms {
111            out.push_str("retry: ");
112            out.push_str(&ms.to_string());
113            out.push('\n');
114        }
115        out.push('\n');
116        out.into_bytes()
117    }
118}
119
120// ── Sse ──────────────────────────────────────────────────────────────────────
121
122/// Builder for a buffered Server-Sent Events response.
123///
124/// Call [`Sse::into_response`] to obtain a [`Response`] with:
125/// - `200 OK`
126/// - `Content-Type: text/event-stream`
127/// - `Cache-Control: no-cache`
128/// - `X-Accel-Buffering: no` (disables nginx proxy buffering)
129///
130/// # Example
131///
132/// ```rust,no_run
133/// use rust_web_server::sse::Sse;
134///
135/// let response = Sse::new()
136///     .event("open", "")
137///     .data(r#"{"msg":"hello"}"#)
138///     .retry(5000)
139///     .into_response();
140/// ```
141pub struct Sse {
142    chunks: Vec<Vec<u8>>,
143}
144
145impl Sse {
146    /// Create an empty SSE response builder.
147    pub fn new() -> Self {
148        Sse { chunks: Vec::new() }
149    }
150
151    /// Append a named event with `data`.
152    ///
153    /// `event_type` becomes the `event:` field; `data` becomes the `data:` field.
154    pub fn event(mut self, event_type: &str, data: &str) -> Self {
155        self.chunks.push(
156            SseEvent::data(data).event_type(event_type).encode(),
157        );
158        self
159    }
160
161    /// Append a data-only event (no `event:` type field; clients receive it as
162    /// `"message"`).
163    pub fn data(mut self, data: &str) -> Self {
164        self.chunks.push(SseEvent::data(data).encode());
165        self
166    }
167
168    /// Append a fully configured [`SseEvent`] (id, type, retry, data).
169    pub fn push(mut self, event: SseEvent) -> Self {
170        self.chunks.push(event.encode());
171        self
172    }
173
174    /// Append a `retry:` directive that tells the client how many milliseconds
175    /// to wait before reconnecting after the connection is lost.
176    pub fn retry(mut self, ms: u32) -> Self {
177        self.chunks.push(format!("retry: {}\n\n", ms).into_bytes());
178        self
179    }
180
181    /// Append an SSE comment line (starts with `:`). Used as a keep-alive ping
182    /// or annotation; browsers ignore comment content.
183    pub fn comment(mut self, text: &str) -> Self {
184        self.chunks.push(format!(": {}\n\n", text).into_bytes());
185        self
186    }
187
188    /// Finalise the builder and return a [`Response`] with the correct SSE
189    /// headers and the accumulated event body.
190    pub fn into_response(self) -> Response {
191        let body: Vec<u8> = self.chunks.into_iter().flatten().collect();
192
193        let mut response = Response::new();
194        response.status_code = *STATUS_CODE_REASON_PHRASE.n200_ok.status_code;
195        response.reason_phrase =
196            STATUS_CODE_REASON_PHRASE.n200_ok.reason_phrase.to_string();
197        response.headers.push(Header {
198            name: Header::_CACHE_CONTROL.to_string(),
199            value: "no-cache".to_string(),
200        });
201        response.headers.push(Header {
202            name: "X-Accel-Buffering".to_string(),
203            value: "no".to_string(),
204        });
205        response.content_range_list = vec![Range::get_content_range(
206            body,
207            "text/event-stream".to_string(),
208        )];
209        response
210    }
211}