1#![doc(
35 alias = "server sent",
36 alias = "server-sent",
37 alias = "server sent events",
38 alias = "server-sent events",
39 alias = "event-stream"
40)]
41
42use std::{
43 pin::Pin,
44 task::{Context, Poll},
45 time::Duration,
46};
47
48use actix_web::{
49 HttpRequest, HttpResponse, Responder,
50 body::{BodySize, BoxBody, MessageBody},
51 http::header::ContentEncoding,
52};
53use bytes::{BufMut as _, Bytes, BytesMut};
54use bytestring::ByteString;
55use futures_core::Stream;
56use pin_project_lite::pin_project;
57use serde::Serialize;
58use tokio::{
59 sync::mpsc,
60 time::{Interval, interval},
61};
62use tokio_stream::wrappers::ReceiverStream;
63
64use crate::{
65 BoxError,
66 header::{CacheControl, CacheDirective},
67 util::InfallibleStream,
68};
69
70#[must_use]
101#[derive(Debug, Clone)]
102pub struct Data {
103 id: Option<ByteString>,
104 event: Option<ByteString>,
105 data: ByteString,
106}
107
108impl Data {
109 pub fn new(data: impl Into<ByteString>) -> Self {
117 Self {
118 id: None,
119 event: None,
120 data: data.into(),
121 }
122 }
123
124 pub fn new_json(data: impl Serialize) -> Result<Self, serde_json::Error> {
138 Ok(Self {
139 id: None,
140 event: None,
141 data: serde_json::to_string(&data)?.into(),
142 })
143 }
144
145 pub fn set_data(&mut self, data: impl Into<ByteString>) {
147 self.data = data.into();
148 }
149
150 pub fn id(mut self, id: impl Into<ByteString>) -> Self {
152 self.id = Some(id.into());
153 self
154 }
155
156 pub fn set_id(&mut self, id: impl Into<ByteString>) {
158 self.id = Some(id.into());
159 }
160
161 pub fn event(mut self, event: impl Into<ByteString>) -> Self {
163 self.event = Some(event.into());
164 self
165 }
166
167 pub fn set_event(&mut self, event: impl Into<ByteString>) {
169 self.event = Some(event.into());
170 }
171}
172
173impl From<Data> for Event {
174 fn from(data: Data) -> Self {
175 Self::Data(data)
176 }
177}
178
179#[must_use]
181#[derive(Debug, Clone)]
182pub enum Event {
183 Data(Data),
196
197 Comment(ByteString),
206}
207
208impl Event {
209 fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) {
211 buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1);
213
214 for line in data.split('\n') {
216 buf.put_slice(prefix.as_bytes());
217 buf.put_slice(line.as_bytes());
218 buf.put_u8(b'\n');
219 }
220 }
221
222 fn into_bytes(self) -> Bytes {
224 let mut buf = BytesMut::new();
225
226 match self {
227 Event::Data(Data { id, event, data }) => {
228 if let Some(text) = id {
229 buf.put_slice(b"id: ");
230 buf.put_slice(text.as_bytes());
231 buf.put_u8(b'\n');
232 }
233
234 if let Some(text) = event {
235 buf.put_slice(b"event: ");
236 buf.put_slice(text.as_bytes());
237 buf.put_u8(b'\n');
238 }
239
240 Self::line_split_with_prefix(&mut buf, "data: ", data);
241 }
242
243 Event::Comment(text) => Self::line_split_with_prefix(&mut buf, ": ", text),
244 }
245
246 buf.put_u8(b'\n');
248
249 buf.freeze()
250 }
251
252 fn retry_to_bytes(retry: Duration) -> Bytes {
254 Bytes::from(format!("retry: {}\n\n", retry.as_millis()))
255 }
256
257 const fn keep_alive_bytes() -> Bytes {
259 Bytes::from_static(b": keep-alive\n\n")
260 }
261}
262
263pin_project! {
264 #[must_use]
269 #[derive(Debug)]
270 pub struct Sse<S> {
271 #[pin]
272 stream: S,
273 keep_alive: Option<Interval>,
274 retry_interval: Option<Duration>,
275 }
276}
277
278impl<S, E> Sse<S>
279where
280 S: Stream<Item = Result<Event, E>> + 'static,
281 E: Into<BoxError>,
282{
283 pub fn from_stream(stream: S) -> Self {
285 Self {
286 stream,
287 keep_alive: None,
288 retry_interval: None,
289 }
290 }
291}
292
293impl<S> Sse<InfallibleStream<S>>
294where
295 S: Stream<Item = Event> + 'static,
296{
297 pub fn from_infallible_stream(stream: S) -> Self {
299 Sse::from_stream(InfallibleStream::new(stream))
300 }
301}
302
303impl<E> Sse<ReceiverStream<Result<Event, E>>>
304where
305 E: Into<BoxError> + 'static,
306{
307 pub fn from_receiver(receiver: mpsc::Receiver<Result<Event, E>>) -> Self {
309 Self::from_stream(ReceiverStream::new(receiver))
310 }
311}
312
313impl Sse<InfallibleStream<ReceiverStream<Event>>> {
314 pub fn from_infallible_receiver(receiver: mpsc::Receiver<Event>) -> Self {
316 Self::from_stream(InfallibleStream::new(ReceiverStream::new(receiver)))
317 }
318}
319
320impl<S> Sse<S> {
321 pub fn with_keep_alive(mut self, keep_alive_period: Duration) -> Self {
325 let mut int = interval(keep_alive_period);
326 int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
327
328 self.keep_alive = Some(int);
329 self
330 }
331
332 pub fn with_retry_duration(mut self, retry: Duration) -> Self {
336 self.retry_interval = Some(retry);
337 self
338 }
339}
340
341impl<S, E> Responder for Sse<S>
342where
343 S: Stream<Item = Result<Event, E>> + 'static,
344 E: Into<BoxError>,
345{
346 type Body = BoxBody;
347
348 fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
349 HttpResponse::Ok()
350 .content_type(mime::TEXT_EVENT_STREAM)
351 .insert_header(ContentEncoding::Identity)
352 .insert_header(CacheControl(vec![CacheDirective::NoCache]))
353 .body(self)
354 }
355}
356
357impl<S, E> MessageBody for Sse<S>
358where
359 S: Stream<Item = Result<Event, E>>,
360 E: Into<BoxError>,
361{
362 type Error = BoxError;
363
364 fn size(&self) -> BodySize {
365 BodySize::Stream
366 }
367
368 fn poll_next(
369 self: Pin<&mut Self>,
370 cx: &mut Context<'_>,
371 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
372 let this = self.project();
373
374 if let Some(retry) = this.retry_interval.take() {
375 cx.waker().wake_by_ref();
376 return Poll::Ready(Some(Ok(Event::retry_to_bytes(retry))));
377 }
378
379 if let Poll::Ready(msg) = this.stream.poll_next(cx) {
380 return match msg {
381 Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))),
382 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
383 None => Poll::Ready(None),
384 };
385 }
386
387 if let Some(keep_alive) = this.keep_alive {
388 if keep_alive.poll_tick(cx).is_ready() {
389 return Poll::Ready(Some(Ok(Event::keep_alive_bytes())));
390 }
391 }
392
393 Poll::Pending
394 }
395}
396
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use actix_web::{body, test::TestRequest};
    use futures_util::{FutureExt as _, StreamExt as _, future::poll_fn, stream, task::noop_waker};
    use tokio::time::sleep;

    use super::*;
    use crate::{assert_response_matches, util::InfallibleStream};

    /// Retry durations are rendered in whole milliseconds.
    #[test]
    fn format_retry_message() {
        let cases = [
            (Duration::from_millis(1), "retry: 1\n\n"),
            (Duration::from_secs(10), "retry: 10000\n\n"),
        ];

        for (retry, expected) in cases {
            assert_eq!(Event::retry_to_bytes(retry), expected);
        }
    }

    /// Every input line gets its own prefixed output line.
    #[test]
    fn line_split_format() {
        let render = |input: &'static str| {
            let mut buf = BytesMut::new();
            Event::line_split_with_prefix(&mut buf, "data: ", ByteString::from(input));
            buf
        };

        assert_eq!(render("foo"), "data: foo\n");
        assert_eq!(render("foo\nbar"), "data: foo\ndata: bar\n");
    }

    /// Wire format of comments and of data messages with every id/event combination.
    #[test]
    fn into_bytes_format() {
        /// Serializes a data message built directly from its three fields.
        fn data_msg(
            id: Option<&'static str>,
            event: Option<&'static str>,
            data: &'static str,
        ) -> Bytes {
            Event::Data(Data {
                id: id.map(ByteString::from),
                event: event.map(ByteString::from),
                data: data.into(),
            })
            .into_bytes()
        }

        assert_eq!(Event::Comment("foo".into()).into_bytes(), ": foo\n\n");

        assert_eq!(data_msg(None, None, "foo"), "data: foo\n\n");

        // a lone newline is two empty lines
        assert_eq!(data_msg(None, None, "\n"), "data: \ndata: \n\n");

        assert_eq!(data_msg(Some("42"), None, "foo"), "id: 42\ndata: foo\n\n");

        assert_eq!(
            data_msg(None, Some("bar"), "foo"),
            "event: bar\ndata: foo\n\n"
        );

        assert_eq!(
            data_msg(Some("42"), Some("bar"), "foo"),
            "id: 42\nevent: bar\ndata: foo\n\n"
        );
    }

    /// The retry hint is produced by the first poll, before the wrapped stream is consulted.
    #[test]
    fn retry_is_first_msg() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let events = InfallibleStream::new(tokio_stream::empty());
        let mut sse = Sse::from_stream(events).with_retry_duration(Duration::from_millis(42));

        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"),
            res => panic!("poll should return retry message, got {res:?}"),
        }
    }

    /// Arbitrary streams of `Result<Event, _>` can be collected as a response body.
    #[actix_web::test]
    async fn sse_from_external_streams() {
        // no events at all
        let sse = Sse::from_stream(stream::empty::<Result<_, Infallible>>());
        assert_eq!(body::to_bytes(sse).await.unwrap(), "");

        // exactly one event
        let single = stream::once(async { Ok::<_, Infallible>(Event::from(Data::new("foo"))) });
        let sse = Sse::from_stream(single);
        assert_eq!(body::to_bytes(sse).await.unwrap(), "data: foo\n\n");

        // the same event twice
        let twice = stream::repeat(Ok::<_, Infallible>(Event::from(Data::new("foo")))).take(2);
        let sse = Sse::from_stream(twice);
        assert_eq!(
            body::to_bytes(sse).await.unwrap(),
            "data: foo\n\ndata: foo\n\n",
        );
    }

    /// The responder sets the content type, encoding and cache headers.
    #[actix_web::test]
    async fn appropriate_headers_are_set_on_responder() {
        let sse = Sse::from_stream(stream::empty::<Result<_, Infallible>>());

        let req = TestRequest::default().to_http_request();
        let res = sse.respond_to(&req);

        assert_response_matches!(res, OK;
            "content-type" => "text/event-stream"
            "content-encoding" => "identity"
            "cache-control" => "no-cache"
        );
    }

    /// Events pushed into the channel come out of the body; an empty channel leaves it pending.
    #[actix_web::test]
    async fn messages_are_received_from_sender() {
        let (tx, rx) = tokio::sync::mpsc::channel(2);
        let mut sse = Sse::from_infallible_receiver(rx);

        // nothing has been sent yet, so a single poll must not complete
        let first_poll = poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)).now_or_never();
        assert!(first_poll.is_none());

        let msg: Event = Data::new("bar").event("foo").into();
        tx.send(msg).await.unwrap();

        match poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)).now_or_never() {
            Some(Some(Ok(bytes))) => assert_eq!(bytes, "event: foo\ndata: bar\n\n"),
            res => panic!("poll should return data message, got {res:?}"),
        }
    }

    /// A keep-alive comment is produced once the period has elapsed without any events, and
    /// real events are still delivered afterwards.
    #[actix_web::test]
    async fn keep_alive_is_sent() {
        // polls are driven by hand, so wake-ups can be ignored
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let (tx, rx) = tokio::sync::mpsc::channel(2);
        let period = Duration::from_millis(4);
        let mut sse = Sse::from_infallible_receiver(rx).with_keep_alive(period);

        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());

        // wait well past the keep-alive period
        sleep(Duration::from_millis(20)).await;

        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, ": keep-alive\n\n"),
            res => panic!("poll should return data message, got {res:?}"),
        }

        // the tick was consumed; the next one is not due yet
        assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());

        let msg: Event = Data::new("foo").into();
        tx.send(msg).await.unwrap();

        match Pin::new(&mut sse).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "data: foo\n\n"),
            res => panic!("poll should return data message, got {res:?}"),
        }
    }
}