1#![forbid(unsafe_code)]
59#![deny(
60 missing_copy_implementations,
61 rustdoc::missing_crate_level_docs,
62 missing_debug_implementations,
63 nonstandard_style,
64 unused_qualifications
65)]
66#![warn(missing_docs)]
67
68use futures_lite::{stream::Stream, AsyncRead};
69use std::{
70 borrow::Cow,
71 fmt::Write,
72 io,
73 marker::PhantomData,
74 pin::Pin,
75 task::{Context, Poll},
76};
77use trillium::{Body, Conn, KnownHeaderName, Status};
78
79struct SseBody<S, E> {
80 stream: S,
81 buffer: Vec<u8>,
82 event: PhantomData<E>,
83}
84
85impl<S, E> SseBody<S, E>
86where
87 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
88 E: Eventable,
89{
90 pub fn new(stream: S) -> Self {
91 Self {
92 stream,
93 buffer: Vec::new(),
94 event: PhantomData,
95 }
96 }
97}
98
99fn encode(event: impl Eventable) -> String {
100 let mut output = String::new();
101 if let Some(event_type) = event.event_type() {
102 writeln!(&mut output, "event: {event_type}").unwrap();
103 }
104
105 if let Some(id) = event.id() {
106 writeln!(&mut output, "id: {id}").unwrap();
107 }
108
109 for part in event.data().lines() {
110 writeln!(&mut output, "data: {part}").unwrap();
111 }
112
113 writeln!(output).unwrap();
114
115 output
116}
117
118impl<S, E> AsyncRead for SseBody<S, E>
119where
120 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
121 E: Eventable,
122{
123 fn poll_read(
124 self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 buf: &mut [u8],
127 ) -> Poll<io::Result<usize>> {
128 let Self { buffer, stream, .. } = self.get_mut();
129
130 let buffer_read = buffer.len().min(buf.len());
131 if buffer_read > 0 {
132 buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
133 buffer.drain(0..buffer_read);
134 return Poll::Ready(Ok(buffer_read));
135 }
136
137 match Pin::new(stream).poll_next(cx) {
138 Poll::Pending => Poll::Pending,
139 Poll::Ready(Some(item)) => {
140 let data = encode(item).into_bytes();
141 let writable_len = data.len().min(buf.len());
142 buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
143 if writable_len < data.len() {
144 buffer.extend_from_slice(&data[writable_len..]);
145 }
146 Poll::Ready(Ok(writable_len))
147 }
148
149 Poll::Ready(None) => Poll::Ready(Ok(0)),
150 }
151 }
152}
153
154impl<S, E> From<SseBody<S, E>> for Body
155where
156 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
157 E: Eventable,
158{
159 fn from(sse_body: SseBody<S, E>) -> Self {
160 Body::new_streaming(sse_body, None)
161 }
162}
163
164pub trait SseConnExt {
168 fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
177 where
178 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
179 E: Eventable;
180}
181
182impl SseConnExt for Conn {
183 fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
184 where
185 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
186 E: Eventable,
187 {
188 let body = SseBody::new(self.inner().stopper().stop_stream(sse_stream));
189 self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
190 .with_response_header(KnownHeaderName::CacheControl, "no-cache")
191 .with_body(body)
192 .with_status(Status::Ok)
193 .halt()
194 }
195}
196
197pub trait Eventable: Unpin + Send + Sync + 'static {
205 fn data(&self) -> &str;
207
208 fn event_type(&self) -> Option<&str> {
210 None
211 }
212
213 fn id(&self) -> Option<&str> {
215 None
216 }
217}
218
219impl Eventable for Event {
220 fn data(&self) -> &str {
221 Event::data(self)
222 }
223
224 fn event_type(&self) -> Option<&str> {
225 Event::event_type(self)
226 }
227}
228
229impl Eventable for &'static str {
230 fn data(&self) -> &str {
231 self
232 }
233}
234
235impl Eventable for String {
236 fn data(&self) -> &str {
237 self
238 }
239}
240
241#[derive(Debug, Clone, Eq, PartialEq)]
245pub struct Event {
246 data: Cow<'static, str>,
247 event_type: Option<Cow<'static, str>>,
248}
249
250impl From<&'static str> for Event {
251 fn from(s: &'static str) -> Self {
252 Self::from(Cow::Borrowed(s))
253 }
254}
255
256impl From<String> for Event {
257 fn from(s: String) -> Self {
258 Self::from(Cow::Owned(s))
259 }
260}
261
262impl From<Cow<'static, str>> for Event {
263 fn from(data: Cow<'static, str>) -> Self {
264 Event {
265 data,
266 event_type: None,
267 }
268 }
269}
270
271impl Event {
272 pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
279 Self::from(data.into())
280 }
281
282 pub fn with_type(mut self, event_type: impl Into<Cow<'static, str>>) -> Self {
292 self.set_type(event_type);
293 self
294 }
295
296 pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
307 self.event_type = Some(event_type.into());
308 }
309
310 pub fn data(&self) -> &str {
312 &self.data
313 }
314
315 pub fn event_type(&self) -> Option<&str> {
317 self.event_type.as_deref()
318 }
319}