mod always;
#[cfg(feature = "async")]
mod async_io;
mod average;
mod bimap;
mod buffer;
mod callback;
#[cfg(feature = "async")]
mod channel;
mod combine;
mod constant;
mod consumer;
mod delay;
mod delay_with_reset;
mod demux;
mod difference;
mod distinct;
#[cfg(feature = "dynamic-graph-beta")]
pub mod dynamic_group;
mod feedback;
mod filter;
mod finally;
mod fold;
#[cfg(feature = "async")]
mod graph_node;
mod graph_state;
mod inspect;
mod iterator_stream;
mod limit;
mod map;
mod map_filter;
mod merge;
mod never;
mod node_flow;
mod print;
mod producer;
#[cfg(feature = "zmq")]
mod receiver;
mod sample;
mod throttle;
mod tick;
mod timed;
mod trimap;
mod try_bimap;
mod try_map;
mod try_trimap;
mod window;
mod with_time;
pub use always::*;
#[cfg(feature = "async")]
pub use async_io::*;
pub use callback::CallBackStream;
pub use channel::ChannelReceiverStream;
pub use demux::*;
#[cfg(feature = "dynamic-graph-beta")]
pub use dynamic_group::*;
use feedback::FeedbackSendStream;
pub use feedback::{FeedbackSink, feedback, feedback_node};
#[cfg(feature = "async")]
pub use graph_node::*;
pub use iterator_stream::{IteratorStream, SimpleIteratorStream};
pub use map_filter::MapFilterStream;
pub use never::*;
use average::*;
use bimap::*;
use buffer::BufferStream;
use constant::*;
use consumer::*;
use delay::*;
use delay_with_reset::*;
use difference::*;
use distinct::*;
use filter::*;
use finally::*;
use fold::*;
use graph_state::*;
use inspect::*;
use limit::*;
use map::*;
use merge::*;
use node_flow::*;
use print::*;
use producer::*;
use sample::*;
use throttle::*;
use tick::*;
use timed::*;
use trimap::*;
use try_bimap::*;
use try_map::*;
use try_trimap::*;
use window::WindowStream;
use with_time::WithTimeStream;
use crate::graph::*;
use crate::queue::ValueAt;
use crate::types::*;
#[cfg(feature = "zmq")]
pub(crate) use receiver::*;
use log::Level;
#[cfg(not(feature = "tracing"))]
use log::log;
use num_traits::ToPrimitive;
use std::cmp::Eq;
#[cfg(feature = "async")]
use std::future::Future;
use std::hash::Hash;
use std::ops::Add;
#[cfg(feature = "async")]
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
#[must_use]
pub fn add<T>(upstream1: &Rc<dyn Stream<T>>, upstream2: &Rc<dyn Stream<T>>) -> Rc<dyn Stream<T>>
where
T: Element + Add<Output = T>,
{
let f = |a: T, b: T| (a + b) as T;
BiMapStream::new(
Dep::Active(upstream1.clone()),
Dep::Active(upstream2.clone()),
Box::new(f),
)
.into_stream()
}
#[must_use]
pub fn bimap<IN1: Element, IN2: Element, OUT: Element>(
upstream1: Dep<IN1>,
upstream2: Dep<IN2>,
func: impl Fn(IN1, IN2) -> OUT + 'static,
) -> Rc<dyn Stream<OUT>> {
BiMapStream::new(upstream1, upstream2, Box::new(func)).into_stream()
}
#[must_use]
pub fn trimap<IN1: Element, IN2: Element, IN3: Element, OUT: Element>(
upstream1: Dep<IN1>,
upstream2: Dep<IN2>,
upstream3: Dep<IN3>,
func: impl Fn(IN1, IN2, IN3) -> OUT + 'static,
) -> Rc<dyn Stream<OUT>> {
TriMapStream::new(upstream1, upstream2, upstream3, Box::new(func)).into_stream()
}
#[must_use]
pub fn try_bimap<IN1: Element, IN2: Element, OUT: Element>(
upstream1: Dep<IN1>,
upstream2: Dep<IN2>,
func: impl Fn(IN1, IN2) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>> {
TryBiMapStream::new(upstream1, upstream2, Box::new(func)).into_stream()
}
#[must_use]
pub fn try_trimap<IN1: Element, IN2: Element, IN3: Element, OUT: Element>(
upstream1: Dep<IN1>,
upstream2: Dep<IN2>,
upstream3: Dep<IN3>,
func: impl Fn(IN1, IN2, IN3) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>> {
TryTriMapStream::new(upstream1, upstream2, upstream3, Box::new(func)).into_stream()
}
#[must_use]
pub fn merge<T>(sources: Vec<Rc<dyn Stream<T>>>) -> Rc<dyn Stream<T>>
where
T: Element,
{
MergeStream::new(sources).into_stream()
}
#[must_use]
pub fn constant<T: Element>(value: T) -> Rc<dyn Stream<T>> {
ConstantStream::new(value).into_stream()
}
#[must_use]
pub fn combine<T>(streams: Vec<Rc<dyn Stream<T>>>) -> Rc<dyn Stream<Burst<T>>>
where
T: Element + 'static,
{
combine::combine(streams)
}
#[must_use]
pub fn ticker(period: Duration) -> Rc<dyn Node> {
TickNode::new(NanoTime::new(period.as_nanos() as u64)).into_node()
}
pub trait NodeOperators {
#[must_use]
fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>>;
#[must_use]
fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>>;
#[must_use]
fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>>;
#[must_use]
fn produce<T: Element>(self: &Rc<Self>, func: impl Fn() -> T + 'static) -> Rc<dyn Stream<T>>;
fn run(self: &Rc<Self>, run_mode: RunMode, run_to: RunFor) -> anyhow::Result<()>;
fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph;
}
impl NodeOperators for dyn Node {
fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>> {
constant(1).sample(self.clone()).sum()
}
fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
let f = Box::new(|state: &mut GraphState| state.time());
GraphStateStream::new(self.clone(), f).into_stream()
}
fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
let f = Box::new(|state: &mut GraphState| state.elapsed());
GraphStateStream::new(self.clone(), f).into_stream()
}
fn produce<T: Element>(self: &Rc<Self>, func: impl Fn() -> T + 'static) -> Rc<dyn Stream<T>> {
ProducerStream::new(self.clone(), Box::new(func)).into_stream()
}
fn run(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> anyhow::Result<()> {
Graph::new(vec![self.clone()], run_mode, run_for).run()
}
fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph {
Graph::new(vec![self.clone()], run_mode, run_for)
}
}
impl<T> NodeOperators for dyn Stream<T> {
fn count(self: &Rc<Self>) -> Rc<dyn Stream<u64>> {
self.clone().as_node().count()
}
fn ticked_at(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
self.clone().as_node().ticked_at()
}
fn ticked_at_elapsed(self: &Rc<Self>) -> Rc<dyn Stream<NanoTime>> {
self.clone().as_node().ticked_at_elapsed()
}
fn produce<OUT: Element>(
self: &Rc<Self>,
func: impl Fn() -> OUT + 'static,
) -> Rc<dyn Stream<OUT>> {
self.clone().as_node().produce(func)
}
fn run(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> anyhow::Result<()> {
self.clone().as_node().run(run_mode, run_for)
}
fn into_graph(self: &Rc<Self>, run_mode: RunMode, run_for: RunFor) -> Graph {
self.clone().as_node().into_graph(run_mode, run_for)
}
}
pub trait NodeFlowOperators {
#[must_use]
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Node>;
#[must_use]
fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Node>;
#[must_use]
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Node>;
#[must_use]
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Node>;
#[must_use]
fn feedback(self: &Rc<Self>, sink: FeedbackSink<()>) -> Rc<dyn Node>;
}
impl NodeFlowOperators for dyn Node {
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Node> {
ThrottleNode::new(self.clone(), NanoTime::new(interval.as_nanos() as u64)).into_node()
}
fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Node> {
DelayNode::new(self.clone(), NanoTime::new(delay.as_nanos() as u64)).into_node()
}
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Node> {
LimitNode::new(self.clone(), limit).into_node()
}
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Node> {
FilterNode::new(self.clone(), condition).into_node()
}
fn feedback(self: &Rc<Self>, sink: FeedbackSink<()>) -> Rc<dyn Node> {
FeedbackSendNode::new(self.clone(), sink).into_node()
}
}
pub trait StreamOperators<T: Element> {
#[must_use]
fn accumulate(self: &Rc<Self>) -> Rc<dyn Stream<Vec<T>>>;
#[must_use]
fn average(self: &Rc<Self>) -> Rc<dyn Stream<f64>>
where
T: ToPrimitive;
#[must_use]
fn buffer(self: &Rc<Self>, capacity: usize) -> Rc<dyn Stream<Vec<T>>>;
#[must_use]
fn window(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<Vec<T>>>;
#[must_use]
fn collect(self: &Rc<Self>) -> Rc<dyn Stream<Vec<ValueAt<T>>>>;
#[must_use]
fn collapse<OUT>(self: &Rc<Self>) -> Rc<dyn Stream<OUT>>
where
T: std::iter::IntoIterator<Item = OUT>,
OUT: Element;
#[cfg(feature = "async")]
#[must_use]
fn consume_async<FUT>(
self: &Rc<Self>,
func: Box<dyn FnOnce(RunParams, Pin<Box<dyn FutStream<T>>>) -> FUT + Send>,
) -> Rc<dyn Node>
where
T: Element + Send,
FUT: Future<Output = anyhow::Result<()>> + Send + 'static;
#[must_use]
fn finally<F: FnOnce(T, &GraphState) -> anyhow::Result<()> + 'static>(
self: &Rc<Self>,
func: F,
) -> Rc<dyn Node>;
#[must_use]
fn for_each(self: &Rc<Self>, func: impl Fn(T, NanoTime) + 'static) -> Rc<dyn Node>;
#[must_use]
fn feedback(self: &Rc<Self>, sink: FeedbackSink<T>) -> Rc<dyn Stream<T>>
where
T: Hash + Eq;
#[must_use]
fn try_for_each(
self: &Rc<Self>,
func: impl Fn(T, NanoTime) -> anyhow::Result<()> + 'static,
) -> Rc<dyn Node>;
#[must_use]
fn fold<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(&mut OUT, T) + 'static,
) -> Rc<dyn Stream<OUT>>;
#[must_use]
fn difference(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Sub<Output = T>;
#[must_use]
fn delay(self: &Rc<Self>, delay: Duration) -> Rc<dyn Stream<T>>
where
T: Hash + Eq;
#[must_use]
fn delay_with_reset(
self: &Rc<Self>,
delay: Duration,
trigger: Rc<dyn Node>,
) -> Rc<dyn Stream<T>>
where
T: Hash + Eq;
fn demux<K, F>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<T>>>, Overflow<T>)
where
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&T) -> (K, DemuxEvent) + 'static;
fn demux_it<K, F, U>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static;
fn demux_it_with_map<K, F, U>(
self: &Rc<Self>,
map: DemuxMap<K>,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static;
#[must_use]
fn distinct(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: PartialEq;
#[must_use]
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Stream<T>>;
#[must_use]
fn filter_value(self: &Rc<Self>, predicate: impl Fn(&T) -> bool + 'static)
-> Rc<dyn Stream<T>>;
#[must_use]
fn inspect(self: &Rc<Self>, func: impl Fn(&T) + 'static) -> Rc<dyn Stream<T>>;
#[must_use]
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Stream<T>>;
#[must_use]
fn logged(self: &Rc<Self>, label: &str, level: Level) -> Rc<dyn Stream<T>>;
#[must_use]
fn map<OUT: Element>(self: &Rc<Self>, func: impl Fn(T) -> OUT + 'static)
-> Rc<dyn Stream<OUT>>;
#[must_use]
fn try_map<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(T) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>>;
#[cfg(feature = "async")]
#[must_use]
fn mapper<FUNC, OUT>(self: &Rc<Self>, func: FUNC) -> Rc<dyn Stream<Burst<OUT>>>
where
T: Element + Send,
OUT: Element + Send + Hash + Eq,
FUNC: FnOnce(Rc<dyn Stream<Burst<T>>>) -> Rc<dyn Stream<OUT>> + Send + 'static;
#[must_use]
fn not(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Not<Output = T>;
#[must_use]
fn reduce(self: &Rc<Self>, func: impl Fn(T, T) -> T + 'static) -> Rc<dyn Stream<T>>;
#[must_use]
fn sample(self: &Rc<Self>, trigger: Rc<dyn Node>) -> Rc<dyn Stream<T>>;
#[must_use]
fn print(self: &Rc<Self>) -> Rc<dyn Stream<T>>;
#[must_use]
fn sum(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: Add<T, Output = T>;
#[must_use]
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<T>>;
#[must_use]
fn with_time(self: &Rc<Self>) -> Rc<dyn Stream<(NanoTime, T)>>;
#[must_use]
fn timed(self: &Rc<Self>) -> Rc<dyn Stream<T>>;
}
impl<T> StreamOperators<T> for dyn Stream<T>
where
T: Element + 'static,
{
fn accumulate(self: &Rc<Self>) -> Rc<dyn Stream<Vec<T>>> {
self.fold(|acc: &mut Vec<T>, value| {
acc.push(value);
})
}
fn average(self: &Rc<Self>) -> Rc<dyn Stream<f64>>
where
T: ToPrimitive,
{
AverageStream::new(self.clone()).into_stream()
}
fn buffer(self: &Rc<Self>, capacity: usize) -> Rc<dyn Stream<Vec<T>>> {
BufferStream::new(self.clone(), capacity).into_stream()
}
fn window(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<Vec<T>>> {
WindowStream::new(self.clone(), NanoTime::new(interval.as_nanos() as u64)).into_stream()
}
fn collect(self: &Rc<Self>) -> Rc<dyn Stream<Vec<ValueAt<T>>>> {
bimap(
Dep::Active(self.clone()),
Dep::Active(self.clone().as_node().ticked_at()),
ValueAt::new,
)
.fold(|acc: &mut Vec<ValueAt<T>>, value| {
acc.push(value);
})
}
fn collapse<OUT>(self: &Rc<Self>) -> Rc<dyn Stream<OUT>>
where
T: std::iter::IntoIterator<Item = OUT>,
OUT: Element,
{
let f = |x: T| match x.into_iter().last() {
Some(x) => (x, true),
None => (Default::default(), false),
};
MapFilterStream::new(self.clone(), Box::new(f)).into_stream()
}
#[cfg(feature = "async")]
fn consume_async<FUT>(
self: &Rc<Self>,
func: Box<dyn FnOnce(RunParams, Pin<Box<dyn FutStream<T>>>) -> FUT + Send>,
) -> Rc<dyn Node>
where
T: Element + Send,
FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
{
AsyncConsumerNode::new(self.clone(), func).into_node()
}
fn demux<K, F>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<T>>>, Overflow<T>)
where
T: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&T) -> (K, DemuxEvent) + 'static,
{
demux::demux(self.clone(), demux::DemuxMap::new(capacity), func)
}
fn demux_it<K, F, U>(
self: &Rc<Self>,
capacity: usize,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static,
{
self.demux_it_with_map(DemuxMap::new(capacity), func)
}
fn demux_it_with_map<K, F, U>(
self: &Rc<Self>,
map: DemuxMap<K>,
func: F,
) -> (Vec<Rc<dyn Stream<Burst<U>>>>, Overflow<Burst<U>>)
where
T: IntoIterator<Item = U>,
U: Element,
K: Hash + Eq + PartialEq + std::fmt::Debug + 'static,
F: Fn(&U) -> (K, DemuxEvent) + 'static,
{
demux_it(self.clone(), map, func)
}
fn for_each(self: &Rc<Self>, func: impl Fn(T, NanoTime) + 'static) -> Rc<dyn Node> {
ConsumerNode::new(self.clone(), Box::new(func)).into_node()
}
fn feedback(self: &Rc<Self>, sink: FeedbackSink<T>) -> Rc<dyn Stream<T>>
where
T: Hash + Eq,
{
FeedbackSendStream::new(self.clone(), sink).into_stream()
}
fn try_for_each(
self: &Rc<Self>,
func: impl Fn(T, NanoTime) -> anyhow::Result<()> + 'static,
) -> Rc<dyn Node> {
TryConsumerNode::new(self.clone(), Box::new(func)).into_node()
}
fn delay(self: &Rc<Self>, duration: Duration) -> Rc<dyn Stream<T>>
where
T: Hash + Eq,
{
DelayStream::new(self.clone(), NanoTime::new(duration.as_nanos() as u64)).into_stream()
}
fn delay_with_reset(
self: &Rc<Self>,
delay: Duration,
trigger: Rc<dyn Node>,
) -> Rc<dyn Stream<T>>
where
T: Hash + Eq,
{
DelayWithResetStream::new(
self.clone(),
trigger,
NanoTime::new(delay.as_nanos() as u64),
)
.into_stream()
}
fn difference(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Sub<Output = T>,
{
DifferenceStream::new(self.clone()).into_stream()
}
fn distinct(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: PartialEq,
{
DistinctStream::new(self.clone()).into_stream()
}
fn filter(self: &Rc<Self>, condition: Rc<dyn Stream<bool>>) -> Rc<dyn Stream<T>> {
FilterStream::new(self.clone(), condition).into_stream()
}
fn filter_value(
self: &Rc<Self>,
predicate: impl Fn(&T) -> bool + 'static,
) -> Rc<dyn Stream<T>> {
let condition = self.clone().map(move |val| predicate(&val));
FilterStream::new(self.clone(), condition).into_stream()
}
fn finally<F: FnOnce(T, &GraphState) -> anyhow::Result<()> + 'static>(
self: &Rc<Self>,
func: F,
) -> Rc<dyn Node> {
FinallyNode::new(self.clone(), Some(func)).into_node()
}
fn fold<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(&mut OUT, T) + 'static,
) -> Rc<dyn Stream<OUT>> {
FoldStream::new(self.clone(), Box::new(func)).into_stream()
}
fn inspect(self: &Rc<Self>, func: impl Fn(&T) + 'static) -> Rc<dyn Stream<T>> {
InspectStream::new(self.clone(), Box::new(func)).into_stream()
}
fn limit(self: &Rc<Self>, limit: u32) -> Rc<dyn Stream<T>> {
LimitStream::new(self.clone(), limit).into_stream()
}
fn logged(self: &Rc<Self>, label: &str, level: Level) -> Rc<dyn Stream<T>> {
#[cfg(not(feature = "tracing"))]
if !log::log_enabled!(level) {
return self.clone();
}
#[cfg(feature = "tracing")]
if !tracing_log_enabled!(level) {
return self.clone();
}
let lbl = label.to_string();
let func = move |value: T, time: NanoTime| {
#[cfg(not(feature = "tracing"))]
log!(target: "wingfoil", level, "{} {} {:?}", time.pretty(), lbl, value);
#[cfg(feature = "tracing")]
tracing_log!(level; time, lbl, value);
value
};
bimap(
Dep::Active(self.clone()),
Dep::Active(self.clone().as_node().ticked_at_elapsed()),
func,
)
}
fn map<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(T) -> OUT + 'static,
) -> Rc<dyn Stream<OUT>> {
MapStream::new(self.clone(), Box::new(func)).into_stream()
}
fn try_map<OUT: Element>(
self: &Rc<Self>,
func: impl Fn(T) -> anyhow::Result<OUT> + 'static,
) -> Rc<dyn Stream<OUT>> {
TryMapStream::new(self.clone(), Box::new(func)).into_stream()
}
#[cfg(feature = "async")]
fn mapper<FUNC, OUT>(self: &Rc<Self>, func: FUNC) -> Rc<dyn Stream<Burst<OUT>>>
where
T: Element + Send,
OUT: Element + Send + Hash + Eq,
FUNC: FnOnce(Rc<dyn Stream<Burst<T>>>) -> Rc<dyn Stream<OUT>> + Send + 'static,
{
GraphMapStream::new(self.clone(), func).into_stream()
}
fn not(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: std::ops::Not<Output = T>,
{
self.map(|value| !value)
}
fn print(self: &Rc<Self>) -> Rc<dyn Stream<T>> {
PrintStream::new(self.clone()).into_stream()
}
fn reduce(self: &Rc<Self>, func: impl Fn(T, T) -> T + 'static) -> Rc<dyn Stream<T>> {
let f = move |acc: &mut T, val: T| {
*acc = func((*acc).clone(), val);
};
self.fold(f)
}
fn sample(self: &Rc<Self>, trigger: Rc<dyn Node>) -> Rc<dyn Stream<T>> {
SampleStream::new(self.clone(), trigger).into_stream()
}
fn sum(self: &Rc<Self>) -> Rc<dyn Stream<T>>
where
T: Add<T, Output = T>,
{
self.reduce(|acc, val| acc + val)
}
fn throttle(self: &Rc<Self>, interval: Duration) -> Rc<dyn Stream<T>> {
ThrottleStream::new(self.clone(), NanoTime::new(interval.as_nanos() as u64)).into_stream()
}
fn with_time(self: &Rc<Self>) -> Rc<dyn Stream<(NanoTime, T)>> {
WithTimeStream::new(self.clone()).into_stream()
}
fn timed(self: &Rc<Self>) -> Rc<dyn Stream<T>> {
TimedStream::new(self.clone()).into_stream()
}
}
#[doc(hidden)]
pub trait TupleStreamOperators<A, B>
where
A: Element + 'static,
B: Element + 'static,
{
fn split(self: &Rc<Self>) -> (Rc<dyn Stream<A>>, Rc<dyn Stream<B>>);
}
impl<A, B> TupleStreamOperators<A, B> for dyn Stream<(A, B)>
where
A: Element + 'static,
B: Element + 'static,
{
fn split(self: &Rc<Self>) -> (Rc<dyn Stream<A>>, Rc<dyn Stream<B>>) {
let a = self.map(|tuple: (A, B)| tuple.0);
let b = self.map(|tuple: (A, B)| tuple.1);
(a, b)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::queue::ValueAt;
use std::cell::RefCell;
fn make_source(value: u64, time: u64) -> Rc<dyn Stream<u64>> {
let src = Rc::new(RefCell::new(CallBackStream::<u64>::new()));
src.borrow_mut()
.push(ValueAt::new(value, NanoTime::new(time)));
src.clone().as_stream()
}
#[test]
fn node_count_via_dyn_node() {
let src: Rc<dyn Stream<u64>> = make_source(42, 100);
let node: Rc<dyn Node> = src.clone().as_node();
let cnt = node.count();
cnt.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(cnt.peek_value(), 1);
}
#[test]
fn node_ticked_at_via_dyn_node() {
let src: Rc<dyn Stream<u64>> = make_source(42, 100);
let node: Rc<dyn Node> = src.clone().as_node();
let ta = node.ticked_at();
ta.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(ta.peek_value(), NanoTime::new(100));
}
#[test]
fn node_ticked_at_elapsed_via_dyn_node() {
let src: Rc<dyn Stream<u64>> = make_source(42, 100);
let node: Rc<dyn Node> = src.clone().as_node();
let te = node.ticked_at_elapsed();
te.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(te.peek_value(), NanoTime::new(100));
}
#[test]
fn node_produce_via_dyn_node() {
let src: Rc<dyn Stream<u64>> = make_source(42, 100);
let node: Rc<dyn Node> = src.clone().as_node();
let prod = node.produce(|| 99u64);
prod.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(prod.peek_value(), 99u64);
}
#[test]
fn node_into_graph_via_dyn_node() {
let src: Rc<dyn Stream<u64>> = make_source(42, 100);
let node: Rc<dyn Node> = src.clone().as_node();
let result = node
.into_graph(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.run();
assert!(result.is_ok());
}
#[test]
fn stream_count_via_dyn_stream() {
let src: Rc<dyn Stream<u64>> = make_source(7, 50);
let cnt = src.count();
cnt.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(cnt.peek_value(), 1);
}
#[test]
fn stream_ticked_at_via_dyn_stream() {
let src: Rc<dyn Stream<u64>> = make_source(7, 50);
let ta = src.ticked_at();
ta.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(ta.peek_value(), NanoTime::new(50));
}
#[test]
fn stream_ticked_at_elapsed_via_dyn_stream() {
let src: Rc<dyn Stream<u64>> = make_source(7, 50);
let te = src.ticked_at_elapsed();
te.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(te.peek_value(), NanoTime::new(50));
}
#[test]
fn stream_produce_via_dyn_stream() {
let src: Rc<dyn Stream<u64>> = make_source(7, 50);
let prod = src.produce(|| 55u64);
prod.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert_eq!(prod.peek_value(), 55u64);
}
#[test]
fn stream_into_graph_via_dyn_stream() {
let src: Rc<dyn Stream<u64>> = make_source(7, 50);
let result = src
.into_graph(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.run();
assert!(result.is_ok());
}
#[test]
fn demux_it_routes_items_from_iterable_stream() {
let cb = Rc::new(RefCell::new(CallBackStream::<Vec<u64>>::new()));
cb.borrow_mut()
.push(ValueAt::new(vec![1u64, 2, 3, 4], NanoTime::new(10)));
let src: Rc<dyn Stream<Vec<u64>>> = cb.clone().as_stream();
let (streams, overflow) = src.demux_it::<u64, _, u64>(2, |v| (*v % 2, DemuxEvent::None));
let collected: Vec<_> = streams.iter().map(|s| s.collect()).collect();
let overflow_node = overflow.stream().for_each(|_, _| {});
let mut nodes: Vec<Rc<dyn Node>> = collected.iter().map(|c| c.clone().as_node()).collect();
nodes.push(overflow_node);
Graph::new(
nodes,
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Forever,
)
.run()
.unwrap();
}
#[test]
fn not_inverts_bool_stream() {
let cb = Rc::new(RefCell::new(CallBackStream::<bool>::new()));
cb.borrow_mut().push(ValueAt::new(true, NanoTime::new(10)));
let src: Rc<dyn Stream<bool>> = cb.clone().as_stream();
let inverted = src.not();
inverted
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
assert!(!inverted.peek_value()); }
#[test]
fn collapse_skips_empty_iterator() {
let cb = Rc::new(RefCell::new(CallBackStream::<Vec<u64>>::new()));
cb.borrow_mut()
.push(ValueAt::new(vec![1u64, 2], NanoTime::new(10)));
cb.borrow_mut()
.push(ValueAt::new(vec![], NanoTime::new(20))); let collapsed = cb.clone().as_stream().collapse::<u64>();
let collected = collapsed.collect();
collected
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
let ticks = collected.peek_value();
assert_eq!(ticks.len(), 1);
assert_eq!(ticks[0].value, 2u64); }
#[test]
fn split_decomposes_tuple_stream() {
let cb = Rc::new(RefCell::new(CallBackStream::<(u64, u64)>::new()));
cb.borrow_mut()
.push(ValueAt::new((10u64, 20u64), NanoTime::new(5)));
let stream: Rc<dyn Stream<(u64, u64)>> = cb.clone().as_stream();
let (a, b) = stream.split();
let ca = a.collect();
let cb2 = b.collect();
Graph::new(
vec![ca.clone().as_node(), cb2.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Forever,
)
.run()
.unwrap();
assert_eq!(ca.peek_value()[0].value, 10u64);
assert_eq!(cb2.peek_value()[0].value, 20u64);
}
}