actix_web_lab/
sse.rs

//! Semantic server-sent events (SSE) responder
//!
//! # Examples
//! ```no_run
//! use std::{convert::Infallible, time::Duration};
//!
//! use actix_web::{Responder, get};
//! use actix_web_lab::sse;
//!
//! #[get("/from-channel")]
//! async fn from_channel() -> impl Responder {
//!     let (tx, rx) = tokio::sync::mpsc::channel(10);
//!
//!     // note: sender will typically be spawned or handed off somewhere else
//!     let _ = tx.send(sse::Event::Comment("my comment".into())).await;
//!     let _ = tx
//!         .send(sse::Data::new("my data").event("chat_msg").into())
//!         .await;
//!
//!     sse::Sse::from_infallible_receiver(rx).with_retry_duration(Duration::from_secs(10))
//! }
//!
//! #[get("/from-stream")]
//! async fn from_stream() -> impl Responder {
//!     let event_stream = futures_util::stream::iter([Ok::<_, Infallible>(sse::Event::Data(
//!         sse::Data::new("foo"),
//!     ))]);
//!
//!     sse::Sse::from_stream(event_stream).with_keep_alive(Duration::from_secs(5))
//! }
//! ```
//!
//! Complete usage examples can be found in the examples directory of the source code repo.
34#![doc(
35    alias = "server sent",
36    alias = "server-sent",
37    alias = "server sent events",
38    alias = "server-sent events",
39    alias = "event-stream"
40)]
41
42use std::{
43    pin::Pin,
44    task::{Context, Poll},
45    time::Duration,
46};
47
48use actix_web::{
49    HttpRequest, HttpResponse, Responder,
50    body::{BodySize, BoxBody, MessageBody},
51    http::header::ContentEncoding,
52};
53use bytes::{BufMut as _, Bytes, BytesMut};
54use bytestring::ByteString;
55use futures_core::Stream;
56use pin_project_lite::pin_project;
57use serde::Serialize;
58use tokio::{
59    sync::mpsc,
60    time::{Interval, interval},
61};
62use tokio_stream::wrappers::ReceiverStream;
63
64use crate::{
65    BoxError,
66    header::{CacheControl, CacheDirective},
67    util::InfallibleStream,
68};
69
70/// Server-sent events data message containing a `data` field and optional `id` and `event` fields.
71///
72/// # Examples
73/// ```
74/// # #[actix_web::main] async fn test() {
75/// use std::convert::Infallible;
76///
77/// use actix_web::body;
78/// use actix_web_lab::sse;
79/// use futures_util::stream;
80/// use serde::Serialize;
81///
82/// #[derive(serde::Serialize)]
83/// struct Foo {
84///     bar: u32,
85/// }
86///
87/// let sse = sse::Sse::from_stream(stream::iter([
88///     Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))),
89///     Ok::<_, Infallible>(sse::Event::Data(
90///         sse::Data::new_json(Foo { bar: 42 }).unwrap(),
91///     )),
92/// ]));
93///
94/// assert_eq!(
95///     body::to_bytes(sse).await.unwrap(),
96///     "data: foo\n\ndata: {\"bar\":42}\n\n",
97/// );
98/// # }; test();
99/// ```
100#[must_use]
101#[derive(Debug, Clone)]
102pub struct Data {
103    id: Option<ByteString>,
104    event: Option<ByteString>,
105    data: ByteString,
106}
107
108impl Data {
109    /// Constructs a new SSE data message with just the `data` field.
110    ///
111    /// # Examples
112    /// ```
113    /// use actix_web_lab::sse;
114    /// let event = sse::Event::Data(sse::Data::new("foo"));
115    /// ```
116    pub fn new(data: impl Into<ByteString>) -> Self {
117        Self {
118            id: None,
119            event: None,
120            data: data.into(),
121        }
122    }
123
124    /// Constructs a new SSE data message the `data` field set to `data` serialized as JSON.
125    ///
126    /// # Examples
127    /// ```
128    /// use actix_web_lab::sse;
129    ///
130    /// #[derive(serde::Serialize)]
131    /// struct Foo {
132    ///     bar: u32,
133    /// }
134    ///
135    /// let event = sse::Event::Data(sse::Data::new_json(Foo { bar: 42 }).unwrap());
136    /// ```
137    pub fn new_json(data: impl Serialize) -> Result<Self, serde_json::Error> {
138        Ok(Self {
139            id: None,
140            event: None,
141            data: serde_json::to_string(&data)?.into(),
142        })
143    }
144
145    /// Sets `data` field.
146    pub fn set_data(&mut self, data: impl Into<ByteString>) {
147        self.data = data.into();
148    }
149
150    /// Sets `id` field, returning a new data message.
151    pub fn id(mut self, id: impl Into<ByteString>) -> Self {
152        self.id = Some(id.into());
153        self
154    }
155
156    /// Sets `id` field.
157    pub fn set_id(&mut self, id: impl Into<ByteString>) {
158        self.id = Some(id.into());
159    }
160
161    /// Sets `event` name field, returning a new data message.
162    pub fn event(mut self, event: impl Into<ByteString>) -> Self {
163        self.event = Some(event.into());
164        self
165    }
166
167    /// Sets `event` name field.
168    pub fn set_event(&mut self, event: impl Into<ByteString>) {
169        self.event = Some(event.into());
170    }
171}
172
173impl From<Data> for Event {
174    fn from(data: Data) -> Self {
175        Self::Data(data)
176    }
177}
178
179/// Server-sent events message containing one or more fields.
180#[must_use]
181#[derive(Debug, Clone)]
182pub enum Event {
183    /// A `data` message with optional ID and event name.
184    ///
185    /// Data messages looks like this in the response stream.
186    /// ```plain
187    /// event: foo
188    /// id: 42
189    /// data: my data
190    ///
191    /// data: {
192    /// data:   "multiline": "data"
193    /// data: }
194    /// ```
195    Data(Data),
196
197    /// A comment message.
198    ///
199    /// Comments look like this in the response stream.
200    /// ```plain
201    /// : my comment
202    ///
203    /// : another comment
204    /// ```
205    Comment(ByteString),
206}
207
208impl Event {
209    /// Splits data into lines and prepend each line with `prefix`.
210    fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) {
211        // initial buffer size guess is len(data) + 10 lines of prefix + EOLs + EOF
212        buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1);
213
214        // append prefix + space + line to buffer
215        for line in data.split('\n') {
216            buf.put_slice(prefix.as_bytes());
217            buf.put_slice(line.as_bytes());
218            buf.put_u8(b'\n');
219        }
220    }
221
222    /// Serializes message into event-stream format.
223    fn into_bytes(self) -> Bytes {
224        let mut buf = BytesMut::new();
225
226        match self {
227            Event::Data(Data { id, event, data }) => {
228                if let Some(text) = id {
229                    buf.put_slice(b"id: ");
230                    buf.put_slice(text.as_bytes());
231                    buf.put_u8(b'\n');
232                }
233
234                if let Some(text) = event {
235                    buf.put_slice(b"event: ");
236                    buf.put_slice(text.as_bytes());
237                    buf.put_u8(b'\n');
238                }
239
240                Self::line_split_with_prefix(&mut buf, "data: ", data);
241            }
242
243            Event::Comment(text) => Self::line_split_with_prefix(&mut buf, ": ", text),
244        }
245
246        // final newline to mark end of message
247        buf.put_u8(b'\n');
248
249        buf.freeze()
250    }
251
252    /// Serializes retry message into event-stream format.
253    fn retry_to_bytes(retry: Duration) -> Bytes {
254        Bytes::from(format!("retry: {}\n\n", retry.as_millis()))
255    }
256
257    /// Serializes a keep-alive event-stream comment message into bytes.
258    const fn keep_alive_bytes() -> Bytes {
259        Bytes::from_static(b": keep-alive\n\n")
260    }
261}
262
263pin_project! {
264    /// Server-sent events (`text/event-stream`) responder.
265    ///
266    /// Constructed using a [Tokio channel](Self::from_receiver) or using your [own
267    /// stream](Self::from_stream).
268    #[must_use]
269    #[derive(Debug)]
270    pub struct Sse<S> {
271        #[pin]
272        stream: S,
273        keep_alive: Option<Interval>,
274        retry_interval: Option<Duration>,
275    }
276}
277
278impl<S, E> Sse<S>
279where
280    S: Stream<Item = Result<Event, E>> + 'static,
281    E: Into<BoxError>,
282{
283    /// Create an SSE response from a stream that yields SSE [Event]s.
284    pub fn from_stream(stream: S) -> Self {
285        Self {
286            stream,
287            keep_alive: None,
288            retry_interval: None,
289        }
290    }
291}
292
293impl<S> Sse<InfallibleStream<S>>
294where
295    S: Stream<Item = Event> + 'static,
296{
297    /// Create an SSE response from an infallible stream that yields SSE [Event]s.
298    pub fn from_infallible_stream(stream: S) -> Self {
299        Sse::from_stream(InfallibleStream::new(stream))
300    }
301}
302
303impl<E> Sse<ReceiverStream<Result<Event, E>>>
304where
305    E: Into<BoxError> + 'static,
306{
307    /// Create an SSE response from a receiver that yields SSE [Event]s.
308    pub fn from_receiver(receiver: mpsc::Receiver<Result<Event, E>>) -> Self {
309        Self::from_stream(ReceiverStream::new(receiver))
310    }
311}
312
313impl Sse<InfallibleStream<ReceiverStream<Event>>> {
314    /// Create an SSE response from a receiver that yields SSE [Event]s.
315    pub fn from_infallible_receiver(receiver: mpsc::Receiver<Event>) -> Self {
316        Self::from_stream(InfallibleStream::new(ReceiverStream::new(receiver)))
317    }
318}
319
320impl<S> Sse<S> {
321    /// Enables "keep-alive" messages to be send in the event stream after a period of inactivity.
322    ///
323    /// By default, no keep-alive is set up.
324    pub fn with_keep_alive(mut self, keep_alive_period: Duration) -> Self {
325        let mut int = interval(keep_alive_period);
326        int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
327
328        self.keep_alive = Some(int);
329        self
330    }
331
332    /// Queues first event message to inform client of custom retry period.
333    ///
334    /// Browsers default to retry every 3 seconds or so.
335    pub fn with_retry_duration(mut self, retry: Duration) -> Self {
336        self.retry_interval = Some(retry);
337        self
338    }
339}
340
341impl<S, E> Responder for Sse<S>
342where
343    S: Stream<Item = Result<Event, E>> + 'static,
344    E: Into<BoxError>,
345{
346    type Body = BoxBody;
347
348    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
349        HttpResponse::Ok()
350            .content_type(mime::TEXT_EVENT_STREAM)
351            .insert_header(ContentEncoding::Identity)
352            .insert_header(CacheControl(vec![CacheDirective::NoCache]))
353            .body(self)
354    }
355}
356
357impl<S, E> MessageBody for Sse<S>
358where
359    S: Stream<Item = Result<Event, E>>,
360    E: Into<BoxError>,
361{
362    type Error = BoxError;
363
364    fn size(&self) -> BodySize {
365        BodySize::Stream
366    }
367
368    fn poll_next(
369        self: Pin<&mut Self>,
370        cx: &mut Context<'_>,
371    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
372        let this = self.project();
373
374        if let Some(retry) = this.retry_interval.take() {
375            cx.waker().wake_by_ref();
376            return Poll::Ready(Some(Ok(Event::retry_to_bytes(retry))));
377        }
378
379        if let Poll::Ready(msg) = this.stream.poll_next(cx) {
380            return match msg {
381                Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))),
382                Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
383                None => Poll::Ready(None),
384            };
385        }
386
387        if let Some(keep_alive) = this.keep_alive {
388            if keep_alive.poll_tick(cx).is_ready() {
389                return Poll::Ready(Some(Ok(Event::keep_alive_bytes())));
390            }
391        }
392
393        Poll::Pending
394    }
395}
396
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use actix_web::{body, test::TestRequest};
    use futures_util::{FutureExt as _, StreamExt as _, future::poll_fn, stream, task::noop_waker};
    use tokio::time::sleep;

    use super::*;
    use crate::{assert_response_matches, util::InfallibleStream};

    // Retry durations are serialized as whole milliseconds.
    #[test]
    fn format_retry_message() {
        assert_eq!(
            Event::retry_to_bytes(Duration::from_millis(1)),
            "retry: 1\n\n",
        );
        assert_eq!(
            Event::retry_to_bytes(Duration::from_secs(10)),
            "retry: 10000\n\n",
        );
    }

    // Each payload line gets its own prefix and trailing newline.
    #[test]
    fn line_split_format() {
        let mut buf = BytesMut::new();
        Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo"));
        assert_eq!(buf, "data: foo\n");

        let mut buf = BytesMut::new();
        Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from("foo\nbar"));
        assert_eq!(buf, "data: foo\ndata: bar\n");
    }

    // Full messages: optional `id` and `event` lines, then data lines, then a blank line.
    #[test]
    fn into_bytes_format() {
        assert_eq!(Event::Comment("foo".into()).into_bytes(), ": foo\n\n");

        assert_eq!(
            Event::Data(Data {
                id: None,
                event: None,
                data: "foo".into()
            })
            .into_bytes(),
            "data: foo\n\n"
        );

        // a lone newline is a break between two empty lines
        assert_eq!(
            Event::Data(Data {
                id: None,
                event: None,
                data: "\n".into()
            })
            .into_bytes(),
            "data: \ndata: \n\n"
        );

        assert_eq!(
            Event::Data(Data {
                id: Some("42".into()),
                event: None,
                data: "foo".into()
            })
            .into_bytes(),
            "id: 42\ndata: foo\n\n"
        );

        assert_eq!(
            Event::Data(Data {
                id: None,
                event: Some("bar".into()),
                data: "foo".into()
            })
            .into_bytes(),
            "event: bar\ndata: foo\n\n"
        );

        assert_eq!(
            Event::Data(Data {
                id: Some("42".into()),
                event: Some("bar".into()),
                data: "foo".into()
            })
            .into_bytes(),
            "id: 42\nevent: bar\ndata: foo\n\n"
        );
    }

    // The retry message is emitted before the underlying stream is consulted.
    #[test]
    fn retry_is_first_msg() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut sse = Sse::from_stream(InfallibleStream::new(tokio_stream::empty()))
            .with_retry_duration(Duration::from_millis(42));
        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"),
            res => panic!("poll should return retry message, got {res:?}"),
        }
    }

    // Arbitrary streams of events are serialized back-to-back into the body.
    #[actix_web::test]
    async fn sse_from_external_streams() {
        let st = stream::empty::<Result<_, Infallible>>();
        let sse = Sse::from_stream(st);
        assert_eq!(body::to_bytes(sse).await.unwrap(), "");

        let st = stream::once(async { Ok::<_, Infallible>(Event::Data(Data::new("foo"))) });
        let sse = Sse::from_stream(st);
        assert_eq!(body::to_bytes(sse).await.unwrap(), "data: foo\n\n");

        let st = stream::repeat(Ok::<_, Infallible>(Event::Data(Data::new("foo")))).take(2);
        let sse = Sse::from_stream(st);
        assert_eq!(
            body::to_bytes(sse).await.unwrap(),
            "data: foo\n\ndata: foo\n\n",
        );
    }

    // The responder sets the content type, content encoding and cache-control headers.
    #[actix_web::test]
    async fn appropriate_headers_are_set_on_responder() {
        let st = stream::empty::<Result<_, Infallible>>();
        let sse = Sse::from_stream(st);

        let res = sse.respond_to(&TestRequest::default().to_http_request());

        assert_response_matches!(res, OK;
            "content-type" => "text/event-stream"
            "content-encoding" => "identity"
            "cache-control" => "no-cache"
        );
    }

    // Events pushed through a channel show up in the body; an empty channel is pending.
    #[actix_web::test]
    async fn messages_are_received_from_sender() {
        let (sender, receiver) = tokio::sync::mpsc::channel(2);
        let mut sse = Sse::from_infallible_receiver(receiver);

        assert!(
            poll_fn(|cx| Pin::new(&mut sse).poll_next(cx))
                .now_or_never()
                .is_none()
        );

        sender
            .send(Data::new("bar").event("foo").into())
            .await
            .unwrap();

        match poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)).now_or_never() {
            Some(Some(Ok(bytes))) => assert_eq!(bytes, "event: foo\ndata: bar\n\n"),
            res => panic!("poll should return data message, got {res:?}"),
        }
    }

    // A keep-alive comment is produced once the period has elapsed without events, and regular
    // events still flow afterwards.
    #[actix_web::test]
    async fn keep_alive_is_sent() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let (sender, receiver) = tokio::sync::mpsc::channel(2);
        let mut sse =
            Sse::from_infallible_receiver(receiver).with_keep_alive(Duration::from_millis(4));

        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());

        sleep(Duration::from_millis(20)).await;

        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, ": keep-alive\n\n"),
            res => panic!("poll should return keep-alive message, got {res:?}"),
        }

        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());

        sender.send(Data::new("foo").into()).await.unwrap();

        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "data: foo\n\n"),
            res => panic!("poll should return data message, got {res:?}"),
        }
    }
}