event_stream_processor/
lib.rs

1//! <https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format>
2
3use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt};
4use tokio_stream::wrappers::ReceiverStream;
5
6#[derive(Clone, Default, Debug, PartialEq, Eq)]
7pub struct Message {
8    /// A string identifying the type of event described. If this is specified, an event will be
9    /// dispatched on the browser to the listener for the specified event name; the website source
10    /// code should use addEventListener() to listen for named events. The onmessage handler is
11    /// called if no event name is specified for a message.
12    pub event: Option<String>,
13    pub data: Option<String>,
14
15    /// the event ID
16    pub id: Option<String>,
17
18    /// the number of milliseconds to wait before attempting to send the event
19    pub retry: Option<u64>,
20}
21
22pub fn get_messages(
23    bytes: impl AsyncBufRead + Send + 'static + Unpin,
24) -> ReceiverStream<anyhow::Result<Message>> {
25    let (tx, rx) = tokio::sync::mpsc::channel(100);
26    tokio::spawn(async move {
27        let mut message = Message::default();
28        let mut lines = bytes.lines();
29        while let Some(Ok(s)) = lines.next().await {
30            if s.is_empty() {
31                let message = std::mem::take(&mut message);
32                if tx.send(Ok(message)).await.is_err() {
33                    return;
34                }
35                continue;
36            }
37            let Some((k, v)) = s.split_once(':') else {
38                let _ = tx.send(Err(anyhow::anyhow!("Invalid line: {}", s))).await;
39                return;
40            };
41            let v = v.trim();
42            match k.trim() {
43                "event" => message.event = Some(v.to_string()),
44                "data" => message.data = Some(v.to_string()),
45                "id" => message.id = Some(v.to_string()),
46                "retry" => match v.parse() {
47                    Ok(v) => message.retry = Some(v),
48                    Err(e) => {
49                        let _ = tx
50                            .send(Err(anyhow::anyhow!("Invalid retry value: {}", e)))
51                            .await;
52                        return;
53                    }
54                },
55                _ => {
56                    let _ = tx.send(Err(anyhow::anyhow!("Invalid line: {}", s))).await;
57                    return;
58                }
59            }
60        }
61
62        if message != Message::default() {
63            let _ = tx.send(Ok(message)).await;
64        }
65    });
66
67    rx.into()
68}
69
70#[cfg(test)]
71mod tests {
72    use assert_matches::assert_matches;
73    use bytes::Bytes;
74    use futures::TryStreamExt;
75    use tokio::sync::mpsc::channel;
76    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
77
78    use super::*;
79
80    #[tokio::test]
81    async fn test_correct_messages() {
82        let (tx, rx) = channel(100);
83        let stream = ReceiverStream::new(rx);
84        let stream = stream.map(Ok).into_async_read();
85        let mut messages = get_messages(stream);
86        tokio::spawn(async move {
87            tx.send(Bytes::from("event: foo\n")).await.unwrap();
88            tx.send(Bytes::from("data: bar\n")).await.unwrap();
89            tx.send(Bytes::from("\n")).await.unwrap();
90            tx.send(Bytes::from("event: foo\n")).await.unwrap();
91            tx.send(Bytes::from("data: bar\n")).await.unwrap();
92            tx.send(Bytes::from("id: 1")).await.unwrap();
93        });
94        let message = messages.next().await.unwrap().unwrap();
95        assert_eq!(message.event, Some("foo".to_string()));
96        assert_eq!(message.data, Some("bar".to_string()));
97
98        let message = messages.next().await.unwrap().unwrap();
99        assert_eq!(message.event, Some("foo".to_string()));
100        assert_eq!(message.data, Some("bar".to_string()));
101        assert_eq!(message.id, Some("1".to_string()));
102    }
103
104    #[tokio::test]
105    async fn test_incorrect_messages() {
106        let (tx, rx) = channel(100);
107        let stream = ReceiverStream::new(rx);
108        let stream = stream.map(Ok).into_async_read();
109        let mut messages = get_messages(stream);
110
111        tokio::spawn(async move {
112            tx.send(Bytes::from("event: foo\n")).await.unwrap();
113            tx.send(Bytes::from("data: bar\n")).await.unwrap();
114            tx.send(Bytes::from("id: 1\n")).await.unwrap();
115            tx.send(Bytes::from("retry: 1\n")).await.unwrap();
116            tx.send(Bytes::from("foo: bar\n")).await.unwrap();
117        });
118
119        assert_matches!(messages.next().await.unwrap(), Err(..));
120    }
121}