1pub mod error;
32
33use std::{pin::Pin, time::Duration};
34
35use async_stream::try_stream;
36use reqwest::{
37 Response, StatusCode,
38 header::{CONTENT_TYPE, HeaderValue},
39};
40use tokio::io::AsyncBufReadExt;
41use tokio_stream::{Stream, StreamExt};
42use tokio_util::io::StreamReader;
43
44use crate::error::{EventError, EventSourceError};
45
46pub static MIME_EVENT_STREAM: HeaderValue = HeaderValue::from_static("text/event-stream");
48
49struct EventBuffer {
55 event_type: String,
56 data: String,
57 last_event_id: Option<String>,
58 retry: Option<Duration>,
59}
60
61impl EventBuffer {
62 #[allow(clippy::new_without_default)]
64 fn new() -> Self {
65 Self {
66 event_type: String::new(),
67 data: String::new(),
68 last_event_id: None,
69 retry: None,
70 }
71 }
72
73 fn produce_event(&mut self) -> Option<Event> {
77 let event = if self.data.is_empty() {
78 None
79 } else {
80 Some(Event {
81 event_type: if self.event_type.is_empty() {
82 "message".to_string()
83 } else {
84 self.event_type.clone()
85 },
86 data: self.data.to_string(),
87 last_event_id: self.last_event_id.clone(),
88 retry: self.retry,
89 })
90 };
91
92 self.event_type.clear();
93 self.data.clear();
94
95 event
96 }
97
98 fn set_event_type(&mut self, event_type: &str) {
100 self.event_type.clear();
101 self.event_type.push_str(event_type);
102 }
103
104 fn push_data(&mut self, data: &str) {
106 if !self.data.is_empty() {
107 self.data.push('\n');
108 }
109 self.data.push_str(data);
110 }
111
112 fn set_id(&mut self, id: &str) {
113 self.last_event_id = Some(id.to_string());
114 }
115
116 fn set_retry(&mut self, retry: Duration) {
117 self.retry = Some(retry);
118 }
119}
120
121fn parse_line(line: &str) -> (&str, &str) {
123 let (field, value) = line.split_once(':').unwrap_or((line, ""));
124 let value = value.strip_prefix(' ').unwrap_or(value);
125 (field, value)
126}
127
128#[derive(Debug, Clone, Eq, PartialEq)]
130pub struct Event {
131 pub event_type: String,
133 pub data: String,
135 pub last_event_id: Option<String>,
137 pub retry: Option<Duration>,
139}
140
141pub trait EventSource {
143 fn events(
154 self,
155 ) -> impl Future<
156 Output = Result<Pin<Box<impl Stream<Item = Result<Event, EventError>>>>, EventSourceError>,
157 > + Send;
158}
159
160impl EventSource for Response {
161 async fn events(
162 self,
163 ) -> Result<Pin<Box<impl Stream<Item = Result<Event, EventError>>>>, EventSourceError> {
164 let status = self.status();
165 if status != StatusCode::OK {
166 return Err(EventSourceError::BadStatus(status));
167 }
168 let content_type = self.headers().get(CONTENT_TYPE);
169 if content_type != Some(&MIME_EVENT_STREAM) {
170 return Err(EventSourceError::BadContentType(content_type.cloned()));
171 }
172
173 let mut stream = StreamReader::new(
174 self.bytes_stream()
175 .map(|result| result.map_err(std::io::Error::other)),
176 );
177
178 let mut line_buffer = String::new();
179 let mut event_buffer = EventBuffer::new();
180
181 let stream = Box::pin(try_stream! {
182 loop {
183 line_buffer.clear();
184 let count = stream.read_line(&mut line_buffer).await.map_err(EventError::IoError)?;
185 if count == 0 {
186 break;
187 }
188 let line = if let Some(line) = line_buffer.strip_suffix('\n') {
189 line
190 } else {
191 &line_buffer
192 };
193
194 if line.is_empty() {
196 if let Some(event) = event_buffer.produce_event() {
197 yield event;
198 }
199 continue;
200 }
201
202 let (field, value) = parse_line(line);
203
204 match field {
205 "event" => {
206 event_buffer.set_event_type(value);
207 }
208 "data" => {
209 event_buffer.push_data(value);
210 }
211 "id" => {
212 event_buffer.set_id(value);
213 }
214 "retry" => {
215 if let Ok(millis) = value.parse() {
216 event_buffer.set_retry(Duration::from_millis(millis));
217 }
218 }
219 _ => {}
220 }
221 }
222 });
223
224 Ok(stream)
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 #[test]
233 fn parse_line_properly() {
234 let (field, value) = parse_line("event: message");
235 assert_eq!(field, "event");
236 assert_eq!(value, "message");
237
238 let (field, value) = parse_line("non-standard field");
239 assert_eq!(field, "non-standard field");
240 assert_eq!(value, "");
241
242 let (field, value) = parse_line("data:data with : inside");
243 assert_eq!(field, "data");
244 assert_eq!(value, "data with : inside");
245 }
246}