trillium_sse/
lib.rs

1/*!
2# Trillium tools for server sent events
3
4This primarily provides [`SseConnExt`](crate::SseConnExt), an
5extension trait for [`trillium::Conn`] that has a
6[`with_sse_stream`](crate::SseConnExt::with_sse_stream) chainable
7method that takes a [`Stream`](futures_lite::Stream) where the `Item`
8implements [`Eventable`].
9
10Often, you will want this stream to be something like a channel, but
11the specifics of that are dependent on the event fanout
12characteristics of your application.
13
14This crate implements [`Eventable`] for an [`Event`] type that you can
15use in your application, for `String`, and for `&'static str`. You can
16also implement [`Eventable`] for any type in your application.
17
18## Example usage
19
20```
21use broadcaster::BroadcastChannel;
22use trillium::{conn_try, conn_unwrap, log_error, Conn, Method, State};
23use trillium_sse::SseConnExt;
24use trillium_static_compiled::static_compiled;
25
26type Channel = BroadcastChannel<String>;
27
28fn get_sse(mut conn: Conn) -> Conn {
29    let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
30    conn.with_sse_stream(broadcaster)
31}
32
33async fn post_broadcast(mut conn: Conn) -> Conn {
34    let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
35    let body = conn_try!(conn.request_body_string().await, conn);
36    log_error!(broadcaster.send(&body).await);
37    conn.ok("sent")
38}
39
40fn main() {
41    let handler = (
42        static_compiled!("examples/static").with_index_file("index.html"),
43        State::new(Channel::new()),
44        |conn: Conn| async move {
45            match (conn.method(), conn.path()) {
46                (Method::Get, "/sse") => get_sse(conn),
47                (Method::Post, "/broadcast") => post_broadcast(conn).await,
48                _ => conn,
49            }
50        },
51    );
52
53    // trillium_smol::run(handler);
54}
55
56```
57*/
58#![forbid(unsafe_code)]
59#![deny(
60    missing_copy_implementations,
61    rustdoc::missing_crate_level_docs,
62    missing_debug_implementations,
63    nonstandard_style,
64    unused_qualifications
65)]
66#![warn(missing_docs)]
67
68use futures_lite::{stream::Stream, AsyncRead};
69use std::{
70    borrow::Cow,
71    fmt::Write,
72    io,
73    marker::PhantomData,
74    pin::Pin,
75    task::{Context, Poll},
76};
77use trillium::{Body, Conn, KnownHeaderName, Status};
78
79struct SseBody<S, E> {
80    stream: S,
81    buffer: Vec<u8>,
82    event: PhantomData<E>,
83}
84
85impl<S, E> SseBody<S, E>
86where
87    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
88    E: Eventable,
89{
90    pub fn new(stream: S) -> Self {
91        Self {
92            stream,
93            buffer: Vec::new(),
94            event: PhantomData,
95        }
96    }
97}
98
99fn encode(event: impl Eventable) -> String {
100    let mut output = String::new();
101    if let Some(event_type) = event.event_type() {
102        writeln!(&mut output, "event: {event_type}").unwrap();
103    }
104
105    if let Some(id) = event.id() {
106        writeln!(&mut output, "id: {id}").unwrap();
107    }
108
109    for part in event.data().lines() {
110        writeln!(&mut output, "data: {part}").unwrap();
111    }
112
113    writeln!(output).unwrap();
114
115    output
116}
117
118impl<S, E> AsyncRead for SseBody<S, E>
119where
120    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
121    E: Eventable,
122{
123    fn poll_read(
124        self: Pin<&mut Self>,
125        cx: &mut Context<'_>,
126        buf: &mut [u8],
127    ) -> Poll<io::Result<usize>> {
128        let Self { buffer, stream, .. } = self.get_mut();
129
130        let buffer_read = buffer.len().min(buf.len());
131        if buffer_read > 0 {
132            buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
133            buffer.drain(0..buffer_read);
134            return Poll::Ready(Ok(buffer_read));
135        }
136
137        match Pin::new(stream).poll_next(cx) {
138            Poll::Pending => Poll::Pending,
139            Poll::Ready(Some(item)) => {
140                let data = encode(item).into_bytes();
141                let writable_len = data.len().min(buf.len());
142                buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
143                if writable_len < data.len() {
144                    buffer.extend_from_slice(&data[writable_len..]);
145                }
146                Poll::Ready(Ok(writable_len))
147            }
148
149            Poll::Ready(None) => Poll::Ready(Ok(0)),
150        }
151    }
152}
153
154impl<S, E> From<SseBody<S, E>> for Body
155where
156    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
157    E: Eventable,
158{
159    fn from(sse_body: SseBody<S, E>) -> Self {
160        Body::new_streaming(sse_body, None)
161    }
162}
163
/**
Extension trait for server sent events
*/
pub trait SseConnExt {
    /**
    builds and sets a streaming response body that conforms to the
    [server-sent-events
    spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)
    from a Stream of any [`Eventable`](crate::Eventable) type (such as
    [`Event`](crate::Event)), as well as setting appropriate headers for
    this response.

    The implementation for [`Conn`] sets the `Content-Type` header to
    `text/event-stream` and the `Cache-Control` header to `no-cache`,
    sets the status to [`Status::Ok`], and halts the conn.
    */
    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
    where
        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
        E: Eventable;
}
181
182impl SseConnExt for Conn {
183    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
184    where
185        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
186        E: Eventable,
187    {
188        let body = SseBody::new(self.inner().stopper().stop_stream(sse_stream));
189        self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
190            .with_response_header(KnownHeaderName::CacheControl, "no-cache")
191            .with_body(body)
192            .with_status(Status::Ok)
193            .halt()
194    }
195}
196
/**
A trait that allows any `Unpin + Send + Sync + 'static` type to act as
an event.

Only [`Eventable::data`] has to be implemented;
[`Eventable::event_type`] and [`Eventable::id`] default to `None`.

For a concrete implementation of this trait, you can use [`Event`],
but it is also implemented for [`String`] and `&'static str`.
*/
pub trait Eventable: Unpin + Send + Sync + 'static {
    /// return the data for this event. non-optional.
    ///
    /// each line of the returned text is sent as its own `data:` field
    fn data(&self) -> &str;

    /// return the event type, optionally
    ///
    /// when this is `Some`, it is sent as the `event:` field. the
    /// default implementation returns `None`
    fn event_type(&self) -> Option<&str> {
        None
    }

    /// return a unique event id, optionally
    ///
    /// when this is `Some`, it is sent as the `id:` field. the default
    /// implementation returns `None`
    fn id(&self) -> Option<&str> {
        None
    }
}
218
219impl Eventable for Event {
220    fn data(&self) -> &str {
221        Event::data(self)
222    }
223
224    fn event_type(&self) -> Option<&str> {
225        Event::event_type(self)
226    }
227}
228
229impl Eventable for &'static str {
230    fn data(&self) -> &str {
231        self
232    }
233}
234
235impl Eventable for String {
236    fn data(&self) -> &str {
237        self
238    }
239}
240
/**
Events are a concrete implementation of the [`Eventable`] trait.

An `Event` always has data and may additionally have an event type.
*/
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Event {
    data: Cow<'static, str>,
    event_type: Option<Cow<'static, str>>,
}

impl From<&'static str> for Event {
    fn from(s: &'static str) -> Self {
        Self::new(s)
    }
}

impl From<String> for Event {
    fn from(s: String) -> Self {
        Self::new(s)
    }
}

impl From<Cow<'static, str>> for Event {
    fn from(data: Cow<'static, str>) -> Self {
        Self {
            data,
            event_type: None,
        }
    }
}

impl Event {
    /**
    builds a new [`Event`] from anything that converts into a
    `Cow<'static, str>`

    by default, this event has no event type. to set an event type,
    use [`Event::with_type`] or [`Event::set_type`]
    */
    pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
        Self {
            data: data.into(),
            event_type: None,
        }
    }

    /**
    chainable constructor to set the type on an event

    ```
    let event = trillium_sse::Event::new("event data").with_type("userdata");
    assert_eq!(event.event_type(), Some("userdata"));
    assert_eq!(event.data(), "event data");
    ```
    */
    pub fn with_type(self, event_type: impl Into<Cow<'static, str>>) -> Self {
        Self {
            event_type: Some(event_type.into()),
            ..self
        }
    }

    /**
    set the event type for this Event. The default is None.

    ```
    let mut event = trillium_sse::Event::new("event data");
    assert_eq!(event.event_type(), None);
    event.set_type("userdata");
    assert_eq!(event.event_type(), Some("userdata"));
    ```
     */
    pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
        let kind: Cow<'static, str> = event_type.into();
        self.event_type = Some(kind);
    }

    /// returns this Event's data as a &str
    pub fn data(&self) -> &str {
        self.data.as_ref()
    }

    /// returns this Event's type as a str, if set
    pub fn event_type(&self) -> Option<&str> {
        self.event_type.as_deref()
    }
}