stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! Async adapter: turn a byte [`Stream`] into a [`Stream`] of [`SseEvent`]s.
//!
//! This module is only compiled with the `stream` feature, which pulls in
//! `futures-core` (and nothing else). It wraps any
//! `Stream<Item = Result<B, E>>` whose chunk type `B` implements `AsRef<[u8]>`
//! (the [`Stream`] impl additionally requires the inner stream to be `Unpin`),
//! and drives an [`SseParser`] across chunk boundaries.
//!
//! ```no_run
//! # async fn run() {
//! use futures::stream::{self, StreamExt};
//! use stream_rs::stream::SseStream;
//!
//! let body = stream::iter(vec![
//!     Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
//!     Ok(b"\n".to_vec()),
//! ]);
//! let mut events = SseStream::new(body);
//! while let Some(ev) = events.next().await {
//!     let ev = ev.unwrap();
//!     println!("{}", ev.data);
//! }
//! # }
//! ```

use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::Stream;

use crate::sse::{SseEvent, SseParser};

/// Adapts a fallible byte stream into a stream of parsed [`SseEvent`]s.
///
/// Errors from the underlying byte stream are forwarded unchanged and do not
/// terminate the adapter: the inner stream is polled again on the next call.
/// When the byte stream ends, any buffered-but-undispatched data is discarded
/// per the SSE specification.
///
/// The [`Stream`] implementation requires the inner stream to be `Unpin`; pin a
/// `!Unpin` body first (for example with `Box::pin`).
///
/// The adapter keeps a single scratch `Vec` that the parser writes into for
/// each chunk and a `VecDeque` it drains from. Both keep their capacity across
/// chunks, so steady-state polling performs no per-chunk heap allocation beyond
/// the events themselves.
#[must_use = "streams do nothing unless polled"]
pub struct SseStream<S, B, E> {
    inner: S,
    parser: SseParser,
    /// Events parsed but not yet yielded, drained front-to-back.
    ready: VecDeque<SseEvent>,
    /// Reused scratch buffer the parser writes each chunk's events into.
    scratch: Vec<SseEvent>,
    /// Set once `inner` has returned `Ready(None)`; it is never polled again.
    done: bool,
    /// Ties the chunk type `B` and error type `E` to the adapter without storing
    /// them (a parameter used by no field is rejected by rustc). The `fn() -> _`
    /// form keeps the adapter `Send`/`Sync`/`Unpin` independently of `B` and `E`.
    _marker: core::marker::PhantomData<fn() -> (B, E)>,
}

impl<S, B, E> SseStream<S, B, E>
where
    S: Stream<Item = Result<B, E>>,
    B: AsRef<[u8]>,
{
    /// Wrap a byte stream.
    ///
    /// Nothing is read from `inner` until the returned adapter is polled.
    pub fn new(inner: S) -> Self {
        Self {
            inner,
            parser: SseParser::new(),
            ready: VecDeque::new(),
            scratch: Vec::new(),
            done: false,
            _marker: core::marker::PhantomData,
        }
    }
}

impl<S, B, E> Stream for SseStream<S, B, E>
where
    S: Stream<Item = Result<B, E>> + Unpin,
    B: AsRef<[u8]>,
{
    type Item = Result<SseEvent, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `S: Unpin` makes the whole adapter `Unpin`, so a plain `&mut Self` is
        // enough for the rest of the call (no pin projection needed).
        let this = &mut *self;
        loop {
            // Already-parsed events always go out first, so they are never
            // reordered behind a later chunk's error or the end of the stream.
            if let Some(ev) = this.ready.pop_front() {
                return Poll::Ready(Some(Ok(ev)));
            }
            if this.done {
                return Poll::Ready(None);
            }

            match Pin::new(&mut this.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    // Parse into the reusable scratch buffer, then move the
                    // events into the ready queue. `scratch` keeps its capacity
                    // across chunks, so no new allocation happens per chunk.
                    this.scratch.clear();
                    this.parser.feed(chunk.as_ref(), &mut this.scratch);
                    this.ready.extend(this.scratch.drain(..));
                    // Loop: yield what was parsed, or poll the inner stream again
                    // if the chunk only held a partial event. Returning `Pending`
                    // here would leave no waker registered.
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    this.done = true;
                    // Let the parser flush its final state. `finish` writes into
                    // an output buffer, so anything it does produce is queued and
                    // yielded rather than silently dropped. A trailing event with
                    // no terminating blank line is discarded by the parser per
                    // the SSE spec.
                    this.scratch.clear();
                    this.parser.finish(&mut this.scratch);
                    this.ready.extend(this.scratch.drain(..));
                    // Loop: yield anything `finish` produced, then `Ready(None)`.
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Buffered events are guaranteed to be yielded before anything else.
        // Once the inner stream has ended, nothing more can be produced.
        let buffered = self.ready.len();
        if self.done {
            (buffered, Some(buffered))
        } else {
            (buffered, None)
        }
    }
}