1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
use alloc::sync::Arc;
use core::{
pin::Pin,
str,
task::{self, ready, Poll},
};
use thiserror::Error;
use bytes::Buf;
use futures_core::{
stream::{FusedStream, Stream},
TryStream,
};
use pin_project_lite::pin_project;
use crate::{PayloadTooLargeError, SseDecoder, SseEvent};
pin_project! {
/// An asynchronous stream wrapper that parses SSE events from an underlying byte stream.
#[derive(Debug, Clone, Default)]
pub struct SseStream<T: TryStream> {
#[pin]
inner: Option<T>,
buf: Option<T::Ok>,
decoder: SseDecoder,
}
}
impl<T: TryStream> SseStream<T> {
/// Creates a new, disconnected [`SseStream`].
///
/// A disconnected stream will immediately yield [`None`] (terminated) if polled.
/// This constructor is primarily useful when you need to store the [`SseStream`]
/// inside a struct before the network connection is established.
///
/// To make the stream active, you must attach an inner stream using
/// [`attach()`](Self::attach).
///
/// # Example
/// ```
/// # use futures_core::stream::Stream;
/// # use sse_core::*;
/// # async fn fetch_http_stream() -> impl Stream<Item = Result<&'static [u8], ()>> {
/// # tokio_test::stream_mock::StreamMockBuilder::new().build()
/// # }
/// # tokio_test::block_on(async {
/// let mut stream = SseStream::disconnected();
///
/// // ... later, when the network is ready:
/// let byte_stream = fetch_http_stream().await;
/// stream.attach(byte_stream);
/// # })
/// ```
#[inline]
#[must_use]
pub fn disconnected() -> Self {
Self::with_decoder(SseDecoder::new())
}
/// Creates a disconnected stream initialized with a custom decoder.
///
/// See the [`disconnected()`](Self::disconnected) function for more information.
#[inline]
#[must_use]
pub fn with_decoder(decoder: SseDecoder) -> Self {
Self {
inner: None,
buf: None,
decoder,
}
}
/// Creates a new [`SseStream`] wrapping the provided inner stream.
#[inline]
#[must_use]
pub fn new(inner: T) -> Self {
let mut slf = Self::disconnected();
slf.inner = Some(inner);
slf
}
/// Consumes the stream and returns the underlying state-machine decoder.
#[inline]
pub fn take_decoder(self) -> SseDecoder {
let Self { mut decoder, .. } = self;
decoder.reconnect();
decoder
}
/// Returns `true` if the stream is currently disconnected.
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_none()
}
/// Returns the current `Last-Event-ID` parsed by the underlying decoder.
#[inline]
#[must_use]
pub fn last_event_id(&self) -> Option<&Arc<str>> {
self.decoder.last_event_id()
}
/// Disconnects the inner stream while retaining the underlying parser's state.
///
/// This drops the active network connection but safely preserves the most
/// recently parsed `Last-Event-ID` within the decoder. This is the standard
/// method to temporarily pause a stream or handle a dropped connection,
/// allowing you to later resume exactly where you left off.
///
/// * To close the stream and **inject** a new ID for the next connection, use [`close_with_id()`](Self::close_with_id).
/// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
#[inline]
pub fn close(&mut self) {
self.decoder.reconnect();
self.clear_bufs();
}
/// Disconnects the stream and completely purges the underlying parser's state.
///
/// This drops the inner stream, clears all internal byte buffers, and
/// permanently drops the currently tracked `Last-Event-ID`. It effectively
/// returns the `SseStream` to the exact state it was in when initially
/// created via [`disconnected()`](Self::disconnected).
///
/// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
/// * To close the stream and **inject** a new ID, use [`close_with_id()`](Self::close_with_id).
#[inline]
pub fn close_and_clear(&mut self) {
self.decoder.clear();
self.clear_bufs();
}
/// Disconnects the inner stream and explicitly overrides the underlying
/// decoder's `Last-Event-ID` in preparation for a future connection.
///
/// This is particularly useful in async contexts where you must drop the
/// active stream, inject a new ID, and then yield back to the runtime before
/// establishing a new network connection. The injected ID will be available
/// immediately via [`last_event_id()`](Self::last_event_id).
///
/// * To close the stream and **keep** the current ID, use [`close()`](Self::close).
/// * To close the stream and completely **wipe** the session state, use [`close_and_clear()`](Self::close_and_clear).
#[inline]
pub fn close_with_id(&mut self, id: Option<Arc<str>>) {
self.decoder.reconnect_with_id(id);
self.clear_bufs();
}
/// Attaches a new inner stream to resume processing events.
///
/// This method resets the underlying parser's buffers but safely retains the most
/// recently parsed `Last-Event-ID`. It is the standard way to recover from
/// a dropped network connection, allowing you to resume exactly where you left off.
///
/// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
/// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
#[inline]
pub fn attach(&mut self, inner: T) {
self.close();
self.inner = Some(inner);
}
/// Attaches a new inner stream and completely purges the underlying parser's state.
///
/// This method is used when you want to reuse an existing `SseStream` allocation
/// for a completely fresh connection or a different server. It clears all internal
/// byte buffers and permanently drops the currently tracked `Last-Event-ID`.
///
/// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
/// * To attach a stream and **inject** a new ID, use [`attach_with_id()`](Self::attach_with_id).
#[inline]
pub fn clear_and_attach(&mut self, inner: T) {
self.close_and_clear();
self.inner = Some(inner);
}
/// Attaches a new inner stream to resume processing, explicitly overriding
/// the `Last-Event-ID` in the underlying decoder.
///
/// This method is primarily used when recovering an offline session where
/// you need to initialize the stream with a saved ID (e.g., from a local database)
/// right as you provide the new HTTP response stream.
///
/// * To attach a stream and **keep** the current ID, use [`attach()`](Self::attach).
/// * To attach a stream and completely **wipe** the session state, use [`clear_and_attach()`](Self::clear_and_attach).
#[inline]
pub fn attach_with_id(&mut self, inner: T, id: Option<Arc<str>>) {
self.close_with_id(id);
self.inner = Some(inner);
}
#[inline]
fn clear_bufs(&mut self) {
self.inner = None;
self.buf = None;
}
}
/// An alias for [`Result`] with the error set to [`SseStreamError<E>`].
pub type SseStreamResult<T, E> = Result<T, SseStreamError<E>>;
/// Errors that can occur while reading from an [`SseStream`].
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum SseStreamError<T> {
/// A single field (e.g., data or Last-Event-ID) exceeded the configured byte limit.
#[error("{0}")]
PayloadTooLarge(PayloadTooLargeError),
/// An error propagated from the inner [`TryStream`].
#[error("{0}")]
Inner(#[from] T),
}
impl<T: TryStream> Stream for SseStream<T>
where
T::Ok: Buf,
{
type Item = SseStreamResult<SseEvent, T::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut slf = self.project();
let Some(mut inner) = slf.inner.as_mut().as_pin_mut() else {
return Poll::Ready(None);
};
loop {
if let Some(event) = (slf.buf.as_mut())
.and_then(|buf| slf.decoder.next(buf))
.transpose()
.map_err(SseStreamError::PayloadTooLarge)?
{
return Poll::Ready(Some(Ok(event)));
};
*slf.buf = ready!(inner.as_mut().try_poll_next(cx)?);
if slf.buf.is_none() {
slf.inner.set(None);
return Poll::Ready(None);
}
}
}
}
impl<T: TryStream> FusedStream for SseStream<T>
where
T::Ok: Buf,
{
fn is_terminated(&self) -> bool {
self.is_closed()
}
}
#[test]
fn hard_parse() -> Result<(), PayloadTooLargeError> {
use crate::MessageEvent;
use std::slice;
use tokio_stream::StreamExt;
tokio_test::block_on(async {
// Source: https://github.com/jpopesculian/eventsource-stream/blob/v0.2.3/tests/eventsource-stream.rs
let bytes = "
:
event: my-event\r
data:line1
data: line2
:
id: my-id
:should be ignored too\rretry:42
retry:
data:second
";
let mut inner = tokio_test::stream_mock::StreamMockBuilder::new();
for b in bytes.as_bytes() {
inner = inner.next(Ok(slice::from_ref(b)));
}
inner = inner
.next(Err(()))
.next(Ok(b"data: hello\n\ndata:ignored\n"));
let id = Some("my-id".into());
let mut stream = SseStream::new(inner.build());
let events: Vec<_> = (&mut stream).collect().await;
assert_eq!(
events,
&[
Ok(SseEvent::Retry(42)),
Ok(SseEvent::Message(MessageEvent {
event: "my-event".into(),
data: "line1\nline2".into(),
last_event_id: id.clone()
})),
Ok(SseEvent::Message(MessageEvent {
event: "message".into(),
data: "second".into(),
last_event_id: id.clone()
})),
Err(SseStreamError::Inner(())),
Ok(SseEvent::Message(MessageEvent {
event: "message".into(),
data: "hello".into(),
last_event_id: id.clone()
})),
]
);
assert!(stream.is_closed());
Ok(())
})
}