Skip to main content

fprovider/
stream.rs

1//! Streaming event contracts and in-memory stream utilities.
2//!
3//! ```rust
4//! use fprovider::{BoxedEventStream, StreamEvent, VecEventStream};
5//!
6//! let stream = VecEventStream::new(vec![Ok(StreamEvent::TextDelta("hello".into()))]);
7//! let _boxed: BoxedEventStream<'static> = Box::pin(stream);
8//! ```
9
10use std::collections::VecDeque;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use futures_core::Stream;
15
16use crate::{Message, ModelResponse, ProviderError, ToolCall};
17
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum StreamEvent {
20    TextDelta(String),
21    ToolCallDelta(ToolCall),
22    MessageComplete(Message),
23    ResponseComplete(ModelResponse),
24}
25
26/// Provider stream contract.
27///
28/// Invariants for consumers:
29/// - Events are emitted in source order.
30/// - `TextDelta` and `ToolCallDelta` may appear zero or more times.
31/// - `MessageComplete` and `ResponseComplete` are terminal milestones and, when present,
32///   arrive after all related deltas.
33/// - Once the stream yields `None`, it must not yield additional items.
34pub trait ModelEventStream: Stream<Item = Result<StreamEvent, ProviderError>> + Send {}
35
36impl<T> ModelEventStream for T where T: Stream<Item = Result<StreamEvent, ProviderError>> + Send {}
37
38pub type BoxedEventStream<'a> = Pin<Box<dyn ModelEventStream + 'a>>;
39
40#[derive(Debug)]
41pub struct VecEventStream {
42    events: VecDeque<Result<StreamEvent, ProviderError>>,
43}
44
45impl VecEventStream {
46    pub fn new(events: Vec<Result<StreamEvent, ProviderError>>) -> Self {
47        Self {
48            events: events.into(),
49        }
50    }
51}
52
53impl Stream for VecEventStream {
54    type Item = Result<StreamEvent, ProviderError>;
55
56    fn poll_next(
57        mut self: Pin<&mut Self>,
58        _cx: &mut Context<'_>,
59    ) -> Poll<Option<Result<StreamEvent, ProviderError>>> {
60        Poll::Ready(self.events.pop_front())
61    }
62}