// sse_core/stream.rs

use alloc::sync::Arc;
use core::{
    pin::Pin,
    str,
    task::{self, ready, Poll},
};
use thiserror::Error;

use bytes::Buf;
use futures_core::{
    stream::{FusedStream, Stream},
    TryStream,
};
use pin_project_lite::pin_project;

use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
pin_project! {
    /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
    ///
    /// The wrapper is either *attached* (it owns an inner stream and polls it for
    /// chunks) or *disconnected* (no inner stream, in which case polling yields
    /// `None` immediately). The decoder is stored next to the inner stream rather
    /// than inside it, so the tracked `Last-Event-ID` can be read and carried
    /// across connections.
    #[derive(Debug, Clone, Default)]
    pub struct SseStream<T: TryStream> {
        // Source of raw chunks. `None` means the stream is disconnected; the
        // `Stream` impl also resets this to `None` once the source is exhausted.
        #[pin]
        inner: Option<T>,
        // Most recent chunk received from `inner`. It is handed to the decoder
        // on every poll until the decoder stops producing events from it.
        buf: Option<T::Ok>,

        // Incremental SSE parser; `last_event_id()` is answered from here.
        decoder: SseDecoder,
    }
}
29
30impl<T: TryStream> SseStream<T> {
31    /// Creates a new, disconnected [`SseStream`].
32    ///
33    /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
34    /// This constructor is primarily useful when you need to store the [`SseStream`]
35    /// inside a struct before the network connection is established.
36    ///
37    /// To make the stream active, you must attach an inner stream using
38    /// [`attach()`](Self::attach).
39    ///
40    /// # Example
41    /// ```
42    /// # use futures_core::stream::Stream;
43    /// # use sse_core::*;
44    /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
45    /// #     tokio_test::stream_mock::StreamMockBuilder::new().build()
46    /// # }
47    /// # tokio_test::block_on(async {
48    /// let mut stream = SseStream::disconnected();
49    ///
50    /// // ... later, when the network is ready:
51    /// let byte_stream = fetch_http_stream().await;
52    /// stream.attach(byte_stream);
53    /// # })
54    /// ```
55    #[inline]
56    #[must_use]
57    pub fn disconnected() -> Self {
58        Self::with_decoder(SseDecoder::new())
59    }
60
61    /// Creates a disconnected stream initialized with a custom decoder.
62    ///
63    /// See the [`disconnected()`](Self::disconnected) function for more information.
64    #[inline]
65    #[must_use]
66    pub fn with_decoder(decoder: SseDecoder) -> Self {
67        Self {
68            inner: None,
69            buf: None,
70            decoder,
71        }
72    }
73
74    /// Creates a new [`SseStream`] wrapping the provided inner stream.
75    #[inline]
76    #[must_use]
77    pub fn new(inner: T) -> Self {
78        let mut slf = Self::disconnected();
79        slf.inner = Some(inner);
80        slf
81    }
82
83    /// Consumes the stream and returns the underlying state-machine decoder.
84    #[inline]
85    pub fn take_decoder(self) -> SseDecoder {
86        let Self { mut decoder, .. } = self;
87        decoder.reconnect();
88        decoder
89    }
90
91    /// Returns `true` if the stream is currently disconnected.
92    #[inline]
93    #[must_use]
94    pub fn is_closed(&self) -> bool {
95        self.inner.is_none()
96    }
97
98    /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
99    #[inline]
100    #[must_use]
101    pub fn last_event_id(&self) -> Option<&Arc<str>> {
102        self.decoder.last_event_id()
103    }
104
105    /// Disconnects the inner stream while retaining the underlying parser's state.
106    ///
107    /// This drops the active network connection but safely preserves the most
108    /// recently parsed `Last-Event-ID` within the decoder. This is the standard
109    /// method to temporarily pause a stream or handle a dropped connection,
110    /// allowing you to later resume exactly where you left off.
111    ///
112    /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
113    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
114    #[inline]
115    pub fn close(&mut self) {
116        self.decoder.reconnect();
117        self.clear_bufs();
118    }
119
120    /// Disconnects the stream and completely purges the underlying parser's state.
121    ///
122    /// This drops the inner stream, clears all internal byte buffers, and
123    /// permanently drops the currently tracked `Last-Event-ID`. It effectively
124    /// returns the `SseStream` to the exact state it was in when initially
125    /// created via [`disconnected()`](Self::disconnected).
126    ///
127    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
128    /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
129    #[inline]
130    pub fn close_and_clear(&mut self) {
131        self.decoder.clear();
132        self.clear_bufs();
133    }
134
135    /// Disconnects the inner stream and explicitly overrides the underlying
136    /// decoder's `Last-Event-ID` in preparation for a future connection.
137    ///
138    /// This is particularly useful in async contexts where you must drop the
139    /// active stream, inject a new ID, and then yield back to the runtime before
140    /// establishing a new network connection. The injected ID will be available
141    /// immediately via [`last_event_id()`](Self::last_event_id).
142    ///
143    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
144    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
145    #[inline]
146    pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
147        self.decoder.reconnect_with_id(id);
148        self.clear_bufs();
149    }
150
151    /// Attaches a new inner stream to resume processing events.
152    ///
153    /// This method resets the underlying parser's buffers but safely retains the most
154    /// recently parsed `Last-Event-ID`. It is the standard way to recover from
155    /// a dropped network connection, allowing you to resume exactly where you left off.
156    ///
157    /// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
158    /// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
159    #[inline]
160    pub fn attach(&mut self, inner: T) {
161        self.close();
162        self.inner = Some(inner);
163    }
164
165    /// Attaches a new inner stream and completely purges the underlying parser's state.
166    ///
167    /// This method is used when you want to reuse an existing `SseStream` allocation
168    /// for a completely fresh connection or a different server. It clears all internal
169    /// byte buffers and permanently drops the currently tracked `Last-Event-ID`.
170    ///
171    /// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
172    /// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
173    #[inline]
174    pub fn clear_and_attach(&mut self, inner: T) {
175        self.close_and_clear();
176        self.inner = Some(inner);
177    }
178
179    /// Attaches a new inner stream to resume processing, explicitly overriding
180    /// the `Last-Event-ID` in the underlying decoder.
181    ///
182    /// This method is primarily used when recovering an offline session where
183    /// you need to initialize the stream with a saved ID (e.g., from a local database)
184    /// right as you provide the new HTTP response stream.
185    ///
186    /// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
187    /// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
188    #[inline]
189    pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
190        self.close_with_id(id);
191        self.inner = Some(inner);
192    }
193
194    #[inline]
195    fn clear_bufs(&mut self) {
196        self.inner = None;
197        self.buf = None;
198    }
199}
200
/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
///
/// `T` is the success value and `E` is the error type of the wrapped inner
/// stream. With `T = SseEvent`, this is the item type yielded by [`SseStream`].
pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
203
/// Errors that can occur while reading from an [`SseStream`].
///
/// The type parameter `T` is the error type produced by the wrapped inner stream.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseStreamError<T> {
    /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
    ///
    /// The display message is the wrapped [`PayloadTooLargeError`]'s own message.
    #[error("{0}")]
    PayloadTooLarge(PayloadTooLargeError),
    /// An error propagated from the inner [`TryStream`].
    ///
    /// `#[from]` provides the `From<T>` conversion, so `?` can lift an
    /// inner-stream error into this variant. The display message is the inner
    /// error's own message.
    #[error("{0}")]
    Inner(#[from] T),
}
215
216impl<T: TryStream> Stream for SseStream<T>
217where
218    T::Ok: Buf,
219{
220    type Item = SseStreamResult<SseEvent, T::Error>;
221
222    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
223        let mut slf = self.project();
224
225        let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
226            return Poll::Ready(None);
227        };
228
229        loop {
230            if let Some(event) = (slf.buf.as_mut())
231                .and_then(|buf| slf.decoder.next(buf))
232                .transpose()
233                .map_err(SseStreamError::PayloadTooLarge)?
234            {
235                return Poll::Ready(Some(Ok(event)));
236            };
237
238            *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
239            if slf.buf.is_none() {
240                slf.inner.set(None);
241                return Poll::Ready(None);
242            }
243        }
244    }
245}
246
247impl<T: TryStream> FusedStream for SseStream<T>
248where
249    T::Ok: Buf,
250{
251    fn is_terminated(&self) -> bool {
252        self.is_closed()
253    }
254}
255
/// Feeds an awkward SSE document to the stream one byte per chunk, then an
/// inner error, then one multi-byte chunk, and checks the exact sequence of
/// decoded items as well as the stream being closed once the input runs out.
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use crate::MessageEvent;
    use std::slice;
    use tokio_stream::StreamExt;

    tokio_test::block_on(async {
        // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
        let raw = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

";

        // One chunk per input byte, so every possible chunk boundary is exercised.
        let mut mock = tokio_test::stream_mock::StreamMockBuilder::new();
        for byte in raw.as_bytes() {
            mock = mock.next(Ok(slice::from_ref(byte)));
        }
        // An inner error followed by more data: the stream must report the
        // error and keep decoding afterwards.
        let mock = mock
            .next(Err(()))
            .next(Ok(b"data: hello\n\ndata:ignored\n"));

        let id = Some("my-id".into());

        let mut stream = SseStream::new(mock.build());
        let mut events = Vec::new();
        while let Some(item) = stream.next().await {
            events.push(item);
        }

        let expected = [
            Ok(SseEvent::Retry(42)),
            Ok(SseEvent::Message(MessageEvent {
                event: "my-event".into(),
                data: "line1\nline2".into(),
                last_event_id: id.clone(),
            })),
            Ok(SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "second".into(),
                last_event_id: id.clone(),
            })),
            Err(SseStreamError::Inner(())),
            Ok(SseEvent::Message(MessageEvent {
                event: "message".into(),
                data: "hello".into(),
                last_event_id: id.clone(),
            })),
        ];
        assert_eq!(events, expected);

        assert!(stream.is_closed());

        Ok(())
    })
}