use std::cmp::Eq;
use std::hash::Hash;
use std::rc::Rc;
use crate::queue::TimeQueue;
use crate::types::*;
use derive_new::new;
/// Stream node that republishes the values of `upstream` after `delay`.
///
/// The `new` constructor generated by `derive(new)` takes only the fields
/// that are not marked `#[new(default)]`, i.e. `upstream` and `delay`; the
/// remaining fields start from their `Default` values.
#[derive(new)]
pub(crate) struct DelayStream<T>
where
    T: Element + Hash + Eq,
{
    /// Value currently exposed through `peek_ref`.
    #[new(default)]
    value: T,
    /// Upstream values waiting to be released, each stored with its release time.
    #[new(default)]
    queue: TimeQueue<T>,
    /// Set once the first upstream tick has been seen (non-zero delay only).
    #[new(default)]
    initialized: bool,
    /// Stream whose values are being delayed.
    upstream: Rc<dyn Stream<T>>,
    /// Offset added to the current graph time to obtain a value's release time.
    delay: NanoTime,
}
impl<T> MutableNode for DelayStream<T>
where
    T: Element + Hash + Eq,
{
    /// Advances the node by one graph cycle.
    ///
    /// * With a zero delay the upstream value is copied straight through and
    ///   the node always reports a tick.
    /// * Otherwise, an upstream tick enqueues the upstream value for release
    ///   at `now + delay` and registers a callback for that time. Every queued
    ///   value that is pending at the current time is then popped into
    ///   `value`; the node reports a tick only if at least one was popped.
    ///
    /// On the very first upstream tick `value` is seeded with the upstream
    /// value so that readers see it before the delay has elapsed; that seeding
    /// on its own does not count as a tick.
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        // Zero delay: plain pass-through, no queueing involved.
        if self.delay == NanoTime::ZERO {
            self.value = self.upstream.peek_value();
            return Ok(true);
        }

        let now = state.time();

        if state.ticked(self.upstream.clone().as_node()) {
            if !self.initialized {
                // Seed the output with the first upstream value.
                self.value = self.upstream.peek_value();
                self.initialized = true;
            }
            let release_at = now + self.delay;
            // Ask the graph to cycle this node again when the value is due.
            state.add_callback(release_at);
            self.queue.push(self.upstream.peek_value(), release_at);
        }

        // Drain everything that is due; the last value popped wins.
        let mut released = false;
        while self.queue.pending(now) {
            self.value = self.queue.pop();
            released = true;
        }
        Ok(released)
    }

    /// Declares `upstream` as the only entry in the first upstream list;
    /// the second list is left empty.
    fn upstreams(&self) -> UpStreams {
        UpStreams::new(vec![self.upstream.clone().as_node()], Vec::new())
    }
}
impl<T> StreamPeekRef<T> for DelayStream<T>
where
    T: Element + Hash + Eq,
{
    /// Borrows the value this node currently exposes.
    fn peek_ref(&self) -> &T {
        &self.value
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::*;
    use crate::nodes::*;
    use std::time::Duration;

    /// A 10ns delay on a 100ns counter is expected to shift every tick by 10ns.
    #[test]
    fn delay_works() {
        let at = |value, nanos| ValueAt {
            value,
            time: NanoTime::new(nanos),
        };
        let expected_source = vec![at(1, 0), at(2, 100), at(3, 200)];
        let expected_delayed = vec![at(1, 10), at(2, 110), at(3, 210)];

        let source = ticker(Duration::from_nanos(100))
            .count()
            .logged("source", log::Level::Info);
        let delayed = source
            .delay(Duration::from_nanos(10))
            .logged("delayed", log::Level::Info);
        let captured_source = source.collect();
        let captured_delayed = delayed.collect();

        // Six cycles are expected to cover three source ticks and their
        // three delayed counterparts.
        let mut graph = Graph::new(
            vec![
                captured_source.clone().as_node(),
                captured_delayed.clone().as_node(),
            ],
            RunMode::HistoricalFrom(NanoTime::ZERO),
            RunFor::Cycles(6),
        );
        graph.run().unwrap();

        assert_eq!(expected_source, captured_source.peek_value());
        assert_eq!(expected_delayed, captured_delayed.peek_value());
    }

    /// With a delay (100ns) longer than the tick period (10ns), the delayed
    /// values are expected in order, one tick period apart.
    #[test]
    fn long_delay_works() {
        let at = |value, nanos| ValueAt {
            value,
            time: NanoTime::new(nanos),
        };
        let expected = vec![at(1, 100), at(2, 110), at(3, 120), at(4, 130)];

        let delayed = ticker(Duration::from_nanos(10))
            .count()
            .delay(Duration::from_nanos(100))
            .collect();
        delayed
            .run(
                RunMode::HistoricalFrom(NanoTime::ZERO),
                RunFor::Duration(Duration::from_nanos(120)),
            )
            .unwrap();

        assert_eq!(expected, delayed.peek_value());
    }

    /// Until the 5s delay has elapsed the delayed stream holds the first
    /// source value, so `source - delayed` is expected to grow from 0 to 5
    /// and then stay at 5.
    #[test]
    fn delay_initializes_to_first_value() {
        let source = ticker(Duration::from_secs(1))
            .count()
            .map(|x| x as i64 + 4);
        let delayed = source.delay(Duration::from_secs(5));
        let diff = bimap(Dep::Active(source), Dep::Passive(delayed), |a, b| a - b);

        diff.accumulate()
            .finally(|res, _| {
                assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 5, 5, 5, 5]);
                Ok(())
            })
            .run(
                RunMode::HistoricalFrom(NanoTime::ZERO),
                RunFor::Duration(Duration::from_secs(8)),
            )
            .unwrap();
    }

    /// A zero delay is expected to pass ticks through at their original times.
    #[test]
    fn zero_delay_works() {
        let at = |value, nanos| ValueAt {
            value,
            time: NanoTime::new(nanos),
        };
        let expected = vec![at(1, 0), at(2, 10), at(3, 20), at(4, 30)];

        let delayed = ticker(Duration::from_nanos(10))
            .count()
            .delay(Duration::from_nanos(0))
            .collect();
        delayed
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(4))
            .unwrap();

        assert_eq!(expected, delayed.peek_value());
    }
}