reqwest_sse/
lib.rs

1//! # `reqwest-sse`
2//!
3//!  `reqwest-sse` is a lightweight Rust library that extends
4//! [reqwest](https://docs.rs/reqwest) by adding native support for handling
5//! [Server-Sent Events (SSE)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
6//! . It introduces the [`EventSource`] trait, which enhances reqwest's [`Response`]
7//! type with an ergonomic `.events()` method. This method transforms the
8//! response body into an asynchronous [Stream] of SSE [`Event`]s, enabling
9//! seamless integration of real-time event handling in applications
10//! using the familiar reqwest HTTP client and the [`StreamExt`] API.
11//!
12//! ## Example
13//!
14//! ```rust,no_run
15//! use tokio_stream::StreamExt;
16//!
17//! use reqwest_sse::EventSource;
18//!
19//! #[tokio::main]
20//! async fn main() {
21//!     let mut events = reqwest::get("https://sse.test-free.online/api/story")
22//!         .await.unwrap()
23//!         .events()
24//!         .await.unwrap();
25//!
26//!     while let Some(Ok(event)) = events.next().await {
27//!         println!("{event:?}");
28//!     }
29//! }
30//! ```
31pub mod error;
32
33use std::{pin::Pin, time::Duration};
34
35use async_stream::try_stream;
36use reqwest::{
37    Response, StatusCode,
38    header::{CONTENT_TYPE, HeaderValue},
39};
40use tokio::io::AsyncBufReadExt;
41use tokio_stream::{Stream, StreamExt};
42use tokio_util::io::StreamReader;
43
44use crate::error::{EventError, EventSourceError};
45
46/// `text/event-stream` MIME type as [`HeaderValue`].
47pub static MIME_EVENT_STREAM: HeaderValue = HeaderValue::from_static("text/event-stream");
48
49/// Internal buffer used to accumulate lines of an SSE (Server-Sent Events) stream.
50///
51/// A single [`EventBuffer`] can be used to process the whole stream. [`set_event_type`] and [`push_data`]
52/// methods update the state. [`produce_event`] produces a proper [`Event`] and prepares the internal
53/// state to process further data.
54struct EventBuffer {
55    event_type: String,
56    data: String,
57    last_event_id: Option<String>,
58    retry: Option<Duration>,
59}
60
61impl EventBuffer {
62    /// Creates fresh new [`EventBuffer`].
63    #[allow(clippy::new_without_default)]
64    fn new() -> Self {
65        Self {
66            event_type: String::new(),
67            data: String::new(),
68            last_event_id: None,
69            retry: None,
70        }
71    }
72
73    /// Produces a [`Event`], if current state allow it.
74    ///
75    /// Reset the internal state to process further data.
76    fn produce_event(&mut self) -> Option<Event> {
77        let event = if self.data.is_empty() {
78            None
79        } else {
80            Some(Event {
81                event_type: if self.event_type.is_empty() {
82                    "message".to_string()
83                } else {
84                    self.event_type.clone()
85                },
86                data: self.data.to_string(),
87                last_event_id: self.last_event_id.clone(),
88                retry: self.retry,
89            })
90        };
91
92        self.event_type.clear();
93        self.data.clear();
94
95        event
96    }
97
98    /// Set the [`Event`]'s type. Overide previous value.
99    fn set_event_type(&mut self, event_type: &str) {
100        self.event_type.clear();
101        self.event_type.push_str(event_type);
102    }
103
104    /// Extends internal data with given data.
105    fn push_data(&mut self, data: &str) {
106        if !self.data.is_empty() {
107            self.data.push('\n');
108        }
109        self.data.push_str(data);
110    }
111
112    fn set_id(&mut self, id: &str) {
113        self.last_event_id = Some(id.to_string());
114    }
115
116    fn set_retry(&mut self, retry: Duration) {
117        self.retry = Some(retry);
118    }
119}
120
121/// Parse line to split field name and value, applying proper trimming.
122fn parse_line(line: &str) -> (&str, &str) {
123    let (field, value) = line.split_once(':').unwrap_or((line, ""));
124    let value = value.strip_prefix(' ').unwrap_or(value);
125    (field, value)
126}
127
128/// Server-Sent Event representation.
129#[derive(Debug, Clone, Eq, PartialEq)]
130pub struct Event {
131    /// A string identifying the type of event described.
132    pub event_type: String,
133    /// The data field for the message.
134    pub data: String,
135    /// Last event ID value.
136    pub last_event_id: Option<String>,
137    /// Reconnection time.
138    pub retry: Option<Duration>,
139}
140
141/// A trait for consuming a [`Response`] as a [`Stream`] of Server-Sent [`Event`]s (SSE).
142pub trait EventSource {
143    /// Converts the [`Response`] into a stream of Server-Sent Events.
144    /// Returns it as a faillable [`Stream`] of [`Event`]s.
145    ///
146    /// # Errors
147    ///
148    /// Returns an [`EventSourceError`] if:
149    /// - The response status is not `200 OK`
150    /// - The `Content-Type` header is missing or not `text/event-stream`
151    ///
152    /// The stream yields an [`EventError`] when error occure on event reading.
153    fn events(
154        self,
155    ) -> impl Future<
156        Output = Result<Pin<Box<impl Stream<Item = Result<Event, EventError>>>>, EventSourceError>,
157    > + Send;
158}
159
160impl EventSource for Response {
161    async fn events(
162        self,
163    ) -> Result<Pin<Box<impl Stream<Item = Result<Event, EventError>>>>, EventSourceError> {
164        let status = self.status();
165        if status != StatusCode::OK {
166            return Err(EventSourceError::BadStatus(status));
167        }
168        let content_type = self.headers().get(CONTENT_TYPE);
169        if content_type != Some(&MIME_EVENT_STREAM) {
170            return Err(EventSourceError::BadContentType(content_type.cloned()));
171        }
172
173        let mut stream = StreamReader::new(
174            self.bytes_stream()
175                .map(|result| result.map_err(std::io::Error::other)),
176        );
177
178        let mut line_buffer = String::new();
179        let mut event_buffer = EventBuffer::new();
180
181        let stream = Box::pin(try_stream! {
182            loop {
183                line_buffer.clear();
184                let count = stream.read_line(&mut line_buffer).await.map_err(EventError::IoError)?;
185                if count == 0 {
186                    break;
187                }
188                let line = if let Some(line) = line_buffer.strip_suffix('\n') {
189                    line
190                } else {
191                    &line_buffer
192                };
193
194                // dispatch
195                if line.is_empty() {
196                    if let Some(event) = event_buffer.produce_event() {
197                        yield event;
198                    }
199                    continue;
200                }
201
202                let (field, value) = parse_line(line);
203
204                match field {
205                    "event" => {
206                        event_buffer.set_event_type(value);
207                    }
208                    "data" => {
209                        event_buffer.push_data(value);
210                    }
211                    "id" => {
212                        event_buffer.set_id(value);
213                    }
214                    "retry" => {
215                        if let Ok(millis) = value.parse() {
216                            event_buffer.set_retry(Duration::from_millis(millis));
217                        }
218                    }
219                    _ => {}
220                }
221            }
222        });
223
224        Ok(stream)
225    }
226}
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231
232    #[test]
233    fn parse_line_properly() {
234        let (field, value) = parse_line("event: message");
235        assert_eq!(field, "event");
236        assert_eq!(value, "message");
237
238        let (field, value) = parse_line("non-standard field");
239        assert_eq!(field, "non-standard field");
240        assert_eq!(value, "");
241
242        let (field, value) = parse_line("data:data with : inside");
243        assert_eq!(field, "data");
244        assert_eq!(value, "data with : inside");
245    }
246}