use derive_new::new;
use std::rc::Rc;
use crate::types::*;
#[derive(new)]
pub(crate) struct FilterStream<T: Element> {
source: Rc<dyn Stream<T>>,
condition: Rc<dyn Stream<bool>>,
#[new(default)]
value: T,
}
#[node(active = [source, condition], output = value: T)]
impl<T: Element> MutableNode for FilterStream<T> {
fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
let val = self.source.peek_value();
let ticked = self.condition.peek_value();
if ticked {
self.value = val;
}
Ok(ticked)
}
}
#[cfg(test)]
mod tests {
use crate::graph::*;
use crate::nodes::*;
#[test]
fn passes_values_when_condition_true() {
let filtered = ticker(Duration::from_nanos(100))
.count()
.filter_value(|x| x.is_multiple_of(2))
.collect();
filtered
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(6))
.unwrap();
let values: Vec<u64> = filtered.peek_value().iter().map(|v| v.value).collect();
assert_eq!(values, vec![2, 4, 6]);
}
#[test]
fn suppresses_all_when_condition_always_false() {
let filtered = ticker(Duration::from_nanos(100))
.count()
.filter_value(|_| false)
.collect();
filtered
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(5))
.unwrap();
assert!(filtered.peek_value().is_empty());
}
#[test]
fn condition_stream_controls_emission() {
let source = ticker(Duration::from_nanos(100)).count();
let condition = source.map(|x: u64| x.is_multiple_of(2));
let filtered = source.filter(condition).collect();
filtered
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(6))
.unwrap();
assert!(filtered.peek_value().iter().all(|v| v.value % 2 == 0));
}
}