Skip to main content

stream_rs/
stream.rs

1//! Async adapter: turn a byte [`Stream`] into a [`Stream`] of [`SseEvent`]s.
2//!
3//! This module is only compiled with the `stream` feature, which pulls in
4//! `futures-core` (and nothing else). It wraps any
5//! `Stream<Item = Result<Bytes, E>>`, where `Bytes: AsRef<[u8]>`, and drives an
6//! [`SseParser`] across chunk boundaries.
7//!
8//! ```no_run
9//! # async fn run() {
10//! use futures::stream::{self, StreamExt};
11//! use stream_rs::stream::SseStream;
12//!
13//! let body = stream::iter(vec![
14//!     Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
15//!     Ok(b"\n".to_vec()),
16//! ]);
17//! let mut events = SseStream::new(body);
18//! while let Some(ev) = events.next().await {
19//!     let ev = ev.unwrap();
20//!     println!("{}", ev.data);
21//! }
22//! # }
23//! ```
24
25use alloc::collections::VecDeque;
26use alloc::vec::Vec;
27use core::pin::Pin;
28use core::task::{Context, Poll};
29
30use futures_core::Stream;
31
32use crate::sse::{SseEvent, SseParser};
33
34/// Adapts a fallible byte stream into a stream of parsed [`SseEvent`]s.
35///
36/// Errors from the underlying byte stream are forwarded unchanged. When the
37/// byte stream ends, any buffered-but-undispatched data is discarded per the
38/// SSE specification.
39///
40/// The adapter keeps a single scratch `Vec` that the parser writes into on
41/// every poll and a `VecDeque` it drains from, so steady-state polling performs
42/// no per-chunk heap allocation beyond the events themselves.
43pub struct SseStream<S, B, E>
44where
45    S: Stream<Item = Result<B, E>>,
46    B: AsRef<[u8]>,
47{
48    inner: S,
49    parser: SseParser,
50    /// Events parsed but not yet yielded, drained front-to-back.
51    ready: VecDeque<SseEvent>,
52    /// Reused scratch buffer the parser writes each poll's events into.
53    scratch: Vec<SseEvent>,
54    done: bool,
55}
56
57impl<S, B, E> SseStream<S, B, E>
58where
59    S: Stream<Item = Result<B, E>>,
60    B: AsRef<[u8]>,
61{
62    /// Wrap a byte stream.
63    pub fn new(inner: S) -> Self {
64        Self {
65            inner,
66            parser: SseParser::new(),
67            ready: VecDeque::new(),
68            scratch: Vec::new(),
69            done: false,
70        }
71    }
72}
73
74impl<S, B, E> Stream for SseStream<S, B, E>
75where
76    S: Stream<Item = Result<B, E>> + Unpin,
77    B: AsRef<[u8]>,
78{
79    type Item = Result<SseEvent, E>;
80
81    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82        loop {
83            if let Some(ev) = self.ready.pop_front() {
84                return Poll::Ready(Some(Ok(ev)));
85            }
86            if self.done {
87                return Poll::Ready(None);
88            }
89
90            match Pin::new(&mut self.inner).poll_next(cx) {
91                Poll::Ready(Some(Ok(chunk))) => {
92                    // Parse into the reusable scratch buffer, then move the
93                    // events into the ready queue. `scratch` keeps its capacity
94                    // across polls, so no new allocation happens per chunk.
95                    let this = &mut *self;
96                    this.scratch.clear();
97                    this.parser.feed(chunk.as_ref(), &mut this.scratch);
98                    this.ready.extend(this.scratch.drain(..));
99                    // Loop: drain whatever we just parsed.
100                }
101                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
102                Poll::Ready(None) => {
103                    self.done = true;
104                    // Spec: a trailing unterminated event is discarded. `finish`
105                    // processes any final line and clears parser state; it never
106                    // emits, so there is nothing to drain afterwards.
107                    let this = &mut *self;
108                    this.scratch.clear();
109                    this.parser.finish(&mut this.scratch);
110                    return Poll::Ready(None);
111                }
112                Poll::Pending => return Poll::Pending,
113            }
114        }
115    }
116}