sse_core/stream.rs
1use alloc::sync::Arc;
2use core::{
3 pin::Pin,
4 str,
5 task::{self, Poll, ready},
6};
7use thiserror::Error;
8
9use bytes::Buf;
10use futures_core::{
11 TryStream,
12 stream::{FusedStream, Stream},
13};
14use pin_project_lite::pin_project;
15
16use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
18pin_project! {
19 /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
20 pub struct SseStream<T: TryStream> {
21 #[pin]
22 inner: Option<T>,
23 buf: Option<T::Ok>,
24
25 decoder: SseDecoder,
26 }
27}
28
29impl<T: TryStream> SseStream<T> {
30 /// Creates a new, disconnected [`SseStream`].
31 ///
32 /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
33 /// This constructor is primarily useful when you need to store the [`SseStream`]
34 /// inside a struct before the network connection is established.
35 ///
36 /// To make the stream active, you must attach an inner stream using
37 /// [`attach()`](Self::attach).
38 ///
39 /// # Example
40 /// ```
41 /// # use futures_core::stream::Stream;
42 /// # use sse_core::*;
43 /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
44 /// # tokio_test::stream_mock::StreamMockBuilder::new().build()
45 /// # }
46 /// # tokio_test::block_on(async {
47 /// let mut stream = SseStream::disconnected();
48 ///
49 /// // ... later, when the network is ready:
50 /// let byte_stream = fetch_http_stream().await;
51 /// stream.attach(byte_stream);
52 /// # })
53 /// ```
54 #[inline]
55 #[must_use]
56 pub fn disconnected() -> Self {
57 Self::with_decoder(SseDecoder::new())
58 }
59
60 /// Creates a disconnected stream initialized with a custom decoder.
61 ///
62 /// See the [`disconnected()`](Self::disconnected) function for more information.
63 #[inline]
64 #[must_use]
65 pub fn with_decoder(decoder: SseDecoder) -> Self {
66 Self {
67 inner: None,
68 buf: None,
69 decoder,
70 }
71 }
72
73 /// Creates a new [`SseStream`] wrapping the provided inner stream.
74 #[inline]
75 #[must_use]
76 pub fn new(inner: T) -> Self {
77 let mut slf = Self::disconnected();
78 slf.inner = Some(inner);
79 slf
80 }
81
82 /// Consumes the stream and returns the underlying state-machine decoder.
83 #[inline]
84 pub fn take_decoder(self) -> SseDecoder {
85 let Self { mut decoder, .. } = self;
86 decoder.reconnect();
87 decoder
88 }
89
90 /// Returns `true` if the stream is currently disconnected.
91 #[inline]
92 #[must_use]
93 pub fn is_closed(&self) -> bool {
94 self.inner.is_none()
95 }
96
97 /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
98 #[inline]
99 #[must_use]
100 pub fn last_event_id(&self) -> Option<&Arc<str>> {
101 self.decoder.last_event_id()
102 }
103
104 /// Disconnects the inner stream while retaining the underlying parser's state.
105 ///
106 /// This drops the active network connection but safely preserves the most
107 /// recently parsed `Last-Event-ID` within the decoder. This is the standard
108 /// method to temporarily pause a stream or handle a dropped connection,
109 /// allowing you to later resume exactly where you left off.
110 ///
111 /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
112 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
113 #[inline]
114 pub fn close(&mut self) {
115 self.inner = None;
116 }
117
118 /// Disconnects the stream and completely purges the underlying parser's state.
119 ///
120 /// This drops the inner stream, clears all internal byte buffers, and
121 /// permanently drops the currently tracked `Last-Event-ID`. It effectively
122 /// returns the `SseStream` to the exact state it was in when initially
123 /// created via [`disconnected()`](Self::disconnected).
124 ///
125 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
126 /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
127 #[inline]
128 pub fn close_and_clear(&mut self) {
129 self.decoder.clear();
130 self.close();
131 }
132
133 /// Disconnects the inner stream and explicitly overrides the underlying
134 /// decoder's `Last-Event-ID` in preparation for a future connection.
135 ///
136 /// This is particularly useful in async contexts where you must drop the
137 /// active stream, inject a new ID, and then yield back to the runtime before
138 /// establishing a new network connection. The injected ID will be available
139 /// immediately via [`last_event_id()`](Self::last_event_id).
140 ///
141 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
142 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
143 #[inline]
144 pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
145 self.decoder.reconnect_with_id(id);
146 self.close();
147 }
148
149 /// Attaches a new inner stream to resume processing events.
150 ///
151 /// This method resets the underlying parser's buffers but retains the most
152 /// recently received `Last-Event-ID`. It is the standard way to recover from
153 /// a dropped network connection without missing any events.
154 ///
155 /// If you need to manually inject a saved `Last-Event-ID` (e.g., when recovering
156 /// an offline session from a database), use [`attach_with_id()`](Self::attach_with_id) instead.
157 #[inline]
158 pub fn attach(&mut self, inner: T) {
159 self.decoder.reconnect();
160 self.buf = None;
161 self.inner = Some(inner);
162 }
163
164 /// Attaches a new inner stream to resume processing, explicitly overriding
165 /// the `Last-Event-ID` in the underlying decoder.
166 ///
167 /// This method is primarily used when recovering an offline session where
168 /// you need to initialize the stream with a saved ID right as you provide
169 /// the new HTTP response stream.
170 ///
171 /// If you just want to resume a dropped stream using the ID that the decoder
172 /// has already tracked automatically, use [`attach()`](Self::attach).
173 #[inline]
174 pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
175 self.decoder.reconnect_with_id(id);
176 self.buf = None;
177 self.inner = Some(inner);
178 }
179}
180
181/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
182pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
183
184/// Errors that can occur while reading from an [`SseStream`].
185#[derive(Debug, Clone, PartialEq, Eq, Error)]
186#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
187pub enum SseStreamError<T> {
188 /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
189 #[error("{0}")]
190 PayloadTooLarge(PayloadTooLargeError),
191 /// An error propagated from the inner [`TryStream`].
192 #[error("{0}")]
193 Inner(#[from] T),
194}
195
196impl<T: TryStream> Stream for SseStream<T>
197where
198 T::Ok: Buf,
199{
200 type Item = SseStreamResult<SseEvent, T::Error>;
201
202 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
203 let mut slf = self.project();
204
205 let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
206 return Poll::Ready(None);
207 };
208
209 loop {
210 if let Some(event) = (slf.buf.as_mut())
211 .and_then(|buf| slf.decoder.next(buf).transpose())
212 .transpose()
213 .map_err(SseStreamError::PayloadTooLarge)?
214 {
215 return Poll::Ready(Some(Ok(event)));
216 };
217
218 *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
219 if slf.buf.is_none() {
220 slf.inner.set(None);
221 return Poll::Ready(None);
222 }
223 }
224 }
225}
226
227impl<T: TryStream> FusedStream for SseStream<T>
228where
229 T::Ok: Buf,
230{
231 fn is_terminated(&self) -> bool {
232 self.is_closed()
233 }
234}