use std::cmp::Eq;
use std::hash::Hash;
use crate::queue::{TimeQueue, ValueAt};
use crate::types::*;
use derive_new::new;
#[derive(new)]
pub struct CallBackStream<T: Element + Hash + Eq> {
#[new(default)]
value: T,
#[new(default)]
queue: TimeQueue<T>,
}
#[node(output = value: T)]
impl<T: Element + Hash + Eq> MutableNode for CallBackStream<T> {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
let mut ticked = false;
while self.queue.pending(state.time()) {
self.value = self.queue.pop();
ticked = true;
}
if !self.queue.is_empty() {
let callback_time = self.queue.next_time();
state.add_callback(callback_time);
}
Ok(ticked)
}
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
if !self.queue.is_empty() {
let time = self.queue.next_time();
state.add_callback(time);
}
Ok(())
}
}
impl<T: Element + Hash + Eq> CallBackStream<T> {
pub fn push(&mut self, value_at: ValueAt<T>) {
self.queue.push(value_at.value, value_at.time)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::*;
use crate::nodes::*;
use std::cell::RefCell;
use std::rc::Rc;
#[test]
fn callback_stream_default_value_before_run() {
let src: Rc<RefCell<CallBackStream<u64>>> = Rc::new(RefCell::new(CallBackStream::new()));
assert_eq!(src.peek_value(), 0u64);
}
#[test]
fn callback_stream_emits_pushed_values_in_order() {
let src: Rc<RefCell<CallBackStream<u64>>> = Rc::new(RefCell::new(CallBackStream::new()));
src.borrow_mut().push(ValueAt::new(10, NanoTime::new(100)));
src.borrow_mut().push(ValueAt::new(20, NanoTime::new(200)));
src.borrow_mut().push(ValueAt::new(30, NanoTime::new(300)));
let collected = src.clone().as_stream().collect();
collected
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
let vals: Vec<u64> = collected.peek_value().iter().map(|v| v.value).collect();
assert_eq!(vals, vec![10, 20, 30]);
}
#[test]
fn callback_stream_emits_correct_timestamps() {
let src: Rc<RefCell<CallBackStream<u64>>> = Rc::new(RefCell::new(CallBackStream::new()));
src.borrow_mut().push(ValueAt::new(1, NanoTime::new(50)));
src.borrow_mut().push(ValueAt::new(2, NanoTime::new(150)));
let collected = src.clone().as_stream().collect();
collected
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
let times: Vec<NanoTime> = collected.peek_value().iter().map(|v| v.time).collect();
assert_eq!(times, vec![NanoTime::new(50), NanoTime::new(150)]);
}
}