mod nodes;
use std::sync::{Arc, Mutex};
use sim_kernel::{Cx, Diagnostic, Error, Event, Expr, Ref, Result, Symbol, Tick};
use sim_lib_stream_core::{StreamDiagnostic, StreamItem, StreamPacket};
use crate::stream::Stream;
/// A reusable pipeline stage: a boxed `Send + Sync` function that takes a
/// `Stream` and returns a `Stream`. Consumed by `pipe`.
pub type StreamStage = Box<dyn Fn(Stream) -> Stream + Send + Sync>;
/// Shared fallible callback from an owned `StreamItem` to a `StreamItem`
/// (the callback shape used by `map` / `map_stage`).
type MapFn = Arc<dyn Fn(StreamItem) -> Result<StreamItem> + Send + Sync>;
/// Shared fallible callback from an owned `Expr` to an `Expr`
/// (the callback shape used by `map_data_expr` / `map_data_expr_stage`).
type DataMapFn = Arc<dyn Fn(Expr) -> Result<Expr> + Send + Sync>;
/// Shared fallible predicate over a borrowed `StreamItem`
/// (the callback shape used by `filter` / `filter_stage`).
type PredicateFn = Arc<dyn Fn(&StreamItem) -> Result<bool> + Send + Sync>;
/// Shared fallible predicate over a borrowed `Expr`
/// (the callback shape used by `filter_data_shape` / `filter_data_shape_stage`).
type ShapePredicateFn = Arc<dyn Fn(&Expr) -> Result<bool> + Send + Sync>;
/// Shared fallible observer of a borrowed `StreamItem`; returns no value
/// (the callback shape used by `tap` / `tap_stage`).
type TapFn = Arc<dyn Fn(&StreamItem) -> Result<()> + Send + Sync>;
/// Shared fallible observer of a borrowed `StreamDiagnostic`
/// (the callback shape used by `tap_diagnostics` / `tap_diagnostics_stage`).
type DiagnosticTapFn = Arc<dyn Fn(&StreamDiagnostic) -> Result<()> + Send + Sync>;
/// Shared function that yields an optional `Ref` for a borrowed `StreamItem`.
// NOTE(review): not referenced by name in this file; its shape matches the
// closures `merge` / `merge_by_clock` pass to `nodes::merge_with_key` — confirm.
type MergeKeyFn = Arc<dyn Fn(&StreamItem) -> Option<Ref> + Send + Sync>;
/// Shared fallible function that yields a `(Vec<Tick>, Vec<Diagnostic>)` pair
/// for a borrowed `StreamItem`.
// NOTE(review): not referenced by name in this file; its shape matches the
// bound on `clock_convert`'s `convert` parameter — confirm it is used by `nodes`.
type ClockConvertFn =
Arc<dyn Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync>;
/// Threads `source` through every stage in `stages`, in order.
///
/// The first stage receives `source`; each later stage receives the stream
/// returned by the stage before it. With an empty `stages` vector, `source`
/// is returned as-is.
pub fn pipe(source: Stream, stages: Vec<StreamStage>) -> Stream {
    let mut current = source;
    for stage in stages {
        current = stage(current);
    }
    current
}
/// Builds a stage that hands back the stream it was given, unchanged.
pub fn identity() -> StreamStage {
    let passthrough = |upstream| upstream;
    Box::new(passthrough)
}
/// Builds a stream from `source` and the item callback `f`.
///
/// `f` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::map_with`; how and when `f` is invoked is decided there.
pub fn map<F>(source: Stream, f: F) -> Stream
where
    F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
{
    let mapper = Arc::new(f);
    nodes::map_with(source, mapper)
}
/// Stage form of [`map`]: captures `f` once and returns a [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same callback to `nodes::map_with`.
pub fn map_stage<F>(f: F) -> StreamStage
where
    F: Fn(StreamItem) -> Result<StreamItem> + Send + Sync + 'static,
{
    let shared: MapFn = Arc::new(f);
    let stage = move |upstream| nodes::map_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Builds a stream from `source` and the expression callback `f`.
///
/// `f` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::map_data_expr_with`; which expressions it is applied to is
/// decided there.
pub fn map_data_expr<F>(source: Stream, f: F) -> Stream
where
    F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
{
    let expr_mapper = Arc::new(f);
    nodes::map_data_expr_with(source, expr_mapper)
}
/// Stage form of [`map_data_expr`]: captures `f` once and returns a
/// [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same callback to `nodes::map_data_expr_with`.
pub fn map_data_expr_stage<F>(f: F) -> StreamStage
where
    F: Fn(Expr) -> Result<Expr> + Send + Sync + 'static,
{
    let shared: DataMapFn = Arc::new(f);
    let stage = move |upstream| nodes::map_data_expr_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Builds a stream from `source` and the item predicate `pred`.
///
/// `pred` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::filter_with`; how the predicate's result is acted on is decided
/// there.
pub fn filter<F>(source: Stream, pred: F) -> Stream
where
    F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
{
    let predicate = Arc::new(pred);
    nodes::filter_with(source, predicate)
}
/// Stage form of [`filter`]: captures `pred` once and returns a
/// [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same predicate to `nodes::filter_with`.
pub fn filter_stage<F>(pred: F) -> StreamStage
where
    F: Fn(&StreamItem) -> Result<bool> + Send + Sync + 'static,
{
    let shared: PredicateFn = Arc::new(pred);
    let stage = move |upstream| nodes::filter_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Filters `source` with a predicate keyed on the data packet kind.
///
/// The predicate passed to `nodes::filter_with` returns `Ok(true)` only when
/// the item's packet is `StreamPacket::Data` and that packet's `kind` equals
/// `kind`; every other packet yields `Ok(false)`. The predicate never
/// returns an error.
pub fn filter_data_kind(source: Stream, kind: Symbol) -> Stream {
    nodes::filter_with(
        source,
        Arc::new(move |item| {
            let is_wanted =
                matches!(item.packet(), StreamPacket::Data(packet) if packet.kind == kind);
            Ok(is_wanted)
        }),
    )
}
/// Stage form of [`filter_data_kind`].
///
/// The returned stage owns `kind` and clones it on every invocation, so the
/// stage can be applied to any number of streams.
pub fn filter_data_kind_stage(kind: Symbol) -> StreamStage {
    let stage = move |upstream| {
        let wanted = kind.clone();
        filter_data_kind(upstream, wanted)
    };
    Box::new(stage)
}
/// Builds a stream from `source` and the expression predicate `matches`.
///
/// `matches` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::filter_data_shape_with`; which expressions it is shown is decided
/// there.
pub fn filter_data_shape<F>(source: Stream, matches: F) -> Stream
where
    F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
{
    let shape_predicate = Arc::new(matches);
    nodes::filter_data_shape_with(source, shape_predicate)
}
/// Stage form of [`filter_data_shape`]: captures `matches` once and returns
/// a [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same predicate to `nodes::filter_data_shape_with`.
pub fn filter_data_shape_stage<F>(matches: F) -> StreamStage
where
    F: Fn(&Expr) -> Result<bool> + Send + Sync + 'static,
{
    let shared: ShapePredicateFn = Arc::new(matches);
    let stage = move |upstream| nodes::filter_data_shape_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Builds a stream from `source` and the item observer `f`.
///
/// `f` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::tap_with`; how an `Err` from `f` is treated is decided there.
pub fn tap<F>(source: Stream, f: F) -> Stream
where
    F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
{
    let observer = Arc::new(f);
    nodes::tap_with(source, observer)
}
/// Stage form of [`tap`]: captures `f` once and returns a [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same observer to `nodes::tap_with`.
pub fn tap_stage<F>(f: F) -> StreamStage
where
    F: Fn(&StreamItem) -> Result<()> + Send + Sync + 'static,
{
    let shared: TapFn = Arc::new(f);
    let stage = move |upstream| nodes::tap_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Builds a stream from `source` and the diagnostic observer `f`.
///
/// `f` is wrapped in an `Arc` and handed, together with `source`, to
/// `nodes::tap_diagnostics_with`; which diagnostics it is shown is decided
/// there.
pub fn tap_diagnostics<F>(source: Stream, f: F) -> Stream
where
    F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
{
    let observer = Arc::new(f);
    nodes::tap_diagnostics_with(source, observer)
}
/// Stage form of [`tap_diagnostics`]: captures `f` once and returns a
/// [`StreamStage`].
///
/// Every invocation of the stage passes its input stream and a fresh
/// `Arc` clone of the same observer to `nodes::tap_diagnostics_with`.
pub fn tap_diagnostics_stage<F>(f: F) -> StreamStage
where
    F: Fn(&StreamDiagnostic) -> Result<()> + Send + Sync + 'static,
{
    let shared: DiagnosticTapFn = Arc::new(f);
    let stage = move |upstream| nodes::tap_diagnostics_with(upstream, Arc::clone(&shared));
    Box::new(stage)
}
/// Builds a stream by passing `source` and `limit` to `nodes::take_with_limit`.
// NOTE(review): presumably caps the output at `limit` items; the exact
// semantics (including `limit == 0`) live in `nodes::take_with_limit` — confirm.
pub fn take(source: Stream, limit: usize) -> Stream {
nodes::take_with_limit(source, limit)
}
/// Stage form of [`take`]: the returned stage calls `take` on its input
/// stream with the captured `limit`.
pub fn take_stage(limit: usize) -> StreamStage {
    let stage = move |upstream| take(upstream, limit);
    Box::new(stage)
}
/// Builds a stream by passing `source` and `count` to `nodes::window_by_count`.
// NOTE(review): presumably groups items into windows of `count`; the exact
// semantics (including `count == 0` and a trailing partial window) live in
// `nodes::window_by_count` — confirm.
pub fn window_by_count(source: Stream, count: usize) -> Stream {
nodes::window_by_count(source, count)
}
/// Stage form of [`window_by_count`]: the returned stage calls
/// `window_by_count` on its input stream with the captured `count`.
pub fn window_by_count_stage(count: usize) -> StreamStage {
    let stage = move |upstream| window_by_count(upstream, count);
    Box::new(stage)
}
/// Returns the symbol built by `Symbol::qualified("stream/data", "window")`.
// NOTE(review): presumably the data-packet kind used for window packets —
// confirm against `nodes::window_by_count`.
pub fn stream_window_data_kind() -> Symbol {
    const QUALIFIER: &str = "stream/data";
    const NAME: &str = "window";
    Symbol::qualified(QUALIFIER, NAME)
}
/// Merges `left` and `right` through `nodes::merge_with_key` using a key
/// function that returns `None` for every item.
pub fn merge(left: Stream, right: Stream) -> Stream {
    nodes::merge_with_key(
        left,
        right,
        // Keyless merge: no item ever produces a key.
        Arc::new(|_item| None),
    )
}
/// Merges `left` and `right` through `nodes::merge_with_key`, keyed by the
/// items' tick on `clock`.
///
/// The key for an item is a clone of the `index` of the first tick in
/// `item.ticks()` whose `clock` equals `clock`, or `None` when the item has
/// no tick on that clock.
pub fn merge_by_clock(left: Stream, right: Stream, clock: Symbol) -> Stream {
    nodes::merge_with_key(
        left,
        right,
        Arc::new(move |item| {
            item.ticks()
                .iter()
                .find_map(|tick| (tick.clock == clock).then(|| tick.index.clone()))
        }),
    )
}
/// The pair of streams produced by `fan`.
pub struct Fanout {
/// First stream of the pair returned by `nodes::fan_readers`.
pub left: Stream,
/// Second stream of the pair returned by `nodes::fan_readers`.
pub right: Stream,
}
/// Splits `source` into two streams via `nodes::fan_readers` and returns
/// them as a [`Fanout`] (first reader as `left`, second as `right`).
pub fn fan(source: Stream) -> Fanout {
    let readers = nodes::fan_readers(source);
    Fanout {
        left: readers.0,
        right: readers.1,
    }
}
/// A stream produced by `clock_convert`, paired with a shared buffer of
/// `Diagnostic` values.
pub struct ClockConvertedStream {
/// The stream returned by `nodes::clock_convert_stream`.
stream: Stream,
/// Shared diagnostics buffer; `clock_convert` gives a clone of this `Arc`
/// to `nodes::clock_convert_stream`.
diagnostics: Arc<Mutex<Vec<Diagnostic>>>,
}
impl ClockConvertedStream {
pub fn stream(&self) -> &Stream {
&self.stream
}
pub fn into_stream(self) -> Stream {
self.stream
}
pub fn next_packet(&self) -> Result<Option<StreamItem>> {
self.stream.next_packet()
}
pub fn diagnostics(&self) -> Result<Vec<Diagnostic>> {
self.diagnostics
.lock()
.map_err(|_| Error::PoisonedLock("clock-convert diagnostics"))
.map(|diagnostics| diagnostics.clone())
}
}
/// Builds a [`ClockConvertedStream`] from `source` and the callback `convert`.
///
/// A fresh, empty diagnostics buffer is created and shared between the
/// returned wrapper and `nodes::clock_convert_stream`, which also receives
/// `source` and `convert` (wrapped in an `Arc`). How `convert`'s ticks and
/// diagnostics are used is decided there.
pub fn clock_convert<F>(source: Stream, convert: F) -> ClockConvertedStream
where
    F: Fn(&StreamItem) -> Result<(Vec<Tick>, Vec<Diagnostic>)> + Send + Sync + 'static,
{
    let diagnostics = Arc::new(Mutex::new(Vec::new()));
    // Second handle to the same buffer, given to the conversion node.
    let sink = Arc::clone(&diagnostics);
    let stream = nodes::clock_convert_stream(source, Arc::new(convert), sink);
    ClockConvertedStream {
        stream,
        diagnostics,
    }
}
/// Free-function wrapper around `Stream::run_events`: forwards `cx`, `run`,
/// and `start_seq` unchanged and returns its result.
// NOTE(review): presumably `start_seq` is the sequence number of the first
// emitted event and `run` identifies the run — confirm against `run_events`.
pub fn run_bang(stream: &Stream, cx: &mut Cx, run: Ref, start_seq: u64) -> Result<Vec<Event>> {
stream.run_events(cx, run, start_seq)
}