micro_web/responder/
sse.rs

1use crate::responder::Responder;
2use crate::{RequestContext, ResponseBody};
3use bytes::Bytes;
4use futures::channel::mpsc::{SendError, channel};
5use futures::{Sink, SinkExt, Stream, StreamExt};
6use http::{HeaderValue, Response, StatusCode};
7use http_body::Frame;
8use http_body_util::StreamBody;
9use std::time::Duration;
10
/// Response side of a server-sent-events channel.
///
/// Wraps a stream of [`Event`]s; when used as a responder, every event
/// produced by the stream is serialized and written to the response body.
#[derive(Debug)]
pub struct SseStream<S> {
    /// Source of the events that are forwarded to the client.
    stream: S,
}
14
/// Producer side of a server-sent-events channel.
///
/// Wraps a sink that accepts [`Event`]s; application code pushes events
/// through it with `send` and finishes with `close`.
#[derive(Debug)]
pub struct SseEmitter<S> {
    /// Destination the emitted events are written into.
    sink: S,
}
18
19impl<S> SseStream<S>
20where
21    S: Stream<Item = Event>,
22{
23    fn new(stream: S) -> Self {
24        SseStream { stream }
25    }
26}
27
28impl<S> SseEmitter<S>
29where
30    S: Sink<Event, Error = SendError>,
31{
32    fn new(sink: S) -> Self {
33        SseEmitter { sink }
34    }
35}
36
37impl<S> SseEmitter<S>
38where
39    S: Sink<Event, Error = SendError> + Unpin,
40{
41    pub async fn send(&mut self, event: Event) -> Result<(), SendError> {
42        self.sink.send(event).await
43    }
44
45    pub async fn close(&mut self) -> Result<(), SendError> {
46        self.sink.close().await
47    }
48}
49
50pub fn build_sse_stream_emitter(buffer: usize) -> (SseStream<impl Stream<Item = Event>>, SseEmitter<impl Sink<Event, Error = SendError>>) {
51    let (sender, receiver) = channel::<Event>(buffer);
52    (SseStream::new(receiver), SseEmitter::new(sender))
53}
54
/// A single item of a server-sent-events stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Reconnection delay; written to the client as a `retry:` field in milliseconds.
    Retry(Duration),
    /// A regular message carrying data and optional metadata.
    Message(Message),
}

/// Payload and optional metadata of a message event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// Event id, written as the `id:` field. See
    /// <https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-last-event-id>.
    pub id: Option<String>,
    /// Event name, written as the `event:` field.
    pub name: Option<String>,
    /// The message data; each line is written as its own `data:` field.
    pub data: String,
}
67
68impl Event {
69    pub fn message(data: String, id: Option<String>, name: Option<String>) -> Event {
70        Event::Message(Message { id, name, data })
71    }
72
73    pub fn from_data(data: String) -> Event {
74        Event::Message(Message { id: None, name: None, data })
75    }
76
77    pub fn retry(duration: impl Into<Duration>) -> Event {
78        Event::Retry(duration.into())
79    }
80}
81
82impl<S> Responder for SseStream<S>
83where
84    S: Stream<Item = Event> + Send + 'static,
85{
86    fn response_to(self, _req: &RequestContext) -> Response<ResponseBody> {
87        let mut builder = Response::builder();
88        let headers = builder.headers_mut().unwrap();
89        headers.reserve(16);
90        headers.insert(http::header::CONTENT_TYPE, mime::TEXT_EVENT_STREAM.as_ref().parse().unwrap());
91        headers.insert(http::header::CACHE_CONTROL, HeaderValue::from_static("no-cache"));
92        headers.insert(http::header::CONNECTION, HeaderValue::from_static("keep-alive"));
93
94        let event_stream = self.stream.map(|event| match event {
95            Event::Message(Message { id, name, data }) => {
96                let mut string = String::with_capacity(data.len());
97
98                if let Some(i) = id {
99                    string.push_str(&format!("id: {}\n", i));
100                }
101
102                if let Some(n) = name {
103                    string.push_str(&format!("event: {}\n", n));
104                }
105
106                let split = data.lines();
107
108                for s in split {
109                    string.push_str(&format!("data: {}\n", s));
110                }
111
112                string.push('\n');
113                Ok(Frame::data(Bytes::from(string)))
114            }
115            Event::Retry(duration) => Ok(Frame::data(Bytes::from(format!("retry: {}\n\n", duration.as_millis())))),
116        });
117
118        let stream_body = StreamBody::new(event_stream);
119
120        builder.status(StatusCode::OK).body(ResponseBody::stream(stream_body)).unwrap()
121    }
122}