Skip to main content

sse_core/
stream.rs

1use alloc::sync::Arc;
2use core::{
3    pin::Pin,
4    str,
5    task::{self, Poll, ready},
6};
7use thiserror::Error;
8
9use bytes::Buf;
10use futures_core::{
11    TryStream,
12    stream::{FusedStream, Stream},
13};
14use pin_project_lite::pin_project;
15
16use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
pin_project! {
    /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
    ///
    /// The wrapper can exist without an inner stream (see
    /// [`disconnected()`](Self::disconnected)); in that state it is terminated
    /// and yields [`None`] when polled. The decoder lives alongside the inner
    /// stream so its tracked `Last-Event-ID` remains available after the
    /// connection is dropped.
    pub struct SseStream<T: TryStream> {
        // The attached byte stream; `None` while the wrapper is disconnected.
        #[pin]
        inner: Option<T>,
        // The chunk most recently pulled from `inner`, handed to the decoder
        // on every poll until no further event can be produced from it.
        buf: Option<T::Ok>,

        // Parser that turns chunks into `SseEvent`s and tracks the last event ID.
        decoder: SseDecoder,
    }
}
28
29impl<T: TryStream> SseStream<T> {
30    /// Creates a new, disconnected [`SseStream`].
31    ///
32    /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
33    /// This constructor is primarily useful when you need to store the [`SseStream`]
34    /// inside a struct before the network connection is established.
35    ///
36    /// To make the stream active, you must attach an inner stream using
37    /// [`attach()`](Self::attach).
38    ///
39    /// # Example
40    /// ```
41    /// # use futures_core::stream::Stream;
42    /// # use sse_core::*;
43    /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
44    /// #     tokio_test::stream_mock::StreamMockBuilder::new().build()
45    /// # }
46    /// # tokio_test::block_on(async {
47    /// let mut stream = SseStream::disconnected();
48    ///
49    /// // ... later, when the network is ready:
50    /// let byte_stream = fetch_http_stream().await;
51    /// stream.attach(byte_stream);
52    /// # })
53    /// ```
54    #[inline]
55    #[must_use]
56    pub fn disconnected() -> Self {
57        Self::with_decoder(SseDecoder::new())
58    }
59
60    /// Creates a disconnected stream initialized with a custom decoder.
61    ///
62    /// See the [`disconnected()`](Self::disconnected) function for more information.
63    #[inline]
64    #[must_use]
65    pub fn with_decoder(decoder: SseDecoder) -> Self {
66        Self {
67            inner: None,
68            buf: None,
69            decoder,
70        }
71    }
72
73    /// Creates a new [`SseStream`] wrapping the provided inner stream.
74    #[inline]
75    #[must_use]
76    pub fn new(inner: T) -> Self {
77        let mut slf = Self::disconnected();
78        slf.inner = Some(inner);
79        slf
80    }
81
82    /// Consumes the stream and returns the underlying state-machine decoder.
83    #[inline]
84    pub fn take_decoder(self) -> SseDecoder {
85        let Self { mut decoder, .. } = self;
86        decoder.reconnect();
87        decoder
88    }
89
90    /// Returns `true` if the stream is currently disconnected.
91    #[inline]
92    #[must_use]
93    pub fn is_closed(&self) -> bool {
94        self.inner.is_none()
95    }
96
97    /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
98    #[inline]
99    #[must_use]
100    pub fn last_event_id(&self) -> Option<&Arc<str>> {
101        self.decoder.last_event_id()
102    }
103
104    /// Disconnects the inner stream while retaining the underlying parser's state.
105    ///
106    /// This drops the active network connection but safely preserves the most
107    /// recently parsed `Last-Event-ID` within the decoder. This is the standard
108    /// method to temporarily pause a stream or handle a dropped connection,
109    /// allowing you to later resume exactly where you left off.
110    ///
111    /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
112    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
113    #[inline]
114    pub fn close(&mut self) {
115        self.inner = None;
116    }
117
118    /// Disconnects the stream and completely purges the underlying parser's state.
119    ///
120    /// This drops the inner stream, clears all internal byte buffers, and
121    /// permanently drops the currently tracked `Last-Event-ID`. It effectively
122    /// returns the `SseStream` to the exact state it was in when initially
123    /// created via [`disconnected()`](Self::disconnected).
124    ///
125    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
126    /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
127    #[inline]
128    pub fn close_and_clear(&mut self) {
129        self.decoder.clear();
130        self.close();
131    }
132
133    /// Disconnects the inner stream and explicitly overrides the underlying
134    /// decoder's `Last-Event-ID` in preparation for a future connection.
135    ///
136    /// This is particularly useful in async contexts where you must drop the
137    /// active stream, inject a new ID, and then yield back to the runtime before
138    /// establishing a new network connection. The injected ID will be available
139    /// immediately via [`last_event_id()`](Self::last_event_id).
140    ///
141    /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
142    /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
143    #[inline]
144    pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
145        self.decoder.reconnect_with_id(id);
146        self.close();
147    }
148
149    /// Attaches a new inner stream to resume processing events.
150    ///
151    /// This method resets the underlying parser's buffers but retains the most
152    /// recently received `Last-Event-ID`. It is the standard way to recover from
153    /// a dropped network connection without missing any events.
154    ///
155    /// If you need to manually inject a saved `Last-Event-ID` (e.g., when recovering
156    /// an offline session from a database), use [`attach_with_id()`](Self::attach_with_id) instead.
157    #[inline]
158    pub fn attach(&mut self, inner: T) {
159        self.decoder.reconnect();
160        self.buf = None;
161        self.inner = Some(inner);
162    }
163
164    /// Attaches a new inner stream to resume processing, explicitly overriding
165    /// the `Last-Event-ID` in the underlying decoder.
166    ///
167    /// This method is primarily used when recovering an offline session where
168    /// you need to initialize the stream with a saved ID right as you provide
169    /// the new HTTP response stream.
170    ///
171    /// If you just want to resume a dropped stream using the ID that the decoder
172    /// has already tracked automatically, use [`attach()`](Self::attach).
173    #[inline]
174    pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
175        self.decoder.reconnect_with_id(id);
176        self.buf = None;
177        self.inner = Some(inner);
178    }
179}
180
/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
///
/// `E` is the error type of the wrapped inner stream; [`SseStream`] yields
/// items of type `SseStreamResult<SseEvent, T::Error>`.
pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
183
/// Errors that can occur while reading from an [`SseStream`].
///
/// The type parameter `T` is the error type produced by the inner stream
/// (`TryStream::Error`). Both variants display the message of the error they
/// wrap.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseStreamError<T> {
    /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
    ///
    /// Reported when the decoder rejects the chunk it is currently parsing.
    #[error("{0}")]
    PayloadTooLarge(PayloadTooLargeError),
    /// An error propagated from the inner [`TryStream`].
    ///
    /// Convertible from `T` via [`From`], so inner errors can be propagated with `?`.
    #[error("{0}")]
    Inner(#[from] T),
}
195
196impl<T: TryStream> Stream for SseStream<T>
197where
198    T::Ok: Buf,
199{
200    type Item = SseStreamResult<SseEvent, T::Error>;
201
202    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
203        let mut slf = self.project();
204
205        let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
206            return Poll::Ready(None);
207        };
208
209        loop {
210            if let Some(event) = (slf.buf.as_mut())
211                .and_then(|buf| slf.decoder.next(buf).transpose())
212                .transpose()
213                .map_err(SseStreamError::PayloadTooLarge)?
214            {
215                return Poll::Ready(Some(Ok(event)));
216            };
217
218            *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
219            if slf.buf.is_none() {
220                slf.inner.set(None);
221                return Poll::Ready(None);
222            }
223        }
224    }
225}
226
227impl<T: TryStream> FusedStream for SseStream<T>
228where
229    T::Ok: Buf,
230{
231    fn is_terminated(&self) -> bool {
232        self.is_closed()
233    }
234}