stream_rs/stream.rs
1//! Async adapter: turn a byte [`Stream`] into a [`Stream`] of [`SseEvent`]s.
2//!
3//! This module is only compiled with the `stream` feature, which pulls in
4//! `futures-core` (and nothing else). It wraps any
5//! `Stream<Item = Result<Bytes, E>>`, where `Bytes: AsRef<[u8]>`, and drives an
6//! [`SseParser`] across chunk boundaries.
7//!
8//! ```no_run
9//! # async fn run() {
10//! use futures::stream::{self, StreamExt};
11//! use stream_rs::stream::SseStream;
12//!
13//! let body = stream::iter(vec![
14//! Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
15//! Ok(b"\n".to_vec()),
16//! ]);
17//! let mut events = SseStream::new(body);
18//! while let Some(ev) = events.next().await {
19//! let ev = ev.unwrap();
20//! println!("{}", ev.data);
21//! }
22//! # }
23//! ```
24
25use alloc::collections::VecDeque;
26use alloc::vec::Vec;
27use core::pin::Pin;
28use core::task::{Context, Poll};
29
30use futures_core::Stream;
31
32use crate::sse::{SseEvent, SseParser};
33
34/// Adapts a fallible byte stream into a stream of parsed [`SseEvent`]s.
35///
36/// Errors from the underlying byte stream are forwarded unchanged. When the
37/// byte stream ends, any buffered-but-undispatched data is discarded per the
38/// SSE specification.
39///
40/// The adapter keeps a single scratch `Vec` that the parser writes into on
41/// every poll and a `VecDeque` it drains from, so steady-state polling performs
42/// no per-chunk heap allocation beyond the events themselves.
43pub struct SseStream<S, B, E>
44where
45 S: Stream<Item = Result<B, E>>,
46 B: AsRef<[u8]>,
47{
48 inner: S,
49 parser: SseParser,
50 /// Events parsed but not yet yielded, drained front-to-back.
51 ready: VecDeque<SseEvent>,
52 /// Reused scratch buffer the parser writes each poll's events into.
53 scratch: Vec<SseEvent>,
54 done: bool,
55}
56
57impl<S, B, E> SseStream<S, B, E>
58where
59 S: Stream<Item = Result<B, E>>,
60 B: AsRef<[u8]>,
61{
62 /// Wrap a byte stream.
63 pub fn new(inner: S) -> Self {
64 Self {
65 inner,
66 parser: SseParser::new(),
67 ready: VecDeque::new(),
68 scratch: Vec::new(),
69 done: false,
70 }
71 }
72}
73
74impl<S, B, E> Stream for SseStream<S, B, E>
75where
76 S: Stream<Item = Result<B, E>> + Unpin,
77 B: AsRef<[u8]>,
78{
79 type Item = Result<SseEvent, E>;
80
81 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 loop {
83 if let Some(ev) = self.ready.pop_front() {
84 return Poll::Ready(Some(Ok(ev)));
85 }
86 if self.done {
87 return Poll::Ready(None);
88 }
89
90 match Pin::new(&mut self.inner).poll_next(cx) {
91 Poll::Ready(Some(Ok(chunk))) => {
92 // Parse into the reusable scratch buffer, then move the
93 // events into the ready queue. `scratch` keeps its capacity
94 // across polls, so no new allocation happens per chunk.
95 let this = &mut *self;
96 this.scratch.clear();
97 this.parser.feed(chunk.as_ref(), &mut this.scratch);
98 this.ready.extend(this.scratch.drain(..));
99 // Loop: drain whatever we just parsed.
100 }
101 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
102 Poll::Ready(None) => {
103 self.done = true;
104 // Spec: a trailing unterminated event is discarded. `finish`
105 // processes any final line and clears parser state; it never
106 // emits, so there is nothing to drain afterwards.
107 let this = &mut *self;
108 this.scratch.clear();
109 this.parser.finish(&mut this.scratch);
110 return Poll::Ready(None);
111 }
112 Poll::Pending => return Poll::Pending,
113 }
114 }
115 }
116}