Skip to main content

sim_lib_stream_combinators/
ops.rs

1mod nodes;
2
3use std::sync::{Arc, Mutex};
4
5use sim_kernel::{Cx, Diagnostic, Error, Event, Expr, Ref, Result, Symbol, Tick};
6use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamPacket};
7
8use crate::stream::Stream;
9
10/// A reusable, composable transformation from one [`Stream`] to another.
11///
12/// Stages are the building blocks fed to [`pipe`]: each `*_stage` constructor
13/// captures its configuration into a boxed closure so the same transformation
14/// can be applied to multiple sources.
15pub type StreamStage = Box<dyn Fn(Stream) -> Stream + Send + Sync>;
16
17type MapFn = Arc<dyn Fn(StreamItem) -> Result<StreamItem> + Send + Sync>;
18type DataMapFn = Arc<dyn Fn(Expr) -> Result<Expr> + Send + Sync>;
19type PredicateFn = Arc<dyn Fn(&StreamItem) -> Result<bool> + Send + Sync>;
20type ShapePredicateFn = Arc<dyn Fn(&Expr) -> Result<bool> + Send + Sync>;
21type TapFn = Arc<dyn Fn(&StreamItem) -> Result<()> + Send + Sync>;
22type DiagnosticTapFn = Arc<dyn Fn(&StreamDiagnostic) -> Result<()> + Send + Sync>;
23type MergeKeyFn = Arc<dyn Fn(&StreamItem) -> Option<Ref> + Send + Sync>;
24type ClockConvertFn =
25    Arc<dyn Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync>;
26
27/// Applies a sequence of [`StreamStage`] transforms left-to-right to `source`.
28///
29/// # Examples
30///
31/// ```
32/// use sim_kernel::{Expr, Symbol};
33/// use sim_lib_stream_core::{
34///     BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia,
35///     StreamMetadata, StreamPacket,
36/// };
37/// use sim_lib_stream_combinators::{identity, pipe, take_stage, Stream};
38///
39/// let metadata = StreamMetadata::new(
40///     Symbol::qualified("stream", "doc"),
41///     StreamMedia::Data,
42///     StreamDirection::Source,
43///     Symbol::qualified("clock", "doc"),
44///     BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
45/// );
46/// let item = || StreamItem::new(StreamPacket::data(
47///     Symbol::qualified("stream/data", "model-event"),
48///     Expr::Nil,
49/// ));
50/// let stream = Stream::pull(metadata, vec![item(), item(), item()]);
51///
52/// let out = pipe(stream, vec![identity(), take_stage(2)]);
53/// assert_eq!(out.take_packets(8).unwrap().len(), 2);
54/// ```
55pub fn pipe(source: Stream, stages: Vec<StreamStage>) -> Stream {
56    stages
57        .into_iter()
58        .fold(source, |stream, stage| stage(stream))
59}
60
61/// Returns a stage that forwards its source stream unchanged.
62pub fn identity() -> StreamStage {
63    Box::new(|stream| stream)
64}
65
66/// Returns a stream that applies `f` to every packet of `source`.
67pub fn map<F>(source: Stream, f: F) -> Stream
68where
69    F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
70{
71    nodes::map_with(source, Arc::new(f))
72}
73
74/// Returns a reusable stage form of [`map`].
75pub fn map_stage<F>(f: F) -> StreamStage
76where
77    F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
78{
79    let f: MapFn = Arc::new(f);
80    Box::new(move |source| nodes::map_with(source, Arc::clone(&f)))
81}
82
83/// Returns a stream that rewrites each data packet's payload expression with `f`.
84///
85/// Non-data packets pass through untouched.
86pub fn map_data_expr<F>(source: Stream, f: F) -> Stream
87where
88    F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
89{
90    nodes::map_data_expr_with(source, Arc::new(f))
91}
92
93/// Returns a reusable stage form of [`map_data_expr`].
94pub fn map_data_expr_stage<F>(f: F) -> StreamStage
95where
96    F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
97{
98    let f: DataMapFn = Arc::new(f);
99    Box::new(move |source| nodes::map_data_expr_with(source, Arc::clone(&f)))
100}
101
102/// Returns a stream keeping only packets for which `pred` holds.
103pub fn filter<F>(source: Stream, pred: F) -> Stream
104where
105    F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
106{
107    nodes::filter_with(source, Arc::new(pred))
108}
109
110/// Returns a reusable stage form of [`filter`].
111pub fn filter_stage<F>(pred: F) -> StreamStage
112where
113    F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
114{
115    let pred: PredicateFn = Arc::new(pred);
116    Box::new(move |source| nodes::filter_with(source, Arc::clone(&pred)))
117}
118
119/// Returns a stream keeping only data packets whose kind equals `kind`.
120pub fn filter_data_kind(source: Stream, kind: Symbol) -> Stream {
121    nodes::filter_with(
122        source,
123        Arc::new(move |item| match item.packet() {
124            StreamPacket::Data(packet) => Ok(packet.kind == kind),
125            _ => Ok(false),
126        }),
127    )
128}
129
130/// Returns a reusable stage form of [`filter_data_kind`].
131pub fn filter_data_kind_stage(kind: Symbol) -> StreamStage {
132    Box::new(move |source| filter_data_kind(source, kind.clone()))
133}
134
135/// Returns a stream keeping only data packets whose payload matches `matches`.
136///
137/// Non-data packets are dropped.
138pub fn filter_data_shape<F>(source: Stream, matches: F) -> Stream
139where
140    F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
141{
142    nodes::filter_data_shape_with(source, Arc::new(matches))
143}
144
145/// Returns a reusable stage form of [`filter_data_shape`].
146pub fn filter_data_shape_stage<F>(matches: F) -> StreamStage
147where
148    F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
149{
150    let matches: ShapePredicateFn = Arc::new(matches);
151    Box::new(move |source| nodes::filter_data_shape_with(source, Arc::clone(&matches)))
152}
153
154/// Returns a stream that runs `f` on each packet as a side effect, unchanged.
155pub fn tap<F>(source: Stream, f: F) -> Stream
156where
157    F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
158{
159    nodes::tap_with(source, Arc::new(f))
160}
161
162/// Returns a reusable stage form of [`tap`].
163pub fn tap_stage<F>(f: F) -> StreamStage
164where
165    F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
166{
167    let f: TapFn = Arc::new(f);
168    Box::new(move |source| nodes::tap_with(source, Arc::clone(&f)))
169}
170
171/// Returns a stream that runs `f` on each diagnostic packet, leaving it intact.
172///
173/// Non-diagnostic packets pass through without invoking `f`.
174pub fn tap_diagnostics<F>(source: Stream, f: F) -> Stream
175where
176    F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
177{
178    nodes::tap_diagnostics_with(source, Arc::new(f))
179}
180
181/// Returns a reusable stage form of [`tap_diagnostics`].
182pub fn tap_diagnostics_stage<F>(f: F) -> StreamStage
183where
184    F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
185{
186    let f: DiagnosticTapFn = Arc::new(f);
187    Box::new(move |source| nodes::tap_diagnostics_with(source, Arc::clone(&f)))
188}
189
190/// Returns a stream that yields at most the first `limit` packets of `source`.
191pub fn take(source: Stream, limit: usize) -> Stream {
192    nodes::take_with_limit(source, limit)
193}
194
195/// Returns a reusable stage form of [`take`].
196pub fn take_stage(limit: usize) -> StreamStage {
197    Box::new(move |source| take(source, limit))
198}
199
200/// Returns a stream that batches packets into windows of `count` packets each.
201///
202/// Each window is emitted as a data packet of [`stream_window_data_kind`]
203/// whose payload lists the windowed packets; a trailing partial window is kept.
204pub fn window_by_count(source: Stream, count: usize) -> Stream {
205    nodes::window_by_count(source, count)
206}
207
208/// Returns a reusable stage form of [`window_by_count`].
209pub fn window_by_count_stage(count: usize) -> StreamStage {
210    Box::new(move |source| window_by_count(source, count))
211}
212
213/// Returns the canonical data-packet kind emitted by [`window_by_count`].
214pub fn stream_window_data_kind() -> Symbol {
215    Symbol::qualified("stream/data", "window")
216}
217
218/// Returns a stream interleaving `left` and `right` in pull arrival order.
219///
220/// With no clock key, available packets are emitted left-before-right.
221pub fn merge(left: Stream, right: Stream) -> Stream {
222    nodes::merge_with_key(left, right, Arc::new(|_| None))
223}
224
225/// Returns a stream merging `left` and `right` ordered by their `clock` tick.
226///
227/// At each step the packet with the lower tick index on `clock` is emitted
228/// first; packets without that clock tick sort as having no key.
229pub fn merge_by_clock(left: Stream, right: Stream, clock: Symbol) -> Stream {
230    nodes::merge_with_key(
231        left,
232        right,
233        Arc::new(move |item| {
234            item.ticks()
235                .iter()
236                .find(|tick| tick.clock == clock)
237                .map(|tick| tick.index.clone())
238        }),
239    )
240}
241
242/// Two independent readers that each see every packet of a fanned-out source.
243pub struct Fanout {
244    /// First reader over the shared source.
245    pub left: Stream,
246    /// Second reader over the shared source.
247    pub right: Stream,
248}
249
250/// Splits `source` into a `Fanout` of two readers that each see all packets.
251///
252/// Packets pulled by one reader are buffered so the other reader still observes
253/// the full sequence regardless of read order.
254pub fn fan(source: Stream) -> Fanout {
255    let (left, right) = nodes::fan_readers(source);
256    Fanout { left, right }
257}
258
259/// A clock-converted stream paired with the diagnostics its conversion emits.
260///
261/// The conversion closure may report lossy or approximate clock mappings; those
262/// diagnostics accumulate as packets are pulled and can be read back via
263/// [`ClockConvertedStream::diagnostics`].
264pub struct ClockConvertedStream {
265    stream: Stream,
266    diagnostics: Arc<Mutex<Vec<Diagnostic>>>,
267}
268
269impl ClockConvertedStream {
270    /// Borrows the underlying converted stream.
271    pub fn stream(&self) -> &Stream {
272        &self.stream
273    }
274
275    /// Consumes this wrapper and returns the underlying converted stream.
276    pub fn into_stream(self) -> Stream {
277        self.stream
278    }
279
280    /// Pulls the next converted packet from the underlying stream.
281    pub fn next_packet(&self) -> Result<Option<StreamItem>> {
282        self.stream.next_packet()
283    }
284
285    /// Returns the diagnostics accumulated by the conversion so far.
286    pub fn diagnostics(&self) -> Result<Vec<Diagnostic>> {
287        self.diagnostics
288            .lock()
289            .map_err(|_| Error::PoisonedLock("clock-convert diagnostics"))
290            .map(|diagnostics| diagnostics.clone())
291    }
292}
293
294/// Rewrites each packet's ticks via `convert`, collecting its diagnostics.
295///
296/// For every packet, `convert` returns the replacement ticks and any
297/// diagnostics describing the conversion; the diagnostics are gathered into the
298/// returned [`ClockConvertedStream`].
299pub fn clock_convert<F>(source: Stream, convert: F) -> ClockConvertedStream
300where
301    F: Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync + 'static,
302{
303    let diagnostics = Arc::new(Mutex::new(Vec::new()));
304    ClockConvertedStream {
305        stream: nodes::clock_convert_stream(source, Arc::new(convert), Arc::clone(&diagnostics)),
306        diagnostics,
307    }
308}
309
310/// Drains `stream` into kernel events for `run`, starting at `start_seq`.
311///
312/// Free-function form of [`Stream::run_events`](crate::Stream::run_events): one
313/// chunk event per packet followed by a `done` event when the source completes.
314pub fn run_bang(stream: &Stream, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
315    stream.run_events(cx, run, start_seq)
316}