Skip to main content

sim_lib_stream_combinators/
stream.rs

1use std::sync::Arc;
2
3use sim_kernel::{Cx, Event, Expr, Ref, Result, Symbol};
4use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamMetadata, StreamValue};
5
6/// Lazy source of stream packets backing a [`Stream`].
7///
8/// A `StreamNode` is the pull-based engine behind a combinator: it exposes the
9/// stream metadata and yields one [`StreamItem`] at a time, advancing only when
10/// asked. Combinators wrap one or more upstream streams in their own node so
11/// that work is deferred until packets are actually pulled. Implementations are
12/// `Send + Sync` so a built graph can be shared across threads.
13pub trait StreamNode: Send + Sync {
14    /// Returns the metadata describing this node's output stream.
15    fn metadata(&self) -> &StreamMetadata;
16    /// Pulls the next packet, or `Ok(None)` when no packet is available yet.
17    fn next_packet(&self) -> Result<Option<StreamItem>>;
18    /// Reports whether the node has reached its terminal `done` state.
19    fn is_done(&self) -> Result<bool>;
20}
21
22/// Cloneable handle to a lazy combinator stream.
23///
24/// A `Stream` is a thin shared pointer over a [`StreamNode`]: cloning it shares
25/// the same underlying source rather than copying packets. It is the value that
26/// every combinator in this crate consumes and produces, forming pull-based
27/// graphs over the homogeneous `sim-stream` packet spine.
28///
29/// # Examples
30///
31/// ```
32/// use sim_kernel::{Expr, Symbol};
33/// use sim_lib_stream_core::{
34///     BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia,
35///     StreamMetadata, StreamPacket,
36/// };
37/// use sim_lib_stream_combinators::Stream;
38///
39/// let metadata = StreamMetadata::new(
40///     Symbol::qualified("stream", "doc"),
41///     StreamMedia::Data,
42///     StreamDirection::Source,
43///     Symbol::qualified("clock", "doc"),
44///     BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
45/// );
46/// let item = StreamItem::new(StreamPacket::data(
47///     Symbol::qualified("stream/data", "model-event"),
48///     Expr::Nil,
49/// ));
50/// let stream = Stream::pull(metadata, vec![item.clone()]);
51/// assert_eq!(stream.take_packets(8).unwrap(), vec![item]);
52/// assert!(stream.is_done().unwrap());
53/// ```
54#[derive(Clone)]
55pub struct Stream {
56    inner: Arc<dyn StreamNode>,
57}
58
59impl Stream {
60    /// Wraps a [`StreamNode`] implementation in a shareable `Stream` handle.
61    pub fn new(inner: impl StreamNode + 'static) -> Self {
62        Self {
63            inner: Arc::new(inner),
64        }
65    }
66
67    /// Builds a stream that replays the packets held by a stream-core value.
68    pub fn from_value(value: Arc<StreamValue>) -> Self {
69        Self::new(ValueStream { value })
70    }
71
72    /// Builds an in-memory pull stream from explicit metadata and packets.
73    pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
74        Self::from_value(Arc::new(StreamValue::pull(metadata, items)))
75    }
76
77    /// Returns the metadata describing this stream's media, clock, and buffer.
78    pub fn metadata(&self) -> &StreamMetadata {
79        self.inner.metadata()
80    }
81
82    /// Pulls the next packet, or `Ok(None)` when none is currently available.
83    pub fn next_packet(&self) -> Result<Option<StreamItem>> {
84        self.inner.next_packet()
85    }
86
87    /// Reports whether the stream has reached its terminal `done` state.
88    pub fn is_done(&self) -> Result<bool> {
89        self.inner.is_done()
90    }
91
92    /// Pulls up to `limit` packets, stopping early when the source is drained.
93    pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
94        let mut out = Vec::new();
95        for _ in 0..limit {
96            let Some(item) = self.next_packet()? else {
97                break;
98            };
99            out.push(item);
100        }
101        Ok(out)
102    }
103
104    /// Drains the stream into kernel events, one chunk event per packet.
105    ///
106    /// Sequence numbers start at `start_seq` and increase per packet; a final
107    /// `done` event for `run` is appended once the source reports done.
108    pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
109        let mut seq = start_seq;
110        let mut out = Vec::new();
111        while let Some(item) = self.next_packet()? {
112            out.push(item.chunk_event(cx, run.clone(), seq)?);
113            seq = seq.saturating_add(1);
114        }
115        if self.is_done()? {
116            out.push(Event::done(run, seq)?);
117        }
118        Ok(out)
119    }
120
121    /// Returns a stream that rewrites each data packet's payload expression.
122    ///
123    /// Method form of the free [`map_data_expr`](crate::map_data_expr)
124    /// combinator; non-data packets pass through unchanged.
125    pub fn map_data_expr<F>(self, f: F) -> Self
126    where
127        F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
128    {
129        crate::ops::map_data_expr(self, f)
130    }
131
132    /// Returns a stream keeping only data packets of the given `kind`.
133    ///
134    /// Method form of the free [`filter_data_kind`](crate::filter_data_kind)
135    /// combinator.
136    pub fn filter_data_kind(self, kind: Symbol) -> Self {
137        crate::ops::filter_data_kind(self, kind)
138    }
139
140    /// Returns a stream keeping data packets whose payload matches `matches`.
141    ///
142    /// Method form of the free [`filter_data_shape`](crate::filter_data_shape)
143    /// combinator.
144    pub fn filter_data_shape<F>(self, matches: F) -> Self
145    where
146        F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
147    {
148        crate::ops::filter_data_shape(self, matches)
149    }
150
151    /// Returns a stream that batches packets into windows of `count` packets.
152    ///
153    /// Method form of the free [`window_by_count`](crate::window_by_count)
154    /// combinator.
155    pub fn window_by_count(self, count: usize) -> Self {
156        crate::ops::window_by_count(self, count)
157    }
158
159    /// Returns a stream that observes each diagnostic packet without altering it.
160    ///
161    /// Method form of the free [`tap_diagnostics`](crate::tap_diagnostics)
162    /// combinator.
163    pub fn tap_diagnostics<F>(self, f: F) -> Self
164    where
165        F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
166    {
167        crate::ops::tap_diagnostics(self, f)
168    }
169}
170
171struct ValueStream {
172    value: Arc<StreamValue>,
173}
174
175impl StreamNode for ValueStream {
176    fn metadata(&self) -> &StreamMetadata {
177        self.value.metadata()
178    }
179
180    fn next_packet(&self) -> Result<Option<StreamItem>> {
181        self.value.next_packet()
182    }
183
184    fn is_done(&self) -> Result<bool> {
185        self.value.is_done()
186    }
187}