Skip to main content

fastapi_core/
sse.rs

1//! Server-Sent Events (SSE) support.
2//!
3//! This module provides types and utilities for implementing server-sent events
4//! as defined in the [HTML Living Standard](https://html.spec.whatwg.org/multipage/server-sent-events.html).
5//!
6//! # Overview
7//!
8//! Server-Sent Events allow servers to push data to clients over HTTP. The connection
9//! stays open and the server can send events whenever new data is available.
10//!
11//! # Example
12//!
13//! ```ignore
14//! use fastapi_core::sse::{SseEvent, SseResponse};
15//! use fastapi_core::Response;
16//! use asupersync::stream;
17//!
18//! async fn event_stream() -> Response {
19//!     // Create an async stream of events
20//!     let events = stream::iter(vec![
21//!         SseEvent::message("Hello!"),
22//!         SseEvent::new("World!")
23//!             .event_type("greeting")
24//!             .id("1"),
25//!     ]);
26//!
27//!     SseResponse::new(events).into_response()
28//! }
29//! ```
30//!
31//! # Event Format
32//!
33//! Each event is sent as a text block with the following format:
34//!
35//! ```text
36//! event: <event-type>\n
37//! id: <id>\n
38//! retry: <milliseconds>\n
39//! data: <data-line-1>\n
40//! data: <data-line-2>\n
41//! \n
42//! ```
43//!
44//! - `event`: Optional event type (default is "message")
45//! - `id`: Optional event ID for resumption
46//! - `retry`: Optional reconnection time in milliseconds
47//! - `data`: The actual payload (required, can be multiple lines)
48//!
49//! # Keep-Alive
50//!
51//! Use [`SseEvent::comment()`] to send keep-alive comments that prevent
52//! connection timeouts without sending actual events.
53//!
54//! # Cancellation
55//!
56//! SSE streams integrate with asupersync's cancellation. When the client
57//! disconnects, the stream will be cancelled at the next checkpoint.
58
59use std::pin::Pin;
60use std::task::{Context, Poll};
61use std::time::Duration;
62
63use asupersync::stream::Stream;
64
65use crate::response::{Response, ResponseBody, StatusCode};
66
/// A Server-Sent Event.
///
/// An event is made up of one or more fields:
/// - `data`: The message payload (may span several lines)
/// - `event`: The event type (optional, defaults to "message" on the client)
/// - `id`: An identifier for the event (optional, used for resumption)
/// - `retry`: Reconnection time in milliseconds (optional)
/// - `comment`: A comment (not delivered to the event listener)
///
/// # Example
///
/// ```ignore
/// use fastapi_core::sse::SseEvent;
///
/// // Simple message
/// let event = SseEvent::message("Hello, World!");
///
/// // Event with type and ID
/// let event = SseEvent::new("user joined")
///     .event_type("join")
///     .id("123");
///
/// // JSON data
/// let event = SseEvent::new(r#"{"user":"alice","action":"login"}"#)
///     .event_type("user_event");
///
/// // Keep-alive comment
/// let event = SseEvent::comment("keep-alive");
/// ```
#[derive(Debug, Clone)]
pub struct SseEvent {
    data: Option<String>,
    event_type: Option<String>,
    id: Option<String>,
    retry: Option<u64>,
    comment: Option<String>,
}

impl SseEvent {
    /// Create a new SSE event with the given data.
    ///
    /// The data can contain line breaks (`\n`, `\r\n` or `\r`) - each line is
    /// written with its own `data: ` prefix.
    #[must_use]
    pub fn new(data: impl Into<String>) -> Self {
        Self {
            data: Some(data.into()),
            event_type: None,
            id: None,
            retry: None,
            comment: None,
        }
    }

    /// Create a simple message event.
    ///
    /// Equivalent to `SseEvent::new(data)`.
    #[must_use]
    pub fn message(data: impl Into<String>) -> Self {
        Self::new(data)
    }

    /// Create a keep-alive comment.
    ///
    /// Comments are sent but not delivered to event listeners.
    /// Use these to prevent connection timeouts.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let keepalive = SseEvent::comment("heartbeat");
    /// ```
    #[must_use]
    pub fn comment(comment: impl Into<String>) -> Self {
        Self {
            data: None,
            event_type: None,
            id: None,
            retry: None,
            comment: Some(comment.into()),
        }
    }

    /// Set the event type.
    ///
    /// If not set, the event type defaults to "message".
    /// Any `\r` or `\n` in the value is dropped when the event is serialized,
    /// because the field must fit on a single line.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let event = SseEvent::new("user logged in")
    ///     .event_type("login");
    /// ```
    #[must_use]
    pub fn event_type(mut self, event_type: impl Into<String>) -> Self {
        self.event_type = Some(event_type.into());
        self
    }

    /// Set the event ID.
    ///
    /// The client stores this ID and sends it in the `Last-Event-ID` header
    /// when reconnecting, allowing the server to resume from where it left off.
    /// Any `\r` or `\n` in the value is dropped when the event is serialized,
    /// because the field must fit on a single line.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let event = SseEvent::new("data")
    ///     .id("12345");
    /// ```
    #[must_use]
    pub fn id(mut self, id: impl Into<String>) -> Self {
        self.id = Some(id.into());
        self
    }

    /// Set the retry timeout in milliseconds.
    ///
    /// This tells the client how long to wait before attempting to reconnect
    /// if the connection is lost.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let event = SseEvent::new("data")
    ///     .retry_ms(5000); // 5 seconds
    /// ```
    #[must_use]
    pub fn retry_ms(mut self, milliseconds: u64) -> Self {
        self.retry = Some(milliseconds);
        self
    }

    /// Set the retry timeout from a Duration.
    ///
    /// Durations whose millisecond count does not fit in a `u64` saturate at
    /// `u64::MAX` instead of wrapping around.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// let event = SseEvent::new("data")
    ///     .retry(Duration::from_secs(5));
    /// ```
    #[must_use]
    pub fn retry(self, duration: Duration) -> Self {
        // `as_millis` yields a u128; clamp first so the narrowing cast below
        // can never truncate.
        let millis = duration.as_millis().min(u128::from(u64::MAX)) as u64;
        self.retry_ms(millis)
    }

    /// Format the event as SSE wire format.
    ///
    /// Fields are written in the order comment, `event`, `id`, `retry`,
    /// `data`, followed by the blank line that terminates the event.
    ///
    /// Returns bytes ready to be sent to the client.
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut output = Vec::with_capacity(256);

        // Comment lines start with a colon and are ignored by clients.
        if let Some(comment) = self.comment.as_deref() {
            Self::push_lines(&mut output, b": ", comment);
        }

        if let Some(event_type) = self.event_type.as_deref() {
            Self::push_single_line(&mut output, b"event: ", event_type);
        }

        if let Some(id) = self.id.as_deref() {
            Self::push_single_line(&mut output, b"id: ", id);
        }

        if let Some(retry) = self.retry {
            output.extend_from_slice(b"retry: ");
            output.extend_from_slice(retry.to_string().as_bytes());
            output.push(b'\n');
        }

        // Every payload line gets its own "data: " prefix; empty data still
        // produces one (empty) data line.
        if let Some(data) = self.data.as_deref() {
            Self::push_lines(&mut output, b"data: ", data);
        }

        // Events are terminated by a blank line
        output.push(b'\n');

        output
    }

    /// Write `value` as one `prefix`-ed line per logical line of text.
    ///
    /// Lines are split on `\r\n`, `\n` and bare `\r`, since an SSE client
    /// treats all three as line terminators; leaving a bare `\r` in place
    /// would let the text start a new field on the client side. A trailing
    /// terminator yields a final empty line so the receiver can reconstruct
    /// the trailing newline, and an empty `value` yields a single empty line.
    fn push_lines(output: &mut Vec<u8>, prefix: &[u8], value: &str) {
        let mut rest = value;
        while let Some(pos) = rest.find(|c: char| c == '\r' || c == '\n') {
            Self::push_line(output, prefix, rest[..pos].as_bytes());
            // Treat "\r\n" as a single terminator.
            let terminator_len = if rest[pos..].starts_with("\r\n") { 2 } else { 1 };
            rest = &rest[pos + terminator_len..];
        }
        Self::push_line(output, prefix, rest.as_bytes());
    }

    /// Write a field that must occupy exactly one line.
    ///
    /// `\r` and `\n` bytes are removed from `value` so it cannot terminate the
    /// line early and smuggle additional fields into the stream. Both are
    /// ASCII, so filtering at byte level cannot split a UTF-8 sequence.
    fn push_single_line(output: &mut Vec<u8>, prefix: &[u8], value: &str) {
        output.extend_from_slice(prefix);
        output.extend(value.bytes().filter(|b| !matches!(*b, b'\r' | b'\n')));
        output.push(b'\n');
    }

    /// Append `prefix`, `line` and a terminating `\n` to `output`.
    fn push_line(output: &mut Vec<u8>, prefix: &[u8], line: &[u8]) {
        output.extend_from_slice(prefix);
        output.extend_from_slice(line);
        output.push(b'\n');
    }
}

impl From<&str> for SseEvent {
    /// Build a data-only event from a string slice.
    fn from(data: &str) -> Self {
        Self::new(data)
    }
}

impl From<String> for SseEvent {
    /// Build a data-only event from an owned string.
    fn from(data: String) -> Self {
        Self::new(data)
    }
}
282
283/// A wrapper that converts an async stream of SSE events into formatted bytes.
284///
285/// This stream produces `Vec<u8>` chunks suitable for sending over HTTP.
286pub struct SseStream<S> {
287    inner: S,
288}
289
290impl<S> SseStream<S> {
291    /// Create a new SSE stream wrapper.
292    pub fn new(stream: S) -> Self {
293        Self { inner: stream }
294    }
295}
296
297impl<S> Stream for SseStream<S>
298where
299    S: Stream<Item = SseEvent> + Unpin,
300{
301    type Item = Vec<u8>;
302
303    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304        match Pin::new(&mut self.inner).poll_next(cx) {
305            Poll::Ready(Some(event)) => Poll::Ready(Some(event.to_bytes())),
306            Poll::Ready(None) => Poll::Ready(None),
307            Poll::Pending => Poll::Pending,
308        }
309    }
310}
311
/// Keep-alive settings for SSE responses.
#[derive(Debug, Clone)]
pub struct SseConfig {
    /// Number of seconds between keep-alive comments; `0` means disabled.
    pub keep_alive_secs: u64,
    /// Text of the comment used as the keep-alive.
    pub keep_alive_comment: String,
}

impl Default for SseConfig {
    /// A 30-second keep-alive interval with the comment text `keep-alive`.
    fn default() -> Self {
        SseConfig {
            keep_alive_secs: 30,
            keep_alive_comment: String::from("keep-alive"),
        }
    }
}

impl SseConfig {
    /// Build a configuration holding the default values.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Replace the keep-alive interval (in seconds).
    #[must_use]
    pub fn keep_alive_secs(self, seconds: u64) -> Self {
        Self {
            keep_alive_secs: seconds,
            ..self
        }
    }

    /// Turn keep-alive off by setting the interval to `0`.
    #[must_use]
    pub fn disable_keep_alive(self) -> Self {
        Self {
            keep_alive_secs: 0,
            ..self
        }
    }

    /// Replace the keep-alive comment text.
    #[must_use]
    pub fn keep_alive_comment(self, comment: impl Into<String>) -> Self {
        Self {
            keep_alive_comment: comment.into(),
            ..self
        }
    }
}
358
359/// Builder for creating SSE responses.
360///
361/// # Example
362///
363/// ```ignore
364/// use fastapi_core::sse::{SseEvent, SseResponse};
365/// use asupersync::stream;
366///
367/// async fn events() -> Response {
368///     let events = stream::iter(vec![
369///         SseEvent::message("Hello"),
370///         SseEvent::message("World"),
371///     ]);
372///
373///     SseResponse::new(events)
374///         .into_response()
375/// }
376/// ```
377pub struct SseResponse<S> {
378    stream: S,
379    _config: SseConfig,
380}
381
382impl<S> SseResponse<S>
383where
384    S: Stream<Item = SseEvent> + Send + Unpin + 'static,
385{
386    /// Create a new SSE response from an event stream.
387    pub fn new(stream: S) -> Self {
388        Self {
389            stream,
390            _config: SseConfig::default(),
391        }
392    }
393
394    /// Create an SSE response with custom configuration.
395    pub fn with_config(stream: S, config: SseConfig) -> Self {
396        Self {
397            stream,
398            _config: config,
399        }
400    }
401
402    /// Convert to an HTTP Response.
403    ///
404    /// Sets the appropriate headers for SSE:
405    /// - `Content-Type: text/event-stream`
406    /// - `Cache-Control: no-cache`
407    /// - `Connection: keep-alive`
408    #[must_use]
409    pub fn into_response(self) -> Response {
410        let sse_stream = SseStream::new(self.stream);
411
412        Response::with_status(StatusCode::OK)
413            .header("content-type", b"text/event-stream".to_vec())
414            .header("cache-control", b"no-cache".to_vec())
415            .header("connection", b"keep-alive".to_vec())
416            .header("x-accel-buffering", b"no".to_vec()) // Disable nginx buffering
417            .body(ResponseBody::stream(sse_stream))
418    }
419}
420
421/// Convenience function to create an SSE response from an iterator.
422///
423/// # Example
424///
425/// ```ignore
426/// use fastapi_core::sse::{sse_response, SseEvent};
427///
428/// let events = vec![
429///     SseEvent::message("Hello"),
430///     SseEvent::message("World"),
431/// ];
432///
433/// let response = sse_response(asupersync::stream::iter(events));
434/// ```
435pub fn sse_response<S>(stream: S) -> Response
436where
437    S: Stream<Item = SseEvent> + Send + Unpin + 'static,
438{
439    SseResponse::new(stream).into_response()
440}
441
442// ============================================================================
443// Tests
444// ============================================================================
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `event` and return the wire format as text for assertions.
    fn wire(event: &SseEvent) -> String {
        let bytes = event.to_bytes();
        String::from_utf8_lossy(&bytes).into_owned()
    }

    #[test]
    fn event_simple_message() {
        let output = wire(&SseEvent::message("Hello, World!"));
        assert!(output.contains("data: Hello, World!\n"));
        assert!(output.ends_with("\n\n"));
    }

    #[test]
    fn event_with_type() {
        let output = wire(&SseEvent::new("user joined").event_type("join"));
        assert!(output.contains("event: join\n"));
        assert!(output.contains("data: user joined\n"));
    }

    #[test]
    fn event_with_id() {
        let output = wire(&SseEvent::new("data").id("12345"));
        assert!(output.contains("id: 12345\n"));
    }

    #[test]
    fn event_with_retry() {
        let output = wire(&SseEvent::new("data").retry_ms(5000));
        assert!(output.contains("retry: 5000\n"));
    }

    #[test]
    fn event_multiline_data() {
        let output = wire(&SseEvent::new("line1\nline2\nline3"));
        for expected in ["data: line1\n", "data: line2\n", "data: line3\n"].iter() {
            assert!(output.contains(expected));
        }
    }

    #[test]
    fn event_comment() {
        let output = wire(&SseEvent::comment("keep-alive"));
        assert!(output.contains(": keep-alive\n"));
    }

    #[test]
    fn event_full_format() {
        let event = SseEvent::new("payload")
            .event_type("update")
            .id("42")
            .retry_ms(3000);
        let output = wire(&event);

        // Fields must appear in the order: event, id, retry, data.
        let position = |needle: &str| output.find(needle).unwrap();
        let event_pos = position("event:");
        let id_pos = position("id:");
        let retry_pos = position("retry:");
        let data_pos = position("data:");

        assert!(event_pos < id_pos);
        assert!(id_pos < retry_pos);
        assert!(retry_pos < data_pos);
    }

    #[test]
    fn event_from_str() {
        let event: SseEvent = "Hello".into();
        assert!(wire(&event).contains("data: Hello\n"));
    }

    #[test]
    fn event_from_string() {
        let event: SseEvent = String::from("World").into();
        assert!(wire(&event).contains("data: World\n"));
    }

    #[test]
    fn config_defaults() {
        let SseConfig {
            keep_alive_secs,
            keep_alive_comment,
        } = SseConfig::default();
        assert_eq!(keep_alive_secs, 30);
        assert_eq!(keep_alive_comment, "keep-alive");
    }

    #[test]
    fn config_custom() {
        let SseConfig {
            keep_alive_secs,
            keep_alive_comment,
        } = SseConfig::new()
            .keep_alive_secs(60)
            .keep_alive_comment("heartbeat");
        assert_eq!(keep_alive_secs, 60);
        assert_eq!(keep_alive_comment, "heartbeat");
    }

    #[test]
    fn config_disable_keepalive() {
        let disabled = SseConfig::new().disable_keep_alive();
        assert_eq!(disabled.keep_alive_secs, 0);
    }

    #[test]
    fn event_empty_data() {
        let output = wire(&SseEvent::new(""));
        assert!(output.contains("data: \n"));
    }

    #[test]
    fn retry_from_duration() {
        let output = wire(&SseEvent::new("data").retry(Duration::from_secs(10)));
        assert!(output.contains("retry: 10000\n"));
    }
}