Skip to main content

arcly_http/realtime/
sse.rs

1//! High-performance Server-Sent Events engine.
2//!
3//! `SseStream` adapts any `futures::Stream` of [`SseEvent`]s into a compliant
4//! `text/event-stream` response. The conversion to the transport layer happens
5//! once, lazily, inside `IntoResponse` — handler code only ever touches
6//! arcly-owned types (`SseStream`, `SseEvent`).
7//!
8//! Back-pressure and flushing are delegated to the underlying async runtime:
9//! the stream is polled only as fast as the client drains the socket, so a slow
10//! consumer never buffers unboundedly inside the framework.
11
12use std::convert::Infallible;
13use std::time::Duration;
14
15use axum::response::sse::{Event, KeepAlive, Sse};
16use futures::stream::{BoxStream, Stream, StreamExt};
17use serde::Serialize;
18
/// One Server-Sent Event, assembled with chained builder calls.
///
/// Start from [`SseEvent::default`] and set whichever fields are needed.
/// Only `data` is always present (it defaults to the empty string); every
/// other field is omitted unless explicitly set.
#[derive(Clone, Debug, Default)]
pub struct SseEvent {
    /// Named event type (`event:` field), if any.
    event: Option<String>,
    /// Payload carried in the `data:` field.
    data: String,
    /// Event identifier (`id:` field), if any.
    id: Option<String>,
    /// Client reconnection hint (`retry:` field) in milliseconds, if any.
    retry: Option<u64>,
}
27
28impl SseEvent {
29    /// Set the raw `data:` payload.
30    pub fn data(mut self, data: impl Into<String>) -> Self {
31        self.data = data.into();
32        self
33    }
34
35    /// Serialise `value` as JSON into the `data:` field.
36    pub fn json_data<T: Serialize>(mut self, value: T) -> Result<Self, serde_json::Error> {
37        self.data = serde_json::to_string(&value)?;
38        Ok(self)
39    }
40
41    /// Set the `event:` (named event type) field.
42    pub fn event(mut self, name: impl Into<String>) -> Self {
43        self.event = Some(name.into());
44        self
45    }
46
47    /// Set the `id:` field (used by clients for `Last-Event-ID` resumption).
48    pub fn id(mut self, id: impl Into<String>) -> Self {
49        self.id = Some(id.into());
50        self
51    }
52
53    /// Set the client reconnection hint, in milliseconds.
54    pub fn retry(mut self, millis: u64) -> Self {
55        self.retry = Some(millis);
56        self
57    }
58
59    /// Lower into the transport event. Private — never escapes this module.
60    fn into_transport(self) -> Event {
61        let mut e = Event::default().data(self.data);
62        if let Some(name) = self.event {
63            e = e.event(name);
64        }
65        if let Some(id) = self.id {
66            e = e.id(id);
67        }
68        if let Some(ms) = self.retry {
69            e = e.retry(Duration::from_millis(ms));
70        }
71        e
72    }
73}
74
75/// A streaming `text/event-stream` response body.
76///
77/// Construct from any `Send + 'static` stream yielding `Result<SseEvent, _>`.
78/// A periodic keep-alive comment is injected automatically to hold idle
79/// connections open through proxies.
80pub struct SseStream {
81    inner: BoxStream<'static, Result<SseEvent, Infallible>>,
82    keep_alive: Duration,
83}
84
85impl SseStream {
86    /// Wrap a stream of events. The error type is fixed to [`Infallible`] —
87    /// fallible producers should map their errors into a terminal `SseEvent`.
88    pub fn new<S>(stream: S) -> Self
89    where
90        S: Stream<Item = Result<SseEvent, Infallible>> + Send + 'static,
91    {
92        Self {
93            inner: stream.boxed(),
94            keep_alive: Duration::from_secs(15),
95        }
96    }
97
98    /// Override the keep-alive interval (default 15s).
99    pub fn keep_alive(mut self, interval: Duration) -> Self {
100        self.keep_alive = interval;
101        self
102    }
103}
104
105impl axum::response::IntoResponse for SseStream {
106    fn into_response(self) -> axum::response::Response {
107        let interval = self.keep_alive;
108        let mapped = self.inner.map(|res| res.map(SseEvent::into_transport));
109        Sse::new(mapped)
110            .keep_alive(KeepAlive::new().interval(interval))
111            .into_response()
112    }
113}