micro_web/responder/
sse.rs1use crate::responder::Responder;
2use crate::{RequestContext, ResponseBody};
3use bytes::Bytes;
4use futures::channel::mpsc::{SendError, channel};
5use futures::{Sink, SinkExt, Stream, StreamExt};
6use http::{HeaderValue, Response, StatusCode};
7use http_body::Frame;
8use http_body_util::StreamBody;
9use std::time::Duration;
10
/// Stream half of a server-sent-events pair.
///
/// Holds the source of [`Event`]s that is serialised into the response body
/// when the value is returned through its `Responder` implementation.
#[derive(Debug)]
pub struct SseStream<S> {
    /// Underlying event source; bounds on `S` live on the `impl` blocks.
    stream: S,
}
15
/// Sending half of a server-sent-events pair.
///
/// Holds the sink that application code pushes [`Event`]s into.
#[derive(Debug)]
pub struct SseEmitter<S> {
    /// Underlying event sink; bounds on `S` live on the `impl` blocks.
    sink: S,
}
20
21impl<S> SseStream<S>
22where
23 S: Stream<Item = Event>,
24{
25 fn new(stream: S) -> Self {
26 SseStream { stream }
27 }
28}
29
30impl<S> SseEmitter<S>
31where
32 S: Sink<Event, Error = SendError>,
33{
34 fn new(sink: S) -> Self {
35 SseEmitter { sink }
36 }
37}
38
39impl<S> SseEmitter<S>
40where
41 S: Sink<Event, Error = SendError> + Unpin,
42{
43 pub async fn send(&mut self, event: Event) -> Result<(), SendError> {
44 self.sink.send(event).await
45 }
46
47 pub async fn close(&mut self) -> Result<(), SendError> {
48 self.sink.close().await
49 }
50}
51
52pub fn build_sse_stream_emitter(buffer: usize) -> (SseStream<impl Stream<Item = Event>>, SseEmitter<impl Sink<Event, Error = SendError>>) {
53 let (sender, receiver) = channel::<Event>(buffer);
54 (SseStream::new(receiver), SseEmitter::new(sender))
55}
56
/// A single item of a server-sent-events stream.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so that one
/// event can be duplicated for several subscribers and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Reconnection delay, written to the wire as `retry: <milliseconds>`.
    Retry(Duration),
    /// A regular message carrying data and optional metadata.
    Message(Message),
}

/// Payload of [`Event::Message`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// Optional event id, written as the `id:` field.
    pub id: Option<String>,
    /// Optional event name, written as the `event:` field.
    pub name: Option<String>,
    /// Message body, written as one `data:` field per line.
    pub data: String,
}
71
72impl Event {
73 pub fn message(data: String, id: Option<String>, name: Option<String>) -> Event {
74 Event::Message(Message { id, name, data })
75 }
76
77 pub fn from_data(data: String) -> Event {
78 Event::Message(Message { id: None, name: None, data })
79 }
80
81 pub fn retry(duration: impl Into<Duration>) -> Event {
82 Event::Retry(duration.into())
83 }
84}
85
86impl<S> Responder for SseStream<S>
87where
88 S: Stream<Item = Event> + Send + 'static,
89{
90 fn response_to(self, _req: &RequestContext) -> Response<ResponseBody> {
91 let mut builder = Response::builder();
92 let headers = builder.headers_mut().unwrap();
93 headers.reserve(16);
94 headers.insert(http::header::CONTENT_TYPE, mime::TEXT_EVENT_STREAM.as_ref().parse().unwrap());
95 headers.insert(http::header::CACHE_CONTROL, HeaderValue::from_static("no-cache"));
96 headers.insert(http::header::CONNECTION, HeaderValue::from_static("keep-alive"));
97
98 let event_stream = self.stream.map(|event| match event {
99 Event::Message(Message { id, name, data }) => {
100 let mut string = String::with_capacity(data.len());
101
102 if let Some(i) = id {
103 string.push_str(&format!("id: {}\n", i));
104 }
105
106 if let Some(n) = name {
107 string.push_str(&format!("event: {}\n", n));
108 }
109
110 let split = data.lines();
111
112 for s in split {
113 string.push_str(&format!("data: {}\n", s));
114 }
115
116 string.push('\n');
117 Ok(Frame::data(Bytes::from(string)))
118 }
119 Event::Retry(duration) => Ok(Frame::data(Bytes::from(format!("retry: {}\n\n", duration.as_millis())))),
120 });
121
122 let stream_body = StreamBody::new(event_stream);
123
124 builder.status(StatusCode::OK).body(ResponseBody::stream(stream_body)).unwrap()
125 }
126}