Skip to main content

modo/sse/
sender.rs

1use super::event::Event;
2use crate::error::Error;
3use tokio::sync::mpsc;
4
5/// Imperative event sender for [`Broadcaster::channel()`](super::Broadcaster::channel) closures.
6///
7/// When the client disconnects, [`send()`](Self::send) returns an error.
8/// Use this as a signal to stop producing events.
9pub struct Sender {
10    pub(super) tx: mpsc::Sender<Event>,
11}
12
13impl Sender {
14    /// Send an event to the connected client.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the client has disconnected (the response stream
19    /// was dropped).
20    pub async fn send(&self, event: Event) -> Result<(), Error> {
21        self.tx
22            .send(event)
23            .await
24            .map_err(|_| Error::internal("SSE client disconnected"))
25    }
26}
27
28#[cfg(test)]
29mod tests {
30    use super::*;
31    use tokio::sync::mpsc;
32
33    #[tokio::test]
34    async fn send_delivers_event() {
35        let (tx, mut rx) = mpsc::channel(16);
36        let sender = Sender { tx };
37        let event = super::super::Event::new("id1", "test")
38            .unwrap()
39            .data("hello");
40        sender.send(event).await.unwrap();
41
42        let received = rx.recv().await.unwrap();
43        assert_eq!(received.id, "id1");
44        assert_eq!(received.event, "test");
45        assert_eq!(received.data.as_deref(), Some("hello"));
46    }
47
48    #[tokio::test]
49    async fn send_returns_error_when_receiver_dropped() {
50        let (tx, rx) = mpsc::channel(16);
51        let sender = Sender { tx };
52        drop(rx);
53
54        let event = super::super::Event::new("id1", "test")
55            .unwrap()
56            .data("hello");
57        let result = sender.send(event).await;
58        assert!(result.is_err());
59    }
60}