sim_lib_stream_combinators/
ops.rs1mod nodes;
2
3use std::sync::{Arc, Mutex};
4
5use sim_kernel::{Cx, Diagnostic, Error, Event, Expr, Ref, Result, Symbol, Tick};
6use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamPacket};
7
8use crate::stream::Stream;
9
10pub type StreamStage = Box<dyn Fn(Stream) -> Stream + Send + Sync>;
16
17type MapFn = Arc<dyn Fn(StreamItem) -> Result<StreamItem> + Send + Sync>;
18type DataMapFn = Arc<dyn Fn(Expr) -> Result<Expr> + Send + Sync>;
19type PredicateFn = Arc<dyn Fn(&StreamItem) -> Result<bool> + Send + Sync>;
20type ShapePredicateFn = Arc<dyn Fn(&Expr) -> Result<bool> + Send + Sync>;
21type TapFn = Arc<dyn Fn(&StreamItem) -> Result<()> + Send + Sync>;
22type DiagnosticTapFn = Arc<dyn Fn(&StreamDiagnostic) -> Result<()> + Send + Sync>;
23type MergeKeyFn = Arc<dyn Fn(&StreamItem) -> Option<Ref> + Send + Sync>;
24type ClockConvertFn =
25 Arc<dyn Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync>;
26
27pub fn pipe(source: Stream, stages: Vec<StreamStage>) -> Stream {
56 stages
57 .into_iter()
58 .fold(source, |stream, stage| stage(stream))
59}
60
61pub fn identity() -> StreamStage {
63 Box::new(|stream| stream)
64}
65
66pub fn map<F>(source: Stream, f: F) -> Stream
68where
69 F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
70{
71 nodes::map_with(source, Arc::new(f))
72}
73
74pub fn map_stage<F>(f: F) -> StreamStage
76where
77 F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
78{
79 let f: MapFn = Arc::new(f);
80 Box::new(move |source| nodes::map_with(source, Arc::clone(&f)))
81}
82
83pub fn map_data_expr<F>(source: Stream, f: F) -> Stream
87where
88 F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
89{
90 nodes::map_data_expr_with(source, Arc::new(f))
91}
92
93pub fn map_data_expr_stage<F>(f: F) -> StreamStage
95where
96 F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
97{
98 let f: DataMapFn = Arc::new(f);
99 Box::new(move |source| nodes::map_data_expr_with(source, Arc::clone(&f)))
100}
101
102pub fn filter<F>(source: Stream, pred: F) -> Stream
104where
105 F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
106{
107 nodes::filter_with(source, Arc::new(pred))
108}
109
110pub fn filter_stage<F>(pred: F) -> StreamStage
112where
113 F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
114{
115 let pred: PredicateFn = Arc::new(pred);
116 Box::new(move |source| nodes::filter_with(source, Arc::clone(&pred)))
117}
118
119pub fn filter_data_kind(source: Stream, kind: Symbol) -> Stream {
121 nodes::filter_with(
122 source,
123 Arc::new(move |item| match item.packet() {
124 StreamPacket::Data(packet) => Ok(packet.kind == kind),
125 _ => Ok(false),
126 }),
127 )
128}
129
130pub fn filter_data_kind_stage(kind: Symbol) -> StreamStage {
132 Box::new(move |source| filter_data_kind(source, kind.clone()))
133}
134
135pub fn filter_data_shape<F>(source: Stream, matches: F) -> Stream
139where
140 F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
141{
142 nodes::filter_data_shape_with(source, Arc::new(matches))
143}
144
145pub fn filter_data_shape_stage<F>(matches: F) -> StreamStage
147where
148 F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
149{
150 let matches: ShapePredicateFn = Arc::new(matches);
151 Box::new(move |source| nodes::filter_data_shape_with(source, Arc::clone(&matches)))
152}
153
154pub fn tap<F>(source: Stream, f: F) -> Stream
156where
157 F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
158{
159 nodes::tap_with(source, Arc::new(f))
160}
161
162pub fn tap_stage<F>(f: F) -> StreamStage
164where
165 F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
166{
167 let f: TapFn = Arc::new(f);
168 Box::new(move |source| nodes::tap_with(source, Arc::clone(&f)))
169}
170
171pub fn tap_diagnostics<F>(source: Stream, f: F) -> Stream
175where
176 F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
177{
178 nodes::tap_diagnostics_with(source, Arc::new(f))
179}
180
181pub fn tap_diagnostics_stage<F>(f: F) -> StreamStage
183where
184 F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
185{
186 let f: DiagnosticTapFn = Arc::new(f);
187 Box::new(move |source| nodes::tap_diagnostics_with(source, Arc::clone(&f)))
188}
189
190pub fn take(source: Stream, limit: usize) -> Stream {
192 nodes::take_with_limit(source, limit)
193}
194
195pub fn take_stage(limit: usize) -> StreamStage {
197 Box::new(move |source| take(source, limit))
198}
199
200pub fn window_by_count(source: Stream, count: usize) -> Stream {
205 nodes::window_by_count(source, count)
206}
207
208pub fn window_by_count_stage(count: usize) -> StreamStage {
210 Box::new(move |source| window_by_count(source, count))
211}
212
213pub fn stream_window_data_kind() -> Symbol {
215 Symbol::qualified("stream/data", "window")
216}
217
218pub fn merge(left: Stream, right: Stream) -> Stream {
222 nodes::merge_with_key(left, right, Arc::new(|_| None))
223}
224
225pub fn merge_by_clock(left: Stream, right: Stream, clock: Symbol) -> Stream {
230 nodes::merge_with_key(
231 left,
232 right,
233 Arc::new(move |item| {
234 item.ticks()
235 .iter()
236 .find(|tick| tick.clock == clock)
237 .map(|tick| tick.index.clone())
238 }),
239 )
240}
241
242pub struct Fanout {
244 pub left: Stream,
246 pub right: Stream,
248}
249
250pub fn fan(source: Stream) -> Fanout {
255 let (left, right) = nodes::fan_readers(source);
256 Fanout { left, right }
257}
258
259pub struct ClockConvertedStream {
265 stream: Stream,
266 diagnostics: Arc<Mutex<Vec<Diagnostic>>>,
267}
268
269impl ClockConvertedStream {
270 pub fn stream(&self) -> &Stream {
272 &self.stream
273 }
274
275 pub fn into_stream(self) -> Stream {
277 self.stream
278 }
279
280 pub fn next_packet(&self) -> Result<Option<StreamItem>> {
282 self.stream.next_packet()
283 }
284
285 pub fn diagnostics(&self) -> Result<Vec<Diagnostic>> {
287 self.diagnostics
288 .lock()
289 .map_err(|_| Error::PoisonedLock("clock-convert diagnostics"))
290 .map(|diagnostics| diagnostics.clone())
291 }
292}
293
294pub fn clock_convert<F>(source: Stream, convert: F) -> ClockConvertedStream
300where
301 F: Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync + 'static,
302{
303 let diagnostics = Arc::new(Mutex::new(Vec::new()));
304 ClockConvertedStream {
305 stream: nodes::clock_convert_stream(source, Arc::new(convert), Arc::clone(&diagnostics)),
306 diagnostics,
307 }
308}
309
310pub fn run_bang(stream: &Stream, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
315 stream.run_events(cx, run, start_seq)
316}