rust_web_server/sse/mod.rs
1//! Server-Sent Events (SSE) response builder.
2//!
3//! [`Sse`] assembles a complete `text/event-stream` response body from a
4//! sequence of [`SseEvent`] values. The full body is buffered before any bytes
5//! are written to the socket, which suits pre-known event sequences (batch
6//! updates, enumerable progress steps, static push payloads).
7//!
8//! For live streaming where events arrive over time (AI token output, real-time
9//! sensor data), write `text/event-stream` headers and the raw event lines
10//! directly to the TCP stream in a custom accept loop — the same approach used
11//! for WebSocket connections.
12//!
13//! # Wire format
14//!
15//! Each event is a block of `field: value\n` lines terminated by a blank line:
16//!
17//! ```text
18//! id: 1
19//! event: update
20//! data: {"count":42}
21//!
22//! ```
23//!
24//! Lines with no field name (starting with `:`) are comments and are ignored
25//! by clients; they are used here as keep-alive pings.
26//!
27//! # Example
28//!
29//! ```rust,no_run
30//! use rust_web_server::sse::{Sse, SseEvent};
31//!
32//! let response = Sse::new()
33//! .event("connected", "ready")
34//! .push(SseEvent::data(r#"{"count":1}"#).id("1").event_type("update"))
35//! .push(SseEvent::data(r#"{"count":2}"#).id("2").event_type("update"))
36//! .comment("keep-alive")
37//! .into_response();
38//! ```
39
40#[cfg(test)]
41mod tests;
42
43use crate::core::New;
44use crate::header::Header;
45use crate::range::Range;
46use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
47
48// ── SseEvent ─────────────────────────────────────────────────────────────────
49
/// A single Server-Sent Event.
///
/// Build with [`SseEvent::data`] and chain the optional setter methods.
/// Use [`Sse::push`] to add it to a response.
#[derive(Debug, Clone)]
pub struct SseEvent {
    /// Value of the optional `id:` field.
    id: Option<String>,
    /// Value of the optional `event:` field (the event type).
    event: Option<String>,
    /// Payload; emitted as one `data:` line per line of text.
    data: String,
    /// Value of the optional `retry:` field, in milliseconds.
    retry_ms: Option<u32>,
}

impl SseEvent {
    /// Create an event with the given data payload. Multi-line strings produce
    /// multiple `data:` lines, which the client concatenates with `\n`.
    pub fn data(data: impl Into<String>) -> Self {
        SseEvent { id: None, event: None, data: data.into(), retry_ms: None }
    }

    /// Set the `id` field. Clients use this to resume from the last event
    /// seen after a reconnection (`Last-Event-ID` request header).
    ///
    /// Carriage returns and line feeds in `id` are dropped when the event is
    /// encoded, because the field must fit on a single line.
    pub fn id(mut self, id: impl Into<String>) -> Self {
        self.id = Some(id.into());
        self
    }

    /// Set the event type (`event` field). Clients listen for specific types
    /// with `addEventListener("type", handler)`. Defaults to `"message"`.
    ///
    /// Carriage returns and line feeds in `event` are dropped when the event
    /// is encoded, because the field must fit on a single line.
    pub fn event_type(mut self, event: impl Into<String>) -> Self {
        self.event = Some(event.into());
        self
    }

    /// Set the `retry` reconnection delay in milliseconds. Overrides the
    /// client's default reconnect interval.
    pub fn retry(mut self, ms: u32) -> Self {
        self.retry_ms = Some(ms);
        self
    }

    /// Encode this event as SSE wire-format bytes.
    ///
    /// Fields are written in the order `id`, `event`, `data`, `retry`, each
    /// as `name: value\n`, followed by the blank line that ends the event.
    /// An empty payload still produces one `data: ` line.
    pub fn encode(&self) -> Vec<u8> {
        let mut out = String::new();
        if let Some(id) = &self.id {
            Self::push_single_line_field(&mut out, "id", id);
        }
        if let Some(event) = &self.event {
            Self::push_single_line_field(&mut out, "event", event);
        }
        // `str::lines` splits on `\n` and `\r\n` only, but an SSE parser also
        // treats a bare `\r` as a line terminator. Split on it as well so the
        // text after it is sent as its own `data:` line instead of being
        // parsed by the client as an unknown field and discarded.
        for line in self.data.lines().flat_map(|l| l.split('\r')) {
            out.push_str("data: ");
            out.push_str(line);
            out.push('\n');
        }
        if self.data.is_empty() {
            out.push_str("data: \n");
        }
        if let Some(ms) = self.retry_ms {
            out.push_str("retry: ");
            out.push_str(&ms.to_string());
            out.push('\n');
        }
        out.push('\n');
        out.into_bytes()
    }

    /// Append `name: value\n` to `out`, dropping any CR/LF found in `value`.
    ///
    /// A line break inside a single-line field would end the field early and
    /// let the rest of the value be interpreted as additional fields or as an
    /// event boundary.
    fn push_single_line_field(out: &mut String, name: &str, value: &str) {
        out.push_str(name);
        out.push_str(": ");
        out.extend(value.chars().filter(|c| !matches!(c, '\r' | '\n')));
        out.push('\n');
    }
}
119
120// ── Sse ──────────────────────────────────────────────────────────────────────
121
122/// Builder for a buffered Server-Sent Events response.
123///
124/// Call [`Sse::into_response`] to obtain a [`Response`] with:
125/// - `200 OK`
126/// - `Content-Type: text/event-stream`
127/// - `Cache-Control: no-cache`
128/// - `X-Accel-Buffering: no` (disables nginx proxy buffering)
129///
130/// # Example
131///
132/// ```rust,no_run
133/// use rust_web_server::sse::Sse;
134///
135/// let response = Sse::new()
136/// .event("open", "")
137/// .data(r#"{"msg":"hello"}"#)
138/// .retry(5000)
139/// .into_response();
140/// ```
141pub struct Sse {
142 chunks: Vec<Vec<u8>>,
143}
144
145impl Sse {
146 /// Create an empty SSE response builder.
147 pub fn new() -> Self {
148 Sse { chunks: Vec::new() }
149 }
150
151 /// Append a named event with `data`.
152 ///
153 /// `event_type` becomes the `event:` field; `data` becomes the `data:` field.
154 pub fn event(mut self, event_type: &str, data: &str) -> Self {
155 self.chunks.push(
156 SseEvent::data(data).event_type(event_type).encode(),
157 );
158 self
159 }
160
161 /// Append a data-only event (no `event:` type field; clients receive it as
162 /// `"message"`).
163 pub fn data(mut self, data: &str) -> Self {
164 self.chunks.push(SseEvent::data(data).encode());
165 self
166 }
167
168 /// Append a fully configured [`SseEvent`] (id, type, retry, data).
169 pub fn push(mut self, event: SseEvent) -> Self {
170 self.chunks.push(event.encode());
171 self
172 }
173
174 /// Append a `retry:` directive that tells the client how many milliseconds
175 /// to wait before reconnecting after the connection is lost.
176 pub fn retry(mut self, ms: u32) -> Self {
177 self.chunks.push(format!("retry: {}\n\n", ms).into_bytes());
178 self
179 }
180
181 /// Append an SSE comment line (starts with `:`). Used as a keep-alive ping
182 /// or annotation; browsers ignore comment content.
183 pub fn comment(mut self, text: &str) -> Self {
184 self.chunks.push(format!(": {}\n\n", text).into_bytes());
185 self
186 }
187
188 /// Finalise the builder and return a [`Response`] with the correct SSE
189 /// headers and the accumulated event body.
190 pub fn into_response(self) -> Response {
191 let body: Vec<u8> = self.chunks.into_iter().flatten().collect();
192
193 let mut response = Response::new();
194 response.status_code = *STATUS_CODE_REASON_PHRASE.n200_ok.status_code;
195 response.reason_phrase =
196 STATUS_CODE_REASON_PHRASE.n200_ok.reason_phrase.to_string();
197 response.headers.push(Header {
198 name: Header::_CACHE_CONTROL.to_string(),
199 value: "no-cache".to_string(),
200 });
201 response.headers.push(Header {
202 name: "X-Accel-Buffering".to_string(),
203 value: "no".to_string(),
204 });
205 response.content_range_list = vec![Range::get_content_range(
206 body,
207 "text/event-stream".to_string(),
208 )];
209 response
210 }
211}