pub fn pipe(source: Stream, stages: Vec<StreamStage>) -> StreamExpand description
Applies a sequence of StreamStage transforms left-to-right to source.
ยงExamples
use sim_kernel::{Expr, Symbol};
use sim_lib_stream_core::{
BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia,
StreamMetadata, StreamPacket,
};
use sim_lib_stream_combinators::{identity, pipe, take_stage, Stream};
let metadata = StreamMetadata::new(
Symbol::qualified("stream", "doc"),
StreamMedia::Data,
StreamDirection::Source,
Symbol::qualified("clock", "doc"),
BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
);
let item = || StreamItem::new(StreamPacket::data(
Symbol::qualified("stream/data", "model-event"),
Expr::Nil,
));
let stream = Stream::pull(metadata, vec![item(), item(), item()]);
let out = pipe(stream, vec![identity(), take_stage(2)]);
assert_eq!(out.take_packets(8).unwrap().len(), 2);