sim_lib_stream_combinators/stream.rs
1use std::sync::Arc;
2
3use sim_kernel::{Cx, Event, Expr, Ref, Result, Symbol};
4use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamMetadata, StreamValue};
5
6/// Lazy source of stream packets backing a [`Stream`].
7///
8/// A `StreamNode` is the pull-based engine behind a combinator: it exposes the
9/// stream metadata and yields one [`StreamItem`] at a time, advancing only when
10/// asked. Combinators wrap one or more upstream streams in their own node so
11/// that work is deferred until packets are actually pulled. Implementations are
12/// `Send + Sync` so a built graph can be shared across threads.
13pub trait StreamNode: Send + Sync {
14 /// Returns the metadata describing this node's output stream.
15 fn metadata(&self) -> &StreamMetadata;
16 /// Pulls the next packet, or `Ok(None)` when no packet is available yet.
17 fn next_packet(&self) -> Result<Option<StreamItem>>;
18 /// Reports whether the node has reached its terminal `done` state.
19 fn is_done(&self) -> Result<bool>;
20}
21
22/// Cloneable handle to a lazy combinator stream.
23///
24/// A `Stream` is a thin shared pointer over a [`StreamNode`]: cloning it shares
25/// the same underlying source rather than copying packets. It is the value that
26/// every combinator in this crate consumes and produces, forming pull-based
27/// graphs over the homogeneous `sim-stream` packet spine.
28///
29/// # Examples
30///
31/// ```
32/// use sim_kernel::{Expr, Symbol};
33/// use sim_lib_stream_core::{
34/// BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia,
35/// StreamMetadata, StreamPacket,
36/// };
37/// use sim_lib_stream_combinators::Stream;
38///
39/// let metadata = StreamMetadata::new(
40/// Symbol::qualified("stream", "doc"),
41/// StreamMedia::Data,
42/// StreamDirection::Source,
43/// Symbol::qualified("clock", "doc"),
44/// BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
45/// );
46/// let item = StreamItem::new(StreamPacket::data(
47/// Symbol::qualified("stream/data", "model-event"),
48/// Expr::Nil,
49/// ));
50/// let stream = Stream::pull(metadata, vec![item.clone()]);
51/// assert_eq!(stream.take_packets(8).unwrap(), vec![item]);
52/// assert!(stream.is_done().unwrap());
53/// ```
54#[derive(Clone)]
55pub struct Stream {
56 inner: Arc<dyn StreamNode>,
57}
58
59impl Stream {
60 /// Wraps a [`StreamNode`] implementation in a shareable `Stream` handle.
61 pub fn new(inner: impl StreamNode + 'static) -> Self {
62 Self {
63 inner: Arc::new(inner),
64 }
65 }
66
67 /// Builds a stream that replays the packets held by a stream-core value.
68 pub fn from_value(value: Arc<StreamValue>) -> Self {
69 Self::new(ValueStream { value })
70 }
71
72 /// Builds an in-memory pull stream from explicit metadata and packets.
73 pub fn pull(metadata: StreamMetadata, items: Vec<StreamItem>) -> Self {
74 Self::from_value(Arc::new(StreamValue::pull(metadata, items)))
75 }
76
77 /// Returns the metadata describing this stream's media, clock, and buffer.
78 pub fn metadata(&self) -> &StreamMetadata {
79 self.inner.metadata()
80 }
81
82 /// Pulls the next packet, or `Ok(None)` when none is currently available.
83 pub fn next_packet(&self) -> Result<Option<StreamItem>> {
84 self.inner.next_packet()
85 }
86
87 /// Reports whether the stream has reached its terminal `done` state.
88 pub fn is_done(&self) -> Result<bool> {
89 self.inner.is_done()
90 }
91
92 /// Pulls up to `limit` packets, stopping early when the source is drained.
93 pub fn take_packets(&self, limit: usize) -> Result<Vec<StreamItem>> {
94 let mut out = Vec::new();
95 for _ in 0..limit {
96 let Some(item) = self.next_packet()? else {
97 break;
98 };
99 out.push(item);
100 }
101 Ok(out)
102 }
103
104 /// Drains the stream into kernel events, one chunk event per packet.
105 ///
106 /// Sequence numbers start at `start_seq` and increase per packet; a final
107 /// `done` event for `run` is appended once the source reports done.
108 pub fn run_events(&self, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
109 let mut seq = start_seq;
110 let mut out = Vec::new();
111 while let Some(item) = self.next_packet()? {
112 out.push(item.chunk_event(cx, run.clone(), seq)?);
113 seq = seq.saturating_add(1);
114 }
115 if self.is_done()? {
116 out.push(Event::done(run, seq)?);
117 }
118 Ok(out)
119 }
120
121 /// Returns a stream that rewrites each data packet's payload expression.
122 ///
123 /// Method form of the free [`map_data_expr`](crate::map_data_expr)
124 /// combinator; non-data packets pass through unchanged.
125 pub fn map_data_expr<F>(self, f: F) -> Self
126 where
127 F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
128 {
129 crate::ops::map_data_expr(self, f)
130 }
131
132 /// Returns a stream keeping only data packets of the given `kind`.
133 ///
134 /// Method form of the free [`filter_data_kind`](crate::filter_data_kind)
135 /// combinator.
136 pub fn filter_data_kind(self, kind: Symbol) -> Self {
137 crate::ops::filter_data_kind(self, kind)
138 }
139
140 /// Returns a stream keeping data packets whose payload matches `matches`.
141 ///
142 /// Method form of the free [`filter_data_shape`](crate::filter_data_shape)
143 /// combinator.
144 pub fn filter_data_shape<F>(self, matches: F) -> Self
145 where
146 F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
147 {
148 crate::ops::filter_data_shape(self, matches)
149 }
150
151 /// Returns a stream that batches packets into windows of `count` packets.
152 ///
153 /// Method form of the free [`window_by_count`](crate::window_by_count)
154 /// combinator.
155 pub fn window_by_count(self, count: usize) -> Self {
156 crate::ops::window_by_count(self, count)
157 }
158
159 /// Returns a stream that observes each diagnostic packet without altering it.
160 ///
161 /// Method form of the free [`tap_diagnostics`](crate::tap_diagnostics)
162 /// combinator.
163 pub fn tap_diagnostics<F>(self, f: F) -> Self
164 where
165 F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
166 {
167 crate::ops::tap_diagnostics(self, f)
168 }
169}
170
171struct ValueStream {
172 value: Arc<StreamValue>,
173}
174
175impl StreamNode for ValueStream {
176 fn metadata(&self) -> &StreamMetadata {
177 self.value.metadata()
178 }
179
180 fn next_packet(&self) -> Result<Option<StreamItem>> {
181 self.value.next_packet()
182 }
183
184 fn is_done(&self) -> Result<bool> {
185 self.value.is_done()
186 }
187}