use crate::types::*;
use derive_new::new;
use std::rc::Rc;
#[derive(new)]
pub struct MergeStream<T: Element> {
upstreams: Vec<Rc<dyn Stream<T>>>,
#[new(default)]
value: T,
}
#[node(active = [upstreams], output = value: T)]
impl<T: Element> MutableNode for MergeStream<T> {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
for stream in self.upstreams.iter() {
if state.ticked(stream.clone().as_node()) {
self.value = stream.peek_value();
break;
}
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
use crate::graph::*;
use crate::nodes::*;
#[test]
fn merge_emits_from_both_streams() {
let a = ticker(Duration::from_nanos(100)).count();
let b = ticker(Duration::from_nanos(200))
.count()
.map(|x: u64| x * 100);
let merged = merge(vec![a, b]).collect();
merged
.run(
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Duration(Duration::from_nanos(400)),
)
.unwrap();
let times: Vec<NanoTime> = merged.peek_value().iter().map(|v| v.time).collect();
assert_eq!(
times,
vec![
NanoTime::new(0),
NanoTime::new(100),
NanoTime::new(200),
NanoTime::new(300),
NanoTime::new(400),
NanoTime::new(500),
]
);
}
#[test]
fn merge_last_ticked_value_wins() {
let src = ticker(Duration::from_nanos(100)).count();
let merged = merge(vec![src]).collect();
merged
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(3))
.unwrap();
let values: Vec<u64> = merged.peek_value().iter().map(|v| v.value).collect();
assert_eq!(values, vec![1, 2, 3]);
}
}