//! `DistinctStream`: a stream node that propagates its source only when the
//! source's value changes.
use crate::types::*;
use derive_new::new;
use std::rc::Rc;
/// Stream node that propagates its source only when the source's value
/// changes. Used by [distinct](crate::nodes::StreamOperators::distinct).
///
/// The stored value is not a constructor argument; it starts out as the
/// default for `T`, so the very first source value is compared against
/// that default rather than against an earlier source value.
#[derive(new)]
pub(crate) struct DistinctStream<T: Element> {
source: Rc<dyn Stream<T>>, // upstream stream whose current value is inspected on each cycle
#[new(default)] // derive_new: omitted from `new(..)` and initialised to the default
// last value this node propagated (the default until the first change)
value: T,
}
#[node(active = [source], output = value: T)]
impl<T: Element + PartialEq> MutableNode for DistinctStream<T> {
    // Invoked by the Graph whenever it decides this node has to be cycled.
    //
    // Reads the source's current value and compares it with the stored one:
    //   * equal     -> Ok(false): nothing new, the node does not tick
    //   * different -> the stored value is replaced and Ok(true) is returned
    //
    // The stored value starts at the default for `T`, so a first source
    // value equal to that default is treated as "unchanged".
    fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
        let latest = self.source.peek_value();

        // Guard clause: an unchanged value means no tick and no state update.
        if self.value == latest {
            return Ok(false);
        }

        // Remember the new value so the next cycle compares against it.
        self.value = latest;
        Ok(true)
    }
}
#[cfg(test)]
mod tests {
    use crate::graph::*;
    use crate::nodes::*;

    // Consecutive duplicates coming from the source collapse into one emission.
    #[test]
    fn suppresses_repeated_values() {
        // Across six ticks count() yields 1..=6, and div_ceil(2) folds that
        // into 1,1,2,2,3,3. The node's stored value starts at the default (0),
        // so the leading 1 counts as a change and is emitted; afterwards only
        // 2 and 3 differ from their predecessor.
        let halved_counts = ticker(Duration::from_nanos(100))
            .count()
            .map(|tick: u64| tick.div_ceil(2));
        let collected = halved_counts.distinct().collect();

        let mode = RunMode::HistoricalFrom(NanoTime::ZERO);
        let length = RunFor::Cycles(6);
        collected.run(mode, length).unwrap();

        let items = collected.peek_value();
        let emitted: Vec<u64> = items.iter().map(|item| item.value).collect();
        let expected: Vec<u64> = vec![1, 2, 3];
        assert_eq!(emitted, expected);
    }

    // A source that never repeats itself passes through untouched.
    #[test]
    fn emits_every_tick_when_all_unique() {
        // count() gives 1,2,3,4 over four ticks; every value is new, so
        // each tick must produce an emission.
        let collected = ticker(Duration::from_nanos(100))
            .count()
            .distinct()
            .collect();

        let mode = RunMode::HistoricalFrom(NanoTime::ZERO);
        let length = RunFor::Cycles(4);
        collected.run(mode, length).unwrap();

        let emitted_count = collected.peek_value().len();
        assert_eq!(emitted_count, 4);
    }
}