use std::cell::{Cell, RefCell};
use std::cmp::Eq;
use std::hash::Hash;
use std::rc::Rc;
use crate::queue::TimeQueue;
use crate::types::*;
pub(crate) struct FeedbackStream<T: Element + Hash + Eq> {
value: T,
queue: Rc<RefCell<TimeQueue<T>>>,
node_id: Rc<Cell<Option<usize>>>,
}
impl<T: Element + Hash + Eq> StreamPeekRef<T> for FeedbackStream<T> {
fn peek_ref(&self) -> &T {
&self.value
}
}
impl<T: Element + Hash + Eq> MutableNode for FeedbackStream<T> {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
let mut ticked = false;
loop {
let mut q = self.queue.borrow_mut();
if !q.pending(state.time()) {
break;
}
self.value = q.pop();
ticked = true;
}
Ok(ticked)
}
fn upstreams(&self) -> UpStreams {
UpStreams::default()
}
fn setup(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
self.node_id.set(Some(state.current_node_id()));
Ok(())
}
}
pub struct FeedbackSink<T: Element + Hash + Eq> {
queue: Rc<RefCell<TimeQueue<T>>>,
node_id: Rc<Cell<Option<usize>>>,
}
impl<T: Element + Hash + Eq> Clone for FeedbackSink<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
node_id: self.node_id.clone(),
}
}
}
impl<T: Element + Hash + Eq> FeedbackSink<T> {
pub fn send(&self, value: T, state: &mut GraphState) {
let time = state.time() + 1;
self.queue.borrow_mut().push(value, time);
state.add_callback_for_node(self.node_id.get().unwrap(), time);
}
}
#[must_use]
pub fn feedback<T: Element + Hash + Eq>() -> (FeedbackSink<T>, Rc<dyn Stream<T>>) {
let queue = Rc::new(RefCell::new(TimeQueue::new()));
let node_id = Rc::new(Cell::new(None));
let stream = FeedbackStream {
value: T::default(),
queue: queue.clone(),
node_id: node_id.clone(),
};
let sink = FeedbackSink { queue, node_id };
(sink, stream.into_stream())
}
#[must_use]
pub fn feedback_node() -> (FeedbackSink<()>, Rc<dyn Node>) {
let (sink, stream) = feedback::<()>();
(sink, stream.as_node())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Dep::*;
use crate::graph::*;
use crate::nodes::*;
use std::time::Duration;
#[test]
fn feedback_passive_works() {
let period = Duration::from_nanos(100);
let (tx, rx) = feedback::<u64>();
let source = ticker(period).count();
let value = bimap(Active(source), Passive(rx), |src, fb| src + fb * 10);
let fb = value.feedback(tx);
let res = value.accumulate().finally(|values, _| {
assert_eq!(vec![1, 12, 123, 1234, 12345, 123456], values);
Ok(())
});
Graph::new(
vec![fb, res],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Duration(period * 5),
)
.run()
.unwrap();
}
#[test]
fn feedback_active_works() {
let (tx, rx) = feedback::<u64>();
let source = constant(1);
let value = bimap(Active(source), Active(rx), |src, fb| src + fb * 10);
let fb = value.feedback(tx);
let res = value.collect().finally(|values, _| {
let expected = vec![
ValueAt::new(1, NanoTime::new(0)),
ValueAt::new(11, NanoTime::new(1)),
ValueAt::new(111, NanoTime::new(2)),
ValueAt::new(1111, NanoTime::new(3)),
ValueAt::new(11111, NanoTime::new(4)),
];
assert_eq!(expected, values);
Ok(())
});
Graph::new(
vec![fb, res],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(5),
)
.run()
.unwrap();
}
#[test]
fn feedback_works() {
let period = Duration::from_nanos(100);
let lookback = 5;
let level: i64 = 3;
let source = ticker(period).count();
let (tx, rx) = feedback_node();
let delayed = source.delay_with_reset(period * lookback, rx);
let diff = bimap(Active(source), Passive(delayed), |a, b| a as i64 - b as i64);
let trigger = diff
.filter_value(move |p| p.abs() > level)
.as_node()
.feedback(tx);
let res = diff.accumulate().finally(|value, _| {
let expected = vec![0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3];
assert_eq!(expected, value);
Ok(())
});
Graph::new(
vec![trigger, res],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Duration(period * 14),
)
.run()
.unwrap();
}
}