sse_core/stream.rs
1use alloc::sync::Arc;
2use core::{
3 pin::Pin,
4 str,
5 task::{self, Poll, ready},
6};
7use thiserror::Error;
8
9use bytes::Buf;
10use futures_core::{
11 TryStream,
12 stream::{FusedStream, Stream},
13};
14use pin_project_lite::pin_project;
15
16use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
17
18pin_project! {
19 /// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
20 #[derive(Debug, Clone, Default)]
21 pub struct SseStream<T: TryStream> {
22 #[pin]
23 inner: Option<T>,
24 buf: Option<T::Ok>,
25
26 decoder: SseDecoder,
27 }
28}
29
30impl<T: TryStream> SseStream<T> {
31 /// Creates a new, disconnected [`SseStream`].
32 ///
33 /// A disconnected stream will immediately yield [`None`] (terminated) if polled.
34 /// This constructor is primarily useful when you need to store the [`SseStream`]
35 /// inside a struct before the network connection is established.
36 ///
37 /// To make the stream active, you must attach an inner stream using
38 /// [`attach()`](Self::attach).
39 ///
40 /// # Example
41 /// ```
42 /// # use futures_core::stream::Stream;
43 /// # use sse_core::*;
44 /// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
45 /// # tokio_test::stream_mock::StreamMockBuilder::new().build()
46 /// # }
47 /// # tokio_test::block_on(async {
48 /// let mut stream = SseStream::disconnected();
49 ///
50 /// // ... later, when the network is ready:
51 /// let byte_stream = fetch_http_stream().await;
52 /// stream.attach(byte_stream);
53 /// # })
54 /// ```
55 #[inline]
56 #[must_use]
57 pub fn disconnected() -> Self {
58 Self::with_decoder(SseDecoder::new())
59 }
60
61 /// Creates a disconnected stream initialized with a custom decoder.
62 ///
63 /// See the [`disconnected()`](Self::disconnected) function for more information.
64 #[inline]
65 #[must_use]
66 pub fn with_decoder(decoder: SseDecoder) -> Self {
67 Self {
68 inner: None,
69 buf: None,
70 decoder,
71 }
72 }
73
74 /// Creates a new [`SseStream`] wrapping the provided inner stream.
75 #[inline]
76 #[must_use]
77 pub fn new(inner: T) -> Self {
78 let mut slf = Self::disconnected();
79 slf.inner = Some(inner);
80 slf
81 }
82
83 /// Consumes the stream and returns the underlying state-machine decoder.
84 #[inline]
85 pub fn take_decoder(self) -> SseDecoder {
86 let Self { mut decoder, .. } = self;
87 decoder.reconnect();
88 decoder
89 }
90
91 /// Returns `true` if the stream is currently disconnected.
92 #[inline]
93 #[must_use]
94 pub fn is_closed(&self) -> bool {
95 self.inner.is_none()
96 }
97
98 /// Returns the current `Last-Event-ID` parsed by the underlying decoder.
99 #[inline]
100 #[must_use]
101 pub fn last_event_id(&self) -> Option<&Arc<str>> {
102 self.decoder.last_event_id()
103 }
104
105 /// Disconnects the inner stream while retaining the underlying parser's state.
106 ///
107 /// This drops the active network connection but safely preserves the most
108 /// recently parsed `Last-Event-ID` within the decoder. This is the standard
109 /// method to temporarily pause a stream or handle a dropped connection,
110 /// allowing you to later resume exactly where you left off.
111 ///
112 /// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
113 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
114 #[inline]
115 pub fn close(&mut self) {
116 self.inner = None;
117 }
118
119 /// Disconnects the stream and completely purges the underlying parser's state.
120 ///
121 /// This drops the inner stream, clears all internal byte buffers, and
122 /// permanently drops the currently tracked `Last-Event-ID`. It effectively
123 /// returns the `SseStream` to the exact state it was in when initially
124 /// created via [`disconnected()`](Self::disconnected).
125 ///
126 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
127 /// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
128 #[inline]
129 pub fn close_and_clear(&mut self) {
130 self.decoder.clear();
131 self.close();
132 }
133
134 /// Disconnects the inner stream and explicitly overrides the underlying
135 /// decoder's `Last-Event-ID` in preparation for a future connection.
136 ///
137 /// This is particularly useful in async contexts where you must drop the
138 /// active stream, inject a new ID, and then yield back to the runtime before
139 /// establishing a new network connection. The injected ID will be available
140 /// immediately via [`last_event_id()`](Self::last_event_id).
141 ///
142 /// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
143 /// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
144 #[inline]
145 pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
146 self.decoder.reconnect_with_id(id);
147 self.close();
148 }
149
150 /// Attaches a new inner stream to resume processing events.
151 ///
152 /// This method resets the underlying parser's buffers but retains the most
153 /// recently received `Last-Event-ID`. It is the standard way to recover from
154 /// a dropped network connection without missing any events.
155 ///
156 /// If you need to manually inject a saved `Last-Event-ID` (e.g., when recovering
157 /// an offline session from a database), use [`attach_with_id()`](Self::attach_with_id) instead.
158 #[inline]
159 pub fn attach(&mut self, inner: T) {
160 self.decoder.reconnect();
161 self.buf = None;
162 self.inner = Some(inner);
163 }
164
165 /// Attaches a new inner stream to resume processing, explicitly overriding
166 /// the `Last-Event-ID` in the underlying decoder.
167 ///
168 /// This method is primarily used when recovering an offline session where
169 /// you need to initialize the stream with a saved ID right as you provide
170 /// the new HTTP response stream.
171 ///
172 /// If you just want to resume a dropped stream using the ID that the decoder
173 /// has already tracked automatically, use [`attach()`](Self::attach).
174 #[inline]
175 pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
176 self.decoder.reconnect_with_id(id);
177 self.buf = None;
178 self.inner = Some(inner);
179 }
180}
181
182/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
183pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
184
185/// Errors that can occur while reading from an [`SseStream`].
186#[derive(Debug, Clone, PartialEq, Eq, Error)]
187#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
188pub enum SseStreamError<T> {
189 /// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
190 #[error("{0}")]
191 PayloadTooLarge(PayloadTooLargeError),
192 /// An error propagated from the inner [`TryStream`].
193 #[error("{0}")]
194 Inner(#[from] T),
195}
196
197impl<T: TryStream> Stream for SseStream<T>
198where
199 T::Ok: Buf,
200{
201 type Item = SseStreamResult<SseEvent, T::Error>;
202
203 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
204 let mut slf = self.project();
205
206 let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
207 return Poll::Ready(None);
208 };
209
210 loop {
211 if let Some(event) = (slf.buf.as_mut())
212 .and_then(|buf| slf.decoder.next(buf).transpose())
213 .transpose()
214 .map_err(SseStreamError::PayloadTooLarge)?
215 {
216 return Poll::Ready(Some(Ok(event)));
217 };
218
219 *slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
220 if slf.buf.is_none() {
221 slf.inner.set(None);
222 return Poll::Ready(None);
223 }
224 }
225 }
226}
227
228impl<T: TryStream> FusedStream for SseStream<T>
229where
230 T::Ok: Buf,
231{
232 fn is_terminated(&self) -> bool {
233 self.is_closed()
234 }
235}
236
237#[test]
238fn hard_parse() -> Result<(), PayloadTooLargeError> {
239 use crate::MessageEvent;
240 use std::slice;
241 use tokio_stream::StreamExt;
242
243 tokio_test::block_on(async {
244 // Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
245 let bytes = "
246
247:
248
249event: my-event\r
250data:line1
251data: line2
252:
253id: my-id
254:should be ignored too\rretry:42
255retry:
256
257data:second
258
259";
260
261 let mut inner = tokio_test::stream_mock::StreamMockBuilder::new();
262 for b in bytes.as_bytes() {
263 inner = inner.next(Ok(slice::from_ref(b)));
264 }
265 inner = inner
266 .next(Err(()))
267 .next(Ok(b"data: hello\n\ndata:ignored\n"));
268
269 let id = Some("my-id".into());
270
271 let mut stream = SseStream::new(inner.build());
272 let events: Vec<_> = (&mut stream).collect().await;
273
274 assert_eq!(
275 events,
276 &[
277 Ok(SseEvent::Retry(42)),
278 Ok(SseEvent::Message(MessageEvent {
279 event: "my-event".into(),
280 data: "line1\nline2".into(),
281 last_event_id: id.clone()
282 })),
283 Ok(SseEvent::Message(MessageEvent {
284 event: "message".into(),
285 data: "second".into(),
286 last_event_id: id.clone()
287 })),
288 Err(SseStreamError::Inner(())),
289 Ok(SseEvent::Message(MessageEvent {
290 event: "message".into(),
291 data: "hello".into(),
292 last_event_id: id.clone()
293 })),
294 ]
295 );
296
297 assert!(stream.is_closed());
298
299 Ok(())
300 })
301}