sse_core/stream.rs
1use alloc::sync::Arc;
2use core::{
3 pin::Pin,
4 str,
5 task::{self, ready, Poll},
6};
7use thiserror::Error;
8
9use bytes::Buf;
10use futures_core::{
11 stream::{FusedStream, Stream},
12 TryStream,
13};
14use pin_project_lite::pin_project;
15
16use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
18pin_project! {
19 /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
20 #[derive(Debug, Clone, Default)]
21 pub struct SseStream<T: TryStream> {
22 #[pin]
23 inner: Option<T>,
24 buf: Option<T::Ok>,
25
26 decoder: SseDecoder,
27 }
28}
29
30impl<T: TryStream> SseStream<T> {
31 /// Creates a new, disconnected [`SseStream`].
32 ///
33 /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
34 /// This constructor is primarily useful when you need to store the [`SseStream`]
35 /// inside a struct before the network connection is established.
36 ///
37 /// To make the stream active, you must attach an inner stream using
38 /// [`attach()`](Self::attach).
39 ///
40 /// # Example
41 /// ```
42 /// # use futures_core::stream::Stream;
43 /// # use sse_core::*;
44 /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
45 /// # tokio_test::stream_mock::StreamMockBuilder::new().build()
46 /// # }
47 /// # tokio_test::block_on(async {
48 /// let mut stream = SseStream::disconnected();
49 ///
50 /// // ... later, when the network is ready:
51 /// let byte_stream = fetch_http_stream().await;
52 /// stream.attach(byte_stream);
53 /// # })
54 /// ```
55 #[inline]
56 #[must_use]
57 pub fn disconnected() -> Self {
58 Self::with_decoder(SseDecoder::new())
59 }
60
61 /// Creates a disconnected stream initialized with a custom decoder.
62 ///
63 /// See the [`disconnected()`](Self::disconnected) function for more information.
64 #[inline]
65 #[must_use]
66 pub fn with_decoder(decoder: SseDecoder) -> Self {
67 Self {
68 inner: None,
69 buf: None,
70 decoder,
71 }
72 }
73
74 /// Creates a new [`SseStream`] wrapping the provided inner stream.
75 #[inline]
76 #[must_use]
77 pub fn new(inner: T) -> Self {
78 let mut slf = Self::disconnected();
79 slf.inner = Some(inner);
80 slf
81 }
82
83 /// Consumes the stream and returns the underlying state-machine decoder.
84 #[inline]
85 pub fn take_decoder(self) -> SseDecoder {
86 let Self { mut decoder, .. } = self;
87 decoder.reconnect();
88 decoder
89 }
90
91 /// Returns `true` if the stream is currently disconnected.
92 #[inline]
93 #[must_use]
94 pub fn is_closed(&self) -> bool {
95 self.inner.is_none()
96 }
97
98 /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
99 #[inline]
100 #[must_use]
101 pub fn last_event_id(&self) -> Option<&Arc<str>> {
102 self.decoder.last_event_id()
103 }
104
105 /// Disconnects the inner stream while retaining the underlying parser's state.
106 ///
107 /// This drops the active network connection but safely preserves the most
108 /// recently parsed `Last-Event-ID` within the decoder. This is the standard
109 /// method to temporarily pause a stream or handle a dropped connection,
110 /// allowing you to later resume exactly where you left off.
111 ///
112 /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
113 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
114 #[inline]
115 pub fn close(&mut self) {
116 self.decoder.reconnect();
117 self.clear_bufs();
118 }
119
120 /// Disconnects the stream and completely purges the underlying parser's state.
121 ///
122 /// This drops the inner stream, clears all internal byte buffers, and
123 /// permanently drops the currently tracked `Last-Event-ID`. It effectively
124 /// returns the `SseStream` to the exact state it was in when initially
125 /// created via [`disconnected()`](Self::disconnected).
126 ///
127 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
128 /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
129 #[inline]
130 pub fn close_and_clear(&mut self) {
131 self.decoder.clear();
132 self.clear_bufs();
133 }
134
135 /// Disconnects the inner stream and explicitly overrides the underlying
136 /// decoder's `Last-Event-ID` in preparation for a future connection.
137 ///
138 /// This is particularly useful in async contexts where you must drop the
139 /// active stream, inject a new ID, and then yield back to the runtime before
140 /// establishing a new network connection. The injected ID will be available
141 /// immediately via [`last_event_id()`](Self::last_event_id).
142 ///
143 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
144 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
145 #[inline]
146 pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
147 self.decoder.reconnect_with_id(id);
148 self.clear_bufs();
149 }
150
151 /// Attaches a new inner stream to resume processing events.
152 ///
153 /// This method resets the underlying parser's buffers but safely retains the most
154 /// recently parsed `Last-Event-ID`. It is the standard way to recover from
155 /// a dropped network connection, allowing you to resume exactly where you left off.
156 ///
157 /// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
158 /// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
159 #[inline]
160 pub fn attach(&mut self, inner: T) {
161 self.close();
162 self.inner = Some(inner);
163 }
164
165 /// Attaches a new inner stream and completely purges the underlying parser's state.
166 ///
167 /// This method is used when you want to reuse an existing `SseStream` allocation
168 /// for a completely fresh connection or a different server. It clears all internal
169 /// byte buffers and permanently drops the currently tracked `Last-Event-ID`.
170 ///
171 /// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
172 /// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
173 #[inline]
174 pub fn clear_and_attach(&mut self, inner: T) {
175 self.close_and_clear();
176 self.inner = Some(inner);
177 }
178
179 /// Attaches a new inner stream to resume processing, explicitly overriding
180 /// the `Last-Event-ID` in the underlying decoder.
181 ///
182 /// This method is primarily used when recovering an offline session where
183 /// you need to initialize the stream with a saved ID (e.g., from a local database)
184 /// right as you provide the new HTTP response stream.
185 ///
186 /// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
187 /// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
188 #[inline]
189 pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
190 self.close_with_id(id);
191 self.inner = Some(inner);
192 }
193
194 #[inline]
195 fn clear_bufs(&mut self) {
196 self.inner = None;
197 self.buf = None;
198 }
199}
200
201/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
202pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
203
204/// Errors that can occur while reading from an [`SseStream`].
205#[derive(Debug, Clone, PartialEq, Eq, Error)]
206#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
207pub enum SseStreamError<T> {
208 /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
209 #[error("{0}")]
210 PayloadTooLarge(PayloadTooLargeError),
211 /// An error propagated from the inner [`TryStream`].
212 #[error("{0}")]
213 Inner(#[from] T),
214}
215
216impl<T: TryStream> Stream for SseStream<T>
217where
218 T::Ok: Buf,
219{
220 type Item = SseStreamResult<SseEvent, T::Error>;
221
222 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
223 let mut slf = self.project();
224
225 let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
226 return Poll::Ready(None);
227 };
228
229 loop {
230 if let Some(event) = (slf.buf.as_mut())
231 .and_then(|buf| slf.decoder.next(buf))
232 .transpose()
233 .map_err(SseStreamError::PayloadTooLarge)?
234 {
235 return Poll::Ready(Some(Ok(event)));
236 };
237
238 *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
239 if slf.buf.is_none() {
240 slf.inner.set(None);
241 return Poll::Ready(None);
242 }
243 }
244 }
245}
246
247impl<T: TryStream> FusedStream for SseStream<T>
248where
249 T::Ok: Buf,
250{
251 fn is_terminated(&self) -> bool {
252 self.is_closed()
253 }
254}
255
/// Feeds a tricky SSE payload one byte per chunk, then an inner-stream error,
/// then one multi-byte chunk, and checks the exact sequence of items produced.
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
    use crate::MessageEvent;
    use std::slice;
    use tokio_stream::StreamExt;

    tokio_test::block_on(async {
        // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
        let bytes = "

:

event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:

data:second

";

        // One chunk per input byte, so every possible chunk boundary is exercised.
        let mock = bytes.as_bytes().iter().fold(
            tokio_test::stream_mock::StreamMockBuilder::new(),
            |mock, byte| mock.next(Ok(slice::from_ref(byte))),
        );
        // An inner error followed by more data: the stream must keep going.
        let mock = mock
            .next(Err(()))
            .next(Ok(b"data: hello\n\ndata:ignored\n"));

        let expected_id = Some("my-id".into());

        let mut stream = SseStream::new(mock.build());
        let received: Vec<_> = (&mut stream).collect().await;

        assert_eq!(
            received,
            &[
                Ok(SseEvent::Retry(42)),
                Ok(SseEvent::Message(MessageEvent {
                    event: "my-event".into(),
                    data: "line1\nline2".into(),
                    last_event_id: expected_id.clone()
                })),
                Ok(SseEvent::Message(MessageEvent {
                    event: "message".into(),
                    data: "second".into(),
                    last_event_id: expected_id.clone()
                })),
                Err(SseStreamError::Inner(())),
                Ok(SseEvent::Message(MessageEvent {
                    event: "message".into(),
                    data: "hello".into(),
                    last_event_id: expected_id.clone()
                })),
            ]
        );

        // Exhausting the mock must leave the wrapper disconnected.
        assert!(stream.is_closed());

        Ok(())
    })
}