event_stream_processor/
lib.rs1use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt};
4use tokio_stream::wrappers::ReceiverStream;
5
/// One parsed event assembled from consecutive `field: value` lines.
///
/// Every field starts out as `None` (see the derived [`Default`]) and is only
/// populated when a line carrying the matching field name is seen for this
/// message.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Message {
    /// Value of the `event` field, if one was present.
    pub event: Option<String>,

    /// Value of the `data` field, if one was present.
    pub data: Option<String>,

    /// Value of the `id` field, if one was present.
    pub id: Option<String>,

    /// Value of the `retry` field parsed as an unsigned integer, if one was
    /// present. NOTE(review): presumably a reconnection delay in milliseconds
    /// as in Server-Sent Events — the unit is not enforced here.
    pub retry: Option<u64>,
}
21
22pub fn get_messages(
23 bytes: impl AsyncBufRead + Send + 'static + Unpin,
24) -> ReceiverStream<anyhow::Result<Message>> {
25 let (tx, rx) = tokio::sync::mpsc::channel(100);
26 tokio::spawn(async move {
27 let mut message = Message::default();
28 let mut lines = bytes.lines();
29 while let Some(Ok(s)) = lines.next().await {
30 if s.is_empty() {
31 let message = std::mem::take(&mut message);
32 if tx.send(Ok(message)).await.is_err() {
33 return;
34 }
35 continue;
36 }
37 let Some((k, v)) = s.split_once(':') else {
38 let _ = tx.send(Err(anyhow::anyhow!("Invalid line: {}", s))).await;
39 return;
40 };
41 let v = v.trim();
42 match k.trim() {
43 "event" => message.event = Some(v.to_string()),
44 "data" => message.data = Some(v.to_string()),
45 "id" => message.id = Some(v.to_string()),
46 "retry" => match v.parse() {
47 Ok(v) => message.retry = Some(v),
48 Err(e) => {
49 let _ = tx
50 .send(Err(anyhow::anyhow!("Invalid retry value: {}", e)))
51 .await;
52 return;
53 }
54 },
55 _ => {
56 let _ = tx.send(Err(anyhow::anyhow!("Invalid line: {}", s))).await;
57 return;
58 }
59 }
60 }
61
62 if message != Message::default() {
63 let _ = tx.send(Ok(message)).await;
64 }
65 });
66
67 rx.into()
68}
69
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use bytes::Bytes;
    use futures::TryStreamExt;
    use tokio::sync::mpsc::channel;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    use super::*;

    /// Feeds `chunks` through a channel-backed async reader into
    /// `get_messages` and returns the resulting message stream.
    ///
    /// The sender is dropped once every chunk has been sent, which signals
    /// end of input to the parser.
    fn parse_chunks(
        chunks: &'static [&'static str],
    ) -> ReceiverStream<anyhow::Result<Message>> {
        let (tx, rx) = channel(100);
        let reader = ReceiverStream::new(rx).map(Ok).into_async_read();
        let parsed = get_messages(reader);
        tokio::spawn(async move {
            for chunk in chunks {
                tx.send(Bytes::from(*chunk)).await.unwrap();
            }
        });
        parsed
    }

    /// Two well-formed messages: one ended by a blank line, one by end of input.
    #[tokio::test]
    async fn test_correct_messages() {
        let mut messages = parse_chunks(&[
            "event: foo\n",
            "data: bar\n",
            "\n",
            "event: foo\n",
            "data: bar\n",
            "id: 1",
        ]);

        let first = messages.next().await.unwrap().unwrap();
        assert_eq!(first.event.as_deref(), Some("foo"));
        assert_eq!(first.data.as_deref(), Some("bar"));

        let second = messages.next().await.unwrap().unwrap();
        assert_eq!(second.event.as_deref(), Some("foo"));
        assert_eq!(second.data.as_deref(), Some("bar"));
        assert_eq!(second.id.as_deref(), Some("1"));
    }

    /// An unknown field name makes the stream yield an error.
    #[tokio::test]
    async fn test_incorrect_messages() {
        let mut messages = parse_chunks(&[
            "event: foo\n",
            "data: bar\n",
            "id: 1\n",
            "retry: 1\n",
            "foo: bar\n",
        ]);

        let outcome = messages.next().await.unwrap();
        assert_matches!(outcome, Err(..));
    }
}