Skip to main content

sse_core/
stream.rs

1use alloc::sync::Arc;
2use core::{
3    pin::Pin,
4    str,
5    task::{self, Poll, ready},
6};
7use thiserror::Error;
8
9use bytes::Buf;
10use futures_core::{
11    TryStream,
12    stream::{FusedStream, Stream},
13};
14use pin_project_lite::pin_project;
15
16use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
pin_project! {
    /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
    ///
    /// The wrapper is generic over any [`TryStream`]; the `Stream` implementation
    /// additionally requires the stream's `Ok` items to implement `bytes::Buf`.
    #[derive(Debug, Clone, Default)]
    pub struct SseStream<T: TryStream> {
        // The wrapped byte stream. `None` means the stream is disconnected,
        // in which case polling yields `None` immediately.
        #[pin]
        inner: Option<T>,
        // The chunk most recently yielded by `inner`. It is kept across polls
        // so the decoder can keep pulling events out of it.
        buf: Option<T::Ok>,

        // The SSE parser; it also tracks the current `Last-Event-ID`.
        decoder: SseDecoder,
    }
}
29
30impl<T: TryStream> SseStream<T> {
31    /// Creates a new, disconnected [`SseStream`].
32    ///
33    /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
34    /// This constructor is primarily useful when you need to store the [`SseStream`]
35    /// inside a struct before the network connection is established.
36    ///
37    /// To make the stream active, you must attach an inner stream using
38    /// [`attach()`](Self::attach).
39    ///
40    /// # Example
41    /// ```
42    /// # use futures_core::stream::Stream;
43    /// # use sse_core::*;
44    /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
45    /// #     tokio_test::stream_mock::StreamMockBuilder::new().build()
46    /// # }
47    /// # tokio_test::block_on(async {
48    /// let mut stream = SseStream::disconnected();
49    ///
50    /// // ... later, when the network is ready:
51    /// let byte_stream = fetch_http_stream().await;
52    /// stream.attach(byte_stream);
53    /// # })
54    /// ```
55    #[inline]
56    #[must_use]
57    pub fn disconnected() -> Self {
58        Self::with_decoder(SseDecoder::new())
59    }
60
61    /// Creates a disconnected stream initialized with a custom decoder.
62    ///
63    /// See the [`disconnected()`](Self::disconnected) function for more information.
64    #[inline]
65    #[must_use]
66    pub fn with_decoder(decoder: SseDecoder) -> Self {
67        Self {
68            inner: None,
69            buf: None,
70            decoder,
71        }
72    }
73
74    /// Creates a new [`SseStream`] wrapping the provided inner stream.
75    #[inline]
76    #[must_use]
77    pub fn new(inner: T) -> Self {
78        let mut slf = Self::disconnected();
79        slf.inner = Some(inner);
80        slf
81    }
82
83    /// Consumes the stream and returns the underlying state-machine decoder.
84    #[inline]
85    pub fn take_decoder(self) -> SseDecoder {
86        let Self { mut decoder, .. } = self;
87        decoder.reconnect();
88        decoder
89    }
90
91    /// Returns `true` if the stream is currently disconnected.
92    #[inline]
93    #[must_use]
94    pub fn is_closed(&self) -> bool {
95        self.inner.is_none()
96    }
97
98    /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
99    #[inline]
100    #[must_use]
101    pub fn last_event_id(&self) -> Option<&Arc<str>> {
102        self.decoder.last_event_id()
103    }
104
105    /// Disconnects the inner stream while retaining the underlying parser's state.
106    ///
107    /// This drops the active network connection but safely preserves the most
108    /// recently parsed `Last-Event-ID` within the decoder. This is the standard
109    /// method to temporarily pause a stream or handle a dropped connection,
110    /// allowing you to later resume exactly where you left off.
111    ///
112    /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
113    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
114    #[inline]
115    pub fn close(&mut self) {
116        self.inner = None;
117    }
118
119    /// Disconnects the stream and completely purges the underlying parser's state.
120    ///
121    /// This drops the inner stream, clears all internal byte buffers, and
122    /// permanently drops the currently tracked `Last-Event-ID`. It effectively
123    /// returns the `SseStream` to the exact state it was in when initially
124    /// created via [`disconnected()`](Self::disconnected).
125    ///
126    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
127    /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
128    #[inline]
129    pub fn close_and_clear(&mut self) {
130        self.decoder.clear();
131        self.close();
132    }
133
134    /// Disconnects the inner stream and explicitly overrides the underlying
135    /// decoder's `Last-Event-ID` in preparation for a future connection.
136    ///
137    /// This is particularly useful in async contexts where you must drop the
138    /// active stream, inject a new ID, and then yield back to the runtime before
139    /// establishing a new network connection. The injected ID will be available
140    /// immediately via [`last_event_id()`](Self::last_event_id).
141    ///
142    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
143    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
144    #[inline]
145    pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
146        self.decoder.reconnect_with_id(id);
147        self.close();
148    }
149
150    /// Attaches a new inner stream to resume processing events.
151    ///
152    /// This method resets the underlying parser's buffers but retains the most
153    /// recently received `Last-Event-ID`. It is the standard way to recover from
154    /// a dropped network connection without missing any events.
155    ///
156    /// If you need to manually inject a saved `Last-Event-ID` (e.g., when recovering
157    /// an offline session from a database), use [`attach_with_id()`](Self::attach_with_id) instead.
158    #[inline]
159    pub fn attach(&mut self, inner: T) {
160        self.decoder.reconnect();
161        self.buf = None;
162        self.inner = Some(inner);
163    }
164
165    /// Attaches a new inner stream to resume processing, explicitly overriding
166    /// the `Last-Event-ID` in the underlying decoder.
167    ///
168    /// This method is primarily used when recovering an offline session where
169    /// you need to initialize the stream with a saved ID right as you provide
170    /// the new HTTP response stream.
171    ///
172    /// If you just want to resume a dropped stream using the ID that the decoder
173    /// has already tracked automatically, use [`attach()`](Self::attach).
174    #[inline]
175    pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
176        self.decoder.reconnect_with_id(id);
177        self.buf = None;
178        self.inner = Some(inner);
179    }
180}
181
/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
///
/// `T` is the success value and `E` is the error type of the inner stream;
/// [`SseStream`] yields items of type `SseStreamResult<SseEvent, T::Error>`.
pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
184
185/// Errors that can occur while reading from an [`SseStream`].
186#[derive(Debug, Clone, PartialEq, Eq, Error)]
187#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
188pub enum SseStreamError<T> {
189    /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
190    #[error("{0}")]
191    PayloadTooLarge(PayloadTooLargeError),
192    /// An error propagated from the inner [`TryStream`].
193    #[error("{0}")]
194    Inner(#[from] T),
195}
196
197impl<T: TryStream> Stream for SseStream<T>
198where
199    T::Ok: Buf,
200{
201    type Item = SseStreamResult<SseEvent, T::Error>;
202
203    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
204        let mut slf = self.project();
205
206        let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
207            return Poll::Ready(None);
208        };
209
210        loop {
211            if let Some(event) = (slf.buf.as_mut())
212                .and_then(|buf| slf.decoder.next(buf).transpose())
213                .transpose()
214                .map_err(SseStreamError::PayloadTooLarge)?
215            {
216                return Poll::Ready(Some(Ok(event)));
217            };
218
219            *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
220            if slf.buf.is_none() {
221                slf.inner.set(None);
222                return Poll::Ready(None);
223            }
224        }
225    }
226}
227
228impl<T: TryStream> FusedStream for SseStream<T>
229where
230    T::Ok: Buf,
231{
232    fn is_terminated(&self) -> bool {
233        self.is_closed()
234    }
235}
236
// End-to-end check of `SseStream` over a mocked byte stream: the fixture is
// delivered one byte at a time, followed by an inner-stream error and one
// final multi-byte chunk.
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use crate::MessageEvent;
    use std::slice;
    use tokio_stream::StreamExt;

    tokio_test::block_on(async {
        // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
        // The fixture mixes comment lines (`:`), blank lines, `\r` / `\r\n` / `\n`
        // line endings, and a `retry` field with and without a value.
        let bytes = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

";

        // Feed every byte as its own chunk so that fields and line endings are
        // split across chunk boundaries.
        let mut inner = tokio_test::stream_mock::StreamMockBuilder::new();
        for b in bytes.as_bytes() {
            inner = inner.next(Ok(slice::from_ref(b)));
        }
        // Then an inner error, followed by one complete event and a trailing
        // `data` line that is never terminated by a blank line.
        inner = inner
            .next(Err(()))
            .next(Ok(b"data: hello\n\ndata:ignored\n"));

        let id = Some("my-id".into());

        let mut stream = SseStream::new(inner.build());
        // Collect through `&mut stream` so the stream can be inspected afterwards.
        let events: Vec<_> = (&mut stream).collect().await;

        // The inner error is surfaced in order and does not end the stream:
        // the `hello` event after it is still delivered, while the unterminated
        // `ignored` data is not.
        assert_eq!(
            events,
            &[
                Ok(SseEvent::Retry(42)),
                Ok(SseEvent::Message(MessageEvent {
                    event: "my-event".into(),
                    data: "line1\nline2".into(),
                    last_event_id: id.clone()
                })),
                Ok(SseEvent::Message(MessageEvent {
                    event: "message".into(),
                    data: "second".into(),
                    last_event_id: id.clone()
                })),
                Err(SseStreamError::Inner(())),
                Ok(SseEvent::Message(MessageEvent {
                    event: "message".into(),
                    data: "hello".into(),
                    last_event_id: id.clone()
                })),
            ]
        );

        // Exhausting the inner stream detaches it.
        assert!(stream.is_closed());

        Ok(())
    })
}