use crate::queue::HashByRef;
use crate::queue::TimeQueue;
use crate::types::{NanoTime, Node};
use crossbeam::channel::{Receiver, SendError, Sender, select};
use lazy_static::lazy_static;
use std::cmp::{max, min};
use std::collections::HashMap;
#[cfg(feature = "dynamic-graph-beta")]
use std::collections::HashSet;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Error, Write};
use std::path::Path;
use std::rc::Rc;
#[cfg(feature = "async")]
use std::sync::Arc;
use std::sync::Mutex;
#[cfg(feature = "async")]
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use std::vec;
lazy_static! {
static ref GRAPH_ID: Mutex<usize> = Mutex::new(0);
}
struct NodeData {
node: Rc<dyn Node>,
upstreams: Vec<(usize, bool)>,
downstreams: Vec<(usize, bool)>,
layer: usize,
active: bool,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RunMode {
RealTime,
HistoricalFrom(NanoTime),
}
impl RunMode {
pub fn start_time(&self) -> NanoTime {
match self {
RunMode::RealTime => NanoTime::now(),
RunMode::HistoricalFrom(start_time) => *start_time,
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum RunFor {
Duration(Duration),
Cycles(u32),
Forever,
}
impl RunFor {
pub fn done(&self, cycle: u32, elapsed: NanoTime) -> bool {
match self {
RunFor::Cycles(cycles) => cycle > *cycles,
RunFor::Duration(duration) => elapsed > NanoTime::from(*duration),
RunFor::Forever => false,
}
}
}
fn average_duration(duration: Duration, n: u32) -> Duration {
let avg_nanos = if n == 0 {
0
} else {
duration.as_nanos() / n as u128
};
Duration::from_nanos(avg_nanos as u64)
}
#[derive(Clone, Debug)]
pub(crate) struct ReadyNotifier {
pub node_index: usize,
pub sender: Sender<usize>,
}
impl ReadyNotifier {
pub fn notify(&self) -> anyhow::Result<(), SendError<usize>> {
self.sender.send(self.node_index)
}
}
pub struct GraphState {
time: NanoTime,
first_cycle: bool,
is_last_cycle: bool,
stop_requested: bool,
current_node_index: Option<usize>,
scheduled_callbacks: TimeQueue<usize>,
always_callbacks: Vec<usize>,
node_to_index: HashMap<HashByRef<dyn Node>, usize>,
node_ticked: Vec<bool>,
#[cfg(feature = "async")]
run_time: OnceLock<Arc<tokio::runtime::Runtime>>,
run_mode: RunMode,
run_for: RunFor,
ready_notifier: Sender<usize>,
ready_callbacks: Receiver<usize>,
start_time: NanoTime,
id: usize,
nodes: Vec<NodeData>,
dirty_nodes_by_layer: Vec<Vec<usize>>,
node_dirty: Vec<bool>,
#[cfg(feature = "dynamic-graph-beta")]
pending_additions: Vec<(Rc<dyn Node>, usize, bool, bool)>,
#[cfg(feature = "dynamic-graph-beta")]
pending_removals: Vec<Rc<dyn Node>>,
}
impl GraphState {
pub fn new(run_mode: RunMode, run_for: RunFor, start_time: NanoTime) -> Self {
let (ready_notifier, ready_callbacks) = crossbeam::channel::unbounded();
let mut id = GRAPH_ID.lock().unwrap();
let slf = Self {
time: NanoTime::ZERO,
first_cycle: true,
is_last_cycle: false,
stop_requested: false,
current_node_index: None,
scheduled_callbacks: TimeQueue::new(),
always_callbacks: Vec::new(),
node_to_index: HashMap::new(),
node_ticked: Vec::new(),
#[cfg(feature = "async")]
run_time: OnceLock::new(),
ready_notifier,
run_mode,
run_for,
ready_callbacks,
start_time,
id: *id,
nodes: Vec::new(),
dirty_nodes_by_layer: Vec::new(),
node_dirty: Vec::new(),
#[cfg(feature = "dynamic-graph-beta")]
pending_additions: Vec::new(),
#[cfg(feature = "dynamic-graph-beta")]
pending_removals: Vec::new(),
};
*id += 1;
slf
}
pub fn time(&self) -> NanoTime {
self.time
}
pub fn elapsed(&self) -> NanoTime {
self.time - self.start_time
}
pub fn start_time(&self) -> NanoTime {
self.start_time
}
pub(crate) fn ready_notifier(&self) -> ReadyNotifier {
ReadyNotifier {
node_index: self.current_node_index.unwrap(),
sender: self.ready_notifier.clone(),
}
}
#[cfg(feature = "async")]
pub fn tokio_runtime(&self) -> Arc<tokio::runtime::Runtime> {
self.run_time
.get_or_init(|| {
if tokio::runtime::Handle::try_current().is_ok() {
panic!(
"wingfoil cannot be run from an async context (e.g. `#[tokio::main]`). \
Call graph.run() from a synchronous thread instead. \
Tip: std::thread::spawn(|| graph.run(...)).join().unwrap()"
);
}
Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
)
})
.clone()
}
pub fn add_callback(&mut self, time: NanoTime) {
self.add_callback_for_node(self.current_node_index.unwrap(), time);
}
pub(crate) fn current_node_id(&self) -> usize {
self.current_node_index.unwrap()
}
pub fn always_callback(&mut self) {
let ix = self.current_node_index.unwrap();
self.always_callbacks.push(ix);
}
pub fn is_last_cycle(&self) -> bool {
self.is_last_cycle
}
pub fn request_stop(&mut self) {
self.stop_requested = true;
}
pub fn ticked(&self, node: Rc<dyn Node>) -> bool {
self.node_index(node)
.map(|i| self.node_ticked[i])
.unwrap_or(false)
}
#[cfg(feature = "dynamic-graph-beta")]
pub fn add_upstream(&mut self, upstream: Rc<dyn Node>, is_active: bool, recycle: bool) {
let caller_index = self.current_node_index.unwrap();
self.pending_additions
.push((upstream, caller_index, is_active, recycle));
}
#[cfg(feature = "dynamic-graph-beta")]
pub fn remove_node(&mut self, node: Rc<dyn Node>) {
self.pending_removals.push(node);
}
#[allow(dead_code)]
pub(crate) fn node_index_ticked(&self, node_index: usize) -> bool {
self.node_ticked[node_index]
}
fn has_scheduled_callbacks(&self) -> bool {
!self.scheduled_callbacks.is_empty()
}
fn next_scheduled_time(&self) -> NanoTime {
if self.scheduled_callbacks.is_empty() {
NanoTime::MAX
} else {
self.scheduled_callbacks.next_time()
}
}
pub(crate) fn add_callback_for_node(&mut self, node_index: usize, time: NanoTime) {
self.scheduled_callbacks.push(node_index, time);
}
fn has_pending_scheduled_callbacks(&self) -> bool {
self.scheduled_callbacks.pending(self.time)
}
fn wait_ready_callback(&mut self, end_time: NanoTime) -> Option<usize> {
let now = NanoTime::now();
if now > end_time {
None
} else {
let timeout = u64::from(end_time - now);
select! {
recv(self.ready_callbacks) -> msg => {
Some(msg.unwrap())
},
default(Duration::from_nanos(timeout)) => {
None
}
}
}
}
pub fn node_index(&self, node: Rc<dyn Node>) -> Option<usize> {
let key = HashByRef::new(node.clone());
self.node_to_index.get(&key).copied()
}
fn reset(&mut self) {
for i in self.node_ticked.iter_mut() {
*i = false;
}
}
fn push_node(&mut self, node: Rc<dyn Node>) {
let index = self.node_ticked.len();
self.node_ticked.push(false);
self.node_to_index
.insert(HashByRef::new(node.clone()), index);
}
fn seen(&self, node: Rc<dyn Node>) -> bool {
self.node_to_index.contains_key(&HashByRef::new(node))
}
fn set_ticked(&mut self, index: usize) {
self.node_ticked[index] = true;
}
pub fn run_mode(&self) -> RunMode {
self.run_mode
}
pub fn run_for(&self) -> RunFor {
self.run_for
}
pub fn log(&self, level: log::Level, msg: &str) {
let Some(ix) = self.current_node_index else {
return;
};
let id = self.id;
#[cfg(not(feature = "tracing"))]
if log_enabled!(level) {
let type_name = self.nodes[ix].node.type_name();
log!(target: &type_name, level, "[{id},{ix}]{msg}");
}
#[cfg(feature = "tracing")]
if tracing_log_enabled!(level) {
let type_name = self.nodes[ix].node.type_name();
tracing_log!(level, node = %type_name, "[{id},{ix}]{msg}");
}
}
pub(crate) fn mark_dirty(&mut self, index: usize) {
if !self.node_dirty[index] {
let layer = self.nodes[index].layer;
self.dirty_nodes_by_layer[layer].push(index);
self.node_dirty[index] = true;
}
}
}
pub struct Graph {
pub(crate) state: GraphState,
}
impl Graph {
pub fn new(root_nodes: Vec<Rc<dyn Node>>, run_mode: RunMode, run_for: RunFor) -> Graph {
let start_time = run_mode.start_time();
let state = GraphState::new(run_mode, run_for, start_time);
let mut graph = Graph { state };
graph.initialise(root_nodes);
graph
}
#[cfg(feature = "async")]
pub fn new_with(
root_nodes: Vec<Rc<dyn Node>>,
tokio_runtime: Arc<tokio::runtime::Runtime>,
run_mode: RunMode,
run_for: RunFor,
start_time: NanoTime,
) -> Graph {
let state = GraphState::new(run_mode, run_for, start_time);
state.run_time.set(tokio_runtime).ok();
let mut graph = Graph { state };
graph.initialise(root_nodes);
graph
}
pub(crate) fn setup_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("setup", |node, state| node.setup(state))
}
pub(crate) fn start_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("start", |node, state| node.start(state))
}
pub(crate) fn stop_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("stop", |node, state| node.stop(state))
}
pub(crate) fn teardown_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("teardown", |node, state| node.teardown(state))
}
#[cfg_attr(
feature = "instrument-apply-nodes",
tracing::instrument(skip(self, func))
)]
fn apply_nodes(
&mut self,
desc: &str,
func: impl Fn(Rc<dyn Node>, &mut GraphState) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let timer = Instant::now();
for ix in 0..self.state.nodes.len() {
if !self.state.nodes[ix].active {
continue;
}
let node = self.state.nodes[ix].node.clone();
self.state.current_node_index = Some(ix);
func(node, &mut self.state).map_err(|e| {
let context = self.format_context(ix, 3);
e.context(format!("Error during {desc} in node [{ix}]:\n{context}"))
})?;
self.state.current_node_index = None;
}
debug!(
"graph {:?}, {:?} took {:?} for {:?} nodes",
self.state.id,
desc,
timer.elapsed(),
self.state.nodes.len()
);
Ok(())
}
fn resolve_start_end(
&self,
start_time: &mut NanoTime,
end_time: &mut NanoTime,
end_cycle: &mut u32,
is_realtime: &mut bool,
) {
*end_time = NanoTime::MAX; *end_cycle = u32::MAX; match self.state.run_mode() {
RunMode::RealTime => {
*is_realtime = true;
*start_time = NanoTime::now();
}
RunMode::HistoricalFrom(t) => {
*is_realtime = false;
*start_time = t;
}
};
match self.state.run_for {
RunFor::Duration(duration) => {
*end_time = *start_time + duration.as_nanos() as u64;
debug!("end_time = {end_time}",);
}
RunFor::Cycles(cycle) => {
*end_cycle = cycle;
debug!("end_cycle = {end_cycle}",);
}
RunFor::Forever => {}
}
}
pub(crate) fn run_nodes(&mut self) -> anyhow::Result<()> {
let run_timer = Instant::now();
let mut cycles: u32 = 0;
let mut empty_cycles: u32 = 0;
let mut end_time = NanoTime::MAX;
let mut end_cycle = u32::MAX;
let mut is_realtime = false;
let mut start_time = NanoTime::ZERO;
self.resolve_start_end(
&mut start_time,
&mut end_time,
&mut end_cycle,
&mut is_realtime,
);
self.state.start_time = start_time;
loop {
if self.state.is_last_cycle && (self.state.time >= end_time || cycles >= end_cycle) {
debug!(
"Finished. {:}, {:}, {:}, {:}",
self.state.time >= end_time,
cycles >= end_cycle,
self.state.time,
end_time
);
break;
}
if !self.state.is_last_cycle && (cycles >= end_cycle - 1 || self.state.time >= end_time)
{
debug!("last cycle");
self.state.is_last_cycle = true;
}
if is_realtime {
let progressed = self.process_callbacks_realtime(end_time);
if !progressed {
empty_cycles += 1;
continue;
}
} else {
let progressed = self.process_callbacks_historical();
if !progressed {
debug!("Terminating early.");
break;
}
}
self.cycle()?;
cycles += 1;
debug!("cycles={cycles}");
if self.state.stop_requested {
debug!("Stop requested by node, terminating early.");
break;
}
}
let elapsed = run_timer.elapsed();
debug!("{empty_cycles} empty cycles");
debug!(
"Completed {:} cycles in {:?}. {:?} average.",
cycles,
run_timer.elapsed(),
average_duration(elapsed, cycles)
);
Ok(())
}
#[cfg_attr(feature = "instrument-run", tracing::instrument(skip_all))]
pub fn run(&mut self) -> anyhow::Result<()> {
self.setup_nodes()?;
self.start_nodes()?;
self.run_nodes()?;
self.stop_nodes()?;
self.teardown_nodes()?;
Ok(())
}
#[cfg_attr(feature = "instrument-initialise", tracing::instrument(skip_all))]
fn initialise(&mut self, root_nodes: Vec<Rc<dyn Node>>) -> &mut Graph {
let timer = Instant::now();
for node in root_nodes {
if !self.state.seen(node.clone()) {
self.initialise_node(&node);
}
}
let mut max_layer: i32 = -1;
for i in 0..self.state.nodes.len() {
max_layer = max(max_layer, self.state.nodes[i].layer.try_into().unwrap());
self.state.node_dirty.push(false);
for j in 0..self.state.nodes[i].upstreams.len() {
let (up_index, active) = self.state.nodes[i].upstreams[j];
self.state.nodes[up_index].downstreams.push((i, active));
}
}
for _ in 0..max_layer + 1 {
self.state.dirty_nodes_by_layer.push(vec![]);
}
debug!(
"{:} nodes wired in {:?}",
self.state.nodes.len(),
timer.elapsed()
);
self
}
fn initialise_upstreams(
&mut self,
upstreams: &[Rc<dyn Node>],
is_active: bool,
layer: &mut usize,
upstream_indexes: &mut Vec<(usize, bool)>,
) {
for upstream_node in upstreams {
let upstream_index = self.initialise_node(upstream_node);
upstream_indexes.push((upstream_index, is_active));
*layer = max(*layer, self.state.nodes[upstream_index].layer + 1);
}
}
fn initialise_node(&mut self, node: &Rc<dyn Node>) -> usize {
if self.state.seen(node.clone()) {
self.state.node_index(node.clone()).unwrap()
} else {
let mut layer = 0;
let mut upstream_indexes = vec![];
let upstreams = node.upstreams();
self.initialise_upstreams(&upstreams.active, true, &mut layer, &mut upstream_indexes);
self.initialise_upstreams(&upstreams.passive, false, &mut layer, &mut upstream_indexes);
let node_data = NodeData {
node: node.clone(),
upstreams: upstream_indexes,
downstreams: vec![],
layer,
active: true,
};
let index = self.state.nodes.len();
self.state.push_node(node.clone());
self.state.nodes.push(node_data);
index
}
}
fn mark_dirty(&mut self, index: usize) {
if !self.state.node_dirty[index] {
let layer = self.state.nodes[index].layer;
self.state.dirty_nodes_by_layer[layer].push(index);
self.state.node_dirty[index] = true;
}
}
fn process_scheduled_callbacks(&mut self) -> bool {
let mut progressed = false;
for i in 0..self.state.always_callbacks.len() {
let ix = self.state.always_callbacks[i];
self.mark_dirty(ix);
progressed = true;
}
while self.state.has_pending_scheduled_callbacks() {
let ix = self.state.scheduled_callbacks.pop();
self.mark_dirty(ix);
progressed = true;
}
progressed
}
fn process_callbacks_historical(&mut self) -> bool {
if !self.state.ready_callbacks.is_empty() {
panic!("ready_callbacks are not supported in realtime mode.");
}
if self.state.has_scheduled_callbacks() {
let next = self.state.next_scheduled_time();
self.state.time = if self.state.first_cycle {
self.state.first_cycle = false;
next
} else {
next.max(self.state.time + 1)
};
}
self.process_scheduled_callbacks()
}
fn process_ready_callbacks(&mut self) -> bool {
let mut progressed = false;
while !self.state.ready_callbacks.is_empty() {
let ix = self.state.ready_callbacks.recv().unwrap();
self.mark_dirty(ix);
progressed = true;
}
progressed
}
fn process_callbacks_realtime(&mut self, end_time: NanoTime) -> bool {
let mut progressed = self.process_ready_callbacks();
if self.process_scheduled_callbacks() {
progressed = true;
}
if !progressed {
let wait_until = min(end_time, self.state.next_scheduled_time());
if let Some(ix) = self.state.wait_ready_callback(wait_until) {
self.mark_dirty(ix);
progressed = true;
}
}
self.state.time = NanoTime::now().max(self.state.time + 1);
progressed
}
#[cfg_attr(feature = "instrument-cycle", tracing::instrument(skip_all))]
fn cycle(&mut self) -> anyhow::Result<()> {
for lyr in 0..self.state.dirty_nodes_by_layer.len() {
for i in 0..self.state.dirty_nodes_by_layer[lyr].len() {
let ix = self.state.dirty_nodes_by_layer[lyr][i];
self.cycle_node(ix)?;
}
}
self.reset();
#[cfg(feature = "dynamic-graph-beta")]
self.process_pending_removals()?;
#[cfg(feature = "dynamic-graph-beta")]
self.process_pending_additions()?;
Ok(())
}
#[cfg_attr(
feature = "instrument-cycle-node",
tracing::instrument(skip(self), fields(node = tracing::field::Empty))
)]
fn cycle_node(&mut self, index: usize) -> anyhow::Result<()> {
if !self.state.nodes[index].active {
return Ok(());
}
#[cfg(feature = "instrument-cycle-node")]
tracing::Span::current().record("node", self.state.nodes[index].node.type_name());
let node = &self.state.nodes[index].node;
self.state.current_node_index = Some(index);
let result = node.clone().cycle(&mut self.state);
self.state.current_node_index = None;
let ticked = result.map_err(|e| {
let context = self.format_context(index, 3);
e.context(format!("Error in node [{index}]:\n{context}"))
})?;
if ticked {
self.state.set_ticked(index);
for i in 0..self.state.nodes[index].downstreams.len() {
let (downstream_index, active) = self.state.nodes[index].downstreams[i];
if active {
self.mark_dirty(downstream_index)
}
}
}
Ok(())
}
fn reset(&mut self) {
self.state.reset();
for layer in self.state.dirty_nodes_by_layer.iter_mut() {
layer.clear();
}
for i in self.state.node_dirty.iter_mut() {
*i = false;
}
}
#[cfg(feature = "dynamic-graph-beta")]
fn process_pending_removals(&mut self) -> anyhow::Result<()> {
let removals = std::mem::take(&mut self.state.pending_removals);
for node in removals {
let Some(index) = self.state.node_index(node.clone()) else {
continue;
};
if !self.state.nodes[index].active {
continue;
}
let upstreams: Vec<(usize, bool)> = self.state.nodes[index].upstreams.clone();
for (up_idx, _) in &upstreams {
self.state.nodes[*up_idx]
.downstreams
.retain(|(di, _)| *di != index);
}
let downstreams: Vec<(usize, bool)> = self.state.nodes[index].downstreams.clone();
for (dn_idx, _) in &downstreams {
self.state.nodes[*dn_idx]
.upstreams
.retain(|(ui, _)| *ui != index);
}
self.state.current_node_index = Some(index);
node.clone().stop(&mut self.state).map_err(|e| {
e.context(format!(
"Error during stop in node [{index}] (pending removal)"
))
})?;
node.clone().teardown(&mut self.state).map_err(|e| {
e.context(format!(
"Error during teardown in node [{index}] (pending removal)"
))
})?;
self.state.current_node_index = None;
self.state.nodes[index].active = false;
}
Ok(())
}
#[cfg(feature = "dynamic-graph-beta")]
fn process_pending_additions(&mut self) -> anyhow::Result<()> {
let additions = std::mem::take(&mut self.state.pending_additions);
if additions.is_empty() {
return Ok(());
}
let start_index = self.state.nodes.len();
for (node, _caller_index, _is_active, _recycle) in &additions {
if !self.state.seen(node.clone()) {
self.initialise_node(node);
}
}
let new_indices: Vec<usize> = (start_index..self.state.nodes.len()).collect();
for _ in &new_indices {
self.state.node_dirty.push(false);
}
for &ix in &new_indices {
let upstreams = self.state.nodes[ix].upstreams.clone();
for (up_idx, active) in upstreams {
self.state.nodes[up_idx].downstreams.push((ix, active));
}
}
let mut wired_edges: HashSet<(usize, usize)> = HashSet::new();
for (node, caller_index, is_active, recycle) in &additions {
let node_index = self.state.node_index(node.clone()).unwrap();
if wired_edges.insert((*caller_index, node_index)) {
self.state.nodes[*caller_index]
.upstreams
.push((node_index, *is_active));
self.state.nodes[node_index]
.downstreams
.push((*caller_index, *is_active));
}
self.fix_layers(*caller_index);
if *recycle {
let time = self.state.time + NanoTime::new(1);
let new_set: HashSet<usize> = new_indices.iter().cloned().collect();
let mut stack = vec![node_index];
let mut visited: HashSet<usize> = HashSet::new();
while let Some(ix) = stack.pop() {
if !visited.insert(ix) {
continue;
}
let upstreams: Vec<(usize, bool)> = self.state.nodes[ix].upstreams.clone();
let has_preexisting = upstreams.iter().any(|(up, _)| !new_set.contains(up));
let is_source = upstreams.is_empty();
if has_preexisting || is_source {
self.state.add_callback_for_node(ix, time);
}
for &(up_idx, _) in &upstreams {
if new_set.contains(&up_idx) {
stack.push(up_idx);
}
}
}
}
}
for &ix in &new_indices {
let node = self.state.nodes[ix].node.clone();
self.state.current_node_index = Some(ix);
node.setup(&mut self.state).map_err(|e| {
e.context(format!(
"Error during setup in node [{ix}] (dynamic addition)"
))
})?;
self.state.current_node_index = None;
}
for &ix in &new_indices {
let node = self.state.nodes[ix].node.clone();
self.state.current_node_index = Some(ix);
node.start(&mut self.state).map_err(|e| {
e.context(format!(
"Error during start in node [{ix}] (dynamic addition)"
))
})?;
self.state.current_node_index = None;
}
Ok(())
}
#[cfg(feature = "dynamic-graph-beta")]
fn fix_layers(&mut self, start: usize) {
let mut queue = std::collections::VecDeque::new();
queue.push_back(start);
while let Some(node_index) = queue.pop_front() {
let required = self.state.nodes[node_index]
.upstreams
.iter()
.map(|(up_idx, _)| self.state.nodes[*up_idx].layer)
.max()
.map_or(0, |m| m + 1);
if required > self.state.nodes[node_index].layer {
self.state.nodes[node_index].layer = required;
let downstreams: Vec<usize> = self.state.nodes[node_index]
.downstreams
.iter()
.map(|(di, _)| *di)
.collect();
for dn_idx in downstreams {
queue.push_back(dn_idx);
}
}
let layer = self.state.nodes[node_index].layer;
while self.state.dirty_nodes_by_layer.len() <= layer {
self.state.dirty_nodes_by_layer.push(vec![]);
}
}
}
fn format_context(&self, target_index: usize, range: usize) -> String {
let mut output = String::new();
let start = target_index.saturating_sub(range);
let end = (target_index + range + 1).min(self.state.nodes.len());
for i in start..end {
let node_data = &self.state.nodes[i];
let marker = if i == target_index { ">>> " } else { " " };
output.push_str(&format!("{marker}[{i:02}] "));
for _ in 0..node_data.layer {
output.push_str(" ");
}
output.push_str(&format!("{}\n", node_data.node));
}
output
}
pub fn print(&mut self) -> &mut Graph {
for (i, node_data) in self.state.nodes.iter().enumerate() {
print!("[{i:02}] ");
for _ in 0..node_data.layer {
print!(" ");
}
println!("{:}", node_data.node);
}
self
}
pub fn export(&self, path: &str) -> Result<(), Error> {
let path = Path::new(&path);
let mut output = File::create(path)?;
writeln!(output, "graph [")?;
for i in 0..self.state.nodes.len() {
writeln!(output, " node [")?;
writeln!(output, " id {i}")?;
writeln!(
output,
" label \"[{i}] {}\"",
self.state.nodes[i].node
)?;
writeln!(output, " graphics")?;
writeln!(output, " [")?;
writeln!(output, " w 200.0")?;
writeln!(output, " h 30.0")?;
writeln!(output, " ]")?;
writeln!(output, " ]")?;
}
for (i, node) in self.state.nodes.iter().enumerate() {
for downstream in node.downstreams.iter() {
let downstream_index = downstream.0;
writeln!(output, " edge [")?;
writeln!(output, " source {i}")?;
writeln!(output, " target {downstream_index}")?;
writeln!(output, " ]")?;
}
}
writeln!(output, "]")
}
}
#[cfg(test)]
mod tests {
use crate::graph::*;
use crate::nodes::*;
use crate::queue::ValueAt;
use crate::types::*;
use std::cell::RefCell;
use itertools::Itertools;
#[test]
fn run_for_cycles_done_when_exceeded() {
let rf = RunFor::Cycles(3);
assert!(!rf.done(3, NanoTime::ZERO)); assert!(rf.done(4, NanoTime::ZERO)); }
#[test]
fn run_for_duration_done_when_elapsed_exceeds() {
use std::time::Duration;
let rf = RunFor::Duration(Duration::from_nanos(100));
assert!(!rf.done(0, NanoTime::new(100))); assert!(rf.done(0, NanoTime::new(101))); }
#[test]
fn run_for_forever_never_done() {
let rf = RunFor::Forever;
assert!(!rf.done(u32::MAX, NanoTime::MAX));
}
#[test]
fn average_duration_normal() {
let src = Rc::new(RefCell::new(CallBackStream::<u64>::new()));
src.borrow_mut().push(ValueAt::new(1u64, NanoTime::new(10)));
src.clone()
.as_stream()
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(1))
.unwrap();
let result = Graph::new(
vec![Rc::new(RefCell::new(CallBackStream::<u64>::new())).as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(1),
)
.run();
assert!(result.is_ok());
}
#[test]
fn average_duration_zero_n_returns_zero() {
use std::time::Duration;
assert_eq!(average_duration(Duration::from_secs(10), 0), Duration::ZERO);
}
#[test]
fn average_duration_normal_case() {
use std::time::Duration;
assert_eq!(
average_duration(Duration::from_nanos(100), 4),
Duration::from_nanos(25)
);
}
#[test]
fn node_index_ticked_reflects_cycle() {
let src = Rc::new(RefCell::new(CallBackStream::<u64>::new()));
src.borrow_mut().push(ValueAt::new(1u64, NanoTime::new(1)));
let cnt = src.clone().as_stream().count();
cnt.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
.unwrap();
let mut state = GraphState::new(
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(1),
NanoTime::ZERO,
);
state.node_ticked.push(false);
assert!(!state.node_index_ticked(0));
state.node_ticked[0] = true;
assert!(state.node_index_ticked(0));
}
#[test]
fn graph_state_log_with_no_current_node_is_noop() {
let state = GraphState::new(
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(1),
NanoTime::ZERO,
);
state.log(log::Level::Info, "test message");
}
#[test]
fn graph_export_writes_gml_file() {
use std::fs;
let src: Rc<dyn Stream<u64>> = Rc::new(RefCell::new(CallBackStream::<u64>::new()));
let mapped = src.map(|v| v + 1);
let graph = mapped.into_graph(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(0));
let path = "/tmp/wingfoil_test_export.gml";
graph.export(path).unwrap();
let content = fs::read_to_string(path).unwrap();
assert!(content.contains("graph ["));
assert!(content.contains("node ["));
fs::remove_file(path).unwrap();
}
#[test]
fn historical_mode_works() {
let num_inputs = 7;
let inputs: Vec<Rc<RefCell<CallBackStream<i32>>>> = (0..num_inputs)
.map(|_| Rc::new(RefCell::new(CallBackStream::new())))
.collect();
let captured = inputs
.iter()
.map(|stream| stream.clone().as_stream().distinct())
.tree_reduce(
|a, b| add(&a, &b),
)
.unwrap()
.collect();
let mut expected: Vec<ValueAt<i32>> = vec![];
push_all(
&inputs,
ValueAt {
value: 1,
time: NanoTime::new(100),
},
);
expected.push(ValueAt {
value: 7,
time: NanoTime::new(100),
});
push_all(
&inputs,
ValueAt {
value: 1,
time: NanoTime::new(200),
},
);
push_first(
&inputs,
ValueAt {
value: 2,
time: NanoTime::new(300),
},
);
expected.push(ValueAt {
value: 8,
time: NanoTime::new(300),
});
push_first(
&inputs,
ValueAt {
value: 2,
time: NanoTime::new(400),
},
);
let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
Graph::new(vec![captured.clone().as_node()], run_mode, RunFor::Forever)
.print()
.run()
.unwrap();
let captured_data = captured.peek_value();
println!();
println!("captured_data {:?}", captured_data);
println!("expected {:?}", expected);
println!();
assert_eq!(captured_data, expected);
}
#[test]
fn error_context_shows_graph_structure() {
use std::time::Duration;
let mut stream: Rc<dyn Stream<u64>> = ticker(Duration::from_nanos(100)).count();
for _ in 0..10 {
stream = stream.map(|x: u64| x);
}
stream = stream.try_map(|x: u64| {
if x == 3 {
anyhow::bail!("intentional failure at count 3")
} else {
Ok(x)
}
});
for _ in 0..10 {
stream = stream.map(|x: u64| x);
}
let mut graph = Graph::new(
vec![stream.as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(10),
);
graph.print();
let result = graph.run();
assert!(result.is_err(), "Expected error but got: {:?}", result);
let err_msg = format!("{:?}", result.unwrap_err());
let expected = r#"Error in node [14]:
[11] MapStream<u64, u64>
[12] MapStream<u64, u64>
[13] MapStream<u64, u64>
>>> [14] TryMapStream<u64, u64>
[15] MapStream<u64, u64>
[16] MapStream<u64, u64>
[17] MapStream<u64, u64>
Caused by:
intentional failure at count 3"#;
assert!(
err_msg.contains(expected),
"Error message mismatch.\n\nExpected to contain:\n{expected}\n\nActual:\n{err_msg}"
);
}
fn push_all(inputs: &[Rc<RefCell<CallBackStream<i32>>>], value_at: ValueAt<i32>) {
inputs
.iter()
.for_each(|input| input.borrow_mut().push(value_at.clone()));
}
fn push_first(inputs: &[Rc<RefCell<CallBackStream<i32>>>], value_at: ValueAt<i32>) {
inputs[0].borrow_mut().push(value_at);
}
struct TimeCapturingNode {
times: Vec<NanoTime>,
resched_time: NanoTime,
}
impl MutableNode for TimeCapturingNode {
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
let t = state.time();
self.times.push(t);
if self.times.len() == 1 {
state.add_callback(self.resched_time);
}
Ok(true)
}
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
state.add_callback(NanoTime::new(100));
Ok(())
}
}
#[test]
fn time_advances_with_duplicate_scheduled_time() {
let node = Rc::new(RefCell::new(TimeCapturingNode {
times: vec![],
resched_time: NanoTime::new(100), }));
Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(2),
)
.run()
.unwrap();
let times = node.borrow().times.clone();
assert_eq!(times.len(), 2, "expected exactly 2 cycles");
assert!(
times[1] > times[0],
"time did not advance between cycles: {:?} -> {:?}",
times[0],
times[1]
);
}
#[test]
fn time_advances_with_past_scheduled_time() {
let node = Rc::new(RefCell::new(TimeCapturingNode {
times: vec![],
resched_time: NanoTime::new(50), }));
Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(2),
)
.run()
.unwrap();
let times = node.borrow().times.clone();
assert_eq!(times.len(), 2, "expected exactly 2 cycles");
assert!(
times[1] > times[0],
"time went backwards or stalled: {:?} -> {:?}",
times[0],
times[1]
);
}
#[test]
fn ticked_on_unregistered_node_returns_false() {
use std::time::Duration;
let orphan = ticker(Duration::from_nanos(1)).count();
let state = GraphState::new(
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(1),
NanoTime::ZERO,
);
assert!(!state.ticked(orphan.as_node()));
}
#[cfg(feature = "dynamic-graph-beta")]
mod dynamism {
use super::*;
struct DynAdderNode {
trigger: Rc<dyn Node>,
extra: Rc<dyn Node>,
add_after: u64,
cycle_count: u64,
extra_ticks: Rc<RefCell<u64>>,
stop_count: Rc<RefCell<u64>>,
teardown_count: Rc<RefCell<u64>>,
}
impl MutableNode for DynAdderNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
self.cycle_count += 1;
if self.cycle_count == self.add_after {
state.add_upstream(self.extra.clone(), true, false);
}
if state.ticked(self.extra.clone()) {
*self.extra_ticks.borrow_mut() += 1;
}
Ok(true)
}
fn stop(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
*self.stop_count.borrow_mut() += 1;
Ok(())
}
fn teardown(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
*self.teardown_count.borrow_mut() += 1;
Ok(())
}
}
struct SimpleCounter {
trigger: Rc<dyn Node>,
n: u64,
}
impl MutableNode for SimpleCounter {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, _: &mut GraphState) -> anyhow::Result<bool> {
self.n += 1;
Ok(true)
}
}
impl StreamPeekRef<u64> for SimpleCounter {
fn peek_ref(&self) -> &u64 {
&self.n
}
}
struct LifecycleCounterNode {
trigger: Rc<dyn Node>,
cycle_count: Rc<RefCell<u64>>,
stop_count: Rc<RefCell<u64>>,
teardown_count: Rc<RefCell<u64>>,
}
impl MutableNode for LifecycleCounterNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, _: &mut GraphState) -> anyhow::Result<bool> {
*self.cycle_count.borrow_mut() += 1;
Ok(true)
}
fn stop(&mut self, _: &mut GraphState) -> anyhow::Result<()> {
*self.stop_count.borrow_mut() += 1;
Ok(())
}
fn teardown(&mut self, _: &mut GraphState) -> anyhow::Result<()> {
*self.teardown_count.borrow_mut() += 1;
Ok(())
}
}
#[test]
fn add_upstream_dynamically_fires_only_after_wired() {
use std::time::Duration;
let ticker = ticker(Duration::from_nanos(1));
let extra: Rc<dyn Stream<u64>> = SimpleCounter {
trigger: ticker.clone(),
n: 0,
}
.into_stream();
let extra_ticks = Rc::new(RefCell::new(0u64));
let stop_count = Rc::new(RefCell::new(0u64));
let teardown_count = Rc::new(RefCell::new(0u64));
let adder = Rc::new(RefCell::new(DynAdderNode {
trigger: ticker.clone(),
extra: extra.clone().as_node(),
add_after: 3,
cycle_count: 0,
extra_ticks: extra_ticks.clone(),
stop_count: stop_count.clone(),
teardown_count: teardown_count.clone(),
}));
Graph::new(
vec![adder.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(6),
)
.run()
.unwrap();
assert_eq!(*extra_ticks.borrow(), 3, "extra_ticks");
}
struct DynRemoverNode {
trigger: Rc<dyn Node>,
target: Rc<dyn Node>,
remove_after: u64,
cycle_count: u64,
}
impl MutableNode for DynRemoverNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
self.cycle_count += 1;
if self.cycle_count == self.remove_after {
state.remove_node(self.target.clone());
}
Ok(true)
}
}
#[test]
fn remove_node_stops_firing_and_calls_lifecycle() {
use std::time::Duration;
let ticker = ticker(Duration::from_nanos(1));
let extra_ticks = Rc::new(RefCell::new(0u64));
let stop_count = Rc::new(RefCell::new(0u64));
let teardown_count = Rc::new(RefCell::new(0u64));
let extra_ticks_c = extra_ticks.clone();
let stop_c = stop_count.clone();
let teardown_c = teardown_count.clone();
let extra = ticker.count();
let adder = Rc::new(RefCell::new(DynAdderNode {
trigger: ticker.clone(),
extra: extra.clone().as_node(),
add_after: 1,
cycle_count: 0,
extra_ticks: extra_ticks_c,
stop_count: stop_c,
teardown_count: teardown_c,
}));
let remover = Rc::new(RefCell::new(DynRemoverNode {
trigger: ticker.clone(),
target: adder.clone().as_node(),
remove_after: 5,
cycle_count: 0,
}));
Graph::new(
vec![adder.clone().as_node(), remover.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(8),
)
.run()
.unwrap();
assert_eq!(*extra_ticks.borrow(), 4, "extra_ticks after removal");
assert_eq!(*stop_count.borrow(), 1, "stop_count");
assert_eq!(*teardown_count.borrow(), 1, "teardown_count");
}
#[test]
fn remove_cleans_up_caller_upstreams() {
use std::time::Duration;
let ticker = ticker(Duration::from_nanos(1));
let extra = ticker.count();
let extra_ticks = Rc::new(RefCell::new(0u64));
let stop_count = Rc::new(RefCell::new(0u64));
let teardown_count = Rc::new(RefCell::new(0u64));
let adder = Rc::new(RefCell::new(DynAdderNode {
trigger: ticker.clone(),
extra: extra.clone().as_node(),
add_after: 1,
cycle_count: 0,
extra_ticks: extra_ticks.clone(),
stop_count: stop_count.clone(),
teardown_count: teardown_count.clone(),
}));
let remover = Rc::new(RefCell::new(DynRemoverNode {
trigger: ticker.clone(),
target: extra.clone().as_node(),
remove_after: 3,
cycle_count: 0,
}));
let mut graph = Graph::new(
vec![adder.clone().as_node(), remover.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(6),
);
graph.run().unwrap();
let adder_idx = graph
.state
.node_index(adder.clone().as_node())
.expect("adder in graph");
let extra_idx = graph.state.node_index(extra.clone().as_node());
if let Some(extra_idx) = extra_idx {
let adder_upstreams = &graph.state.nodes[adder_idx].upstreams;
assert!(
!adder_upstreams.iter().any(|(ui, _)| *ui == extra_idx),
"removed node should not remain in caller's upstreams"
);
}
}
#[test]
fn add_upstream_with_recycle_delivers_first_value() {
use std::time::Duration;
struct RecycleNode {
trigger: Rc<dyn Node>,
extra: Rc<dyn Stream<u64>>,
added: bool,
values: Rc<RefCell<Vec<u64>>>,
}
impl MutableNode for RecycleNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
if !self.added {
state.add_upstream(self.extra.clone().as_node(), true, true);
self.added = true;
}
if state.ticked(self.extra.clone().as_node()) {
self.values.borrow_mut().push(self.extra.peek_value());
}
Ok(true)
}
}
let ticker = ticker(Duration::from_nanos(1));
let extra: Rc<dyn Stream<u64>> = SimpleCounter {
trigger: ticker.clone(),
n: 0,
}
.into_stream();
let values = Rc::new(RefCell::new(Vec::<u64>::new()));
let node = Rc::new(RefCell::new(RecycleNode {
trigger: ticker.clone(),
extra: extra.clone(),
added: false,
values: values.clone(),
}));
Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(3),
)
.run()
.unwrap();
assert_eq!(*values.borrow(), vec![1u64, 2], "recycle values");
}
#[test]
fn add_upstream_passive_does_not_trigger() {
use std::time::Duration;
struct PassiveNode {
trigger: Rc<dyn Node>,
extra: Rc<dyn Stream<u64>>,
added: bool,
trigger_ticks: u64,
extra_observed: Rc<RefCell<Vec<u64>>>,
}
impl MutableNode for PassiveNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
if !self.added {
state.add_upstream(self.extra.clone().as_node(), false, false);
self.added = true;
}
if state.ticked(self.trigger.clone()) {
self.trigger_ticks += 1;
self.extra_observed
.borrow_mut()
.push(self.extra.peek_value());
}
Ok(true)
}
}
let ticker = ticker(Duration::from_nanos(1));
let extra: Rc<dyn Stream<u64>> = SimpleCounter {
trigger: ticker.clone(),
n: 0,
}
.into_stream();
let extra_observed = Rc::new(RefCell::new(Vec::<u64>::new()));
let node = Rc::new(RefCell::new(PassiveNode {
trigger: ticker.clone(),
extra: extra.clone(),
added: false,
trigger_ticks: 0,
extra_observed: extra_observed.clone(),
}));
Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(4),
)
.run()
.unwrap();
assert_eq!(extra_observed.borrow().len(), 4);
}
#[test]
fn seen_prevents_double_setup_start() {
use std::time::Duration;
struct CountedNode {
ticker: Rc<dyn Node>,
setup_count: Rc<RefCell<u32>>,
start_count: Rc<RefCell<u32>>,
}
impl MutableNode for CountedNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.ticker.clone()], vec![])
}
fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
Ok(true)
}
fn setup(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
*self.setup_count.borrow_mut() += 1;
Ok(())
}
fn start(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
*self.start_count.borrow_mut() += 1;
Ok(())
}
}
struct DoublerNode {
trigger: Rc<dyn Node>,
counted: Rc<dyn Node>,
add_count: u64,
}
impl MutableNode for DoublerNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
if self.add_count < 2 {
state.add_upstream(self.counted.clone(), true, false);
self.add_count += 1;
}
Ok(true)
}
}
let setup_count = Rc::new(RefCell::new(0u32));
let start_count = Rc::new(RefCell::new(0u32));
let ticker = ticker(Duration::from_nanos(1));
let counted = Rc::new(RefCell::new(CountedNode {
ticker: ticker.clone(),
setup_count: setup_count.clone(),
start_count: start_count.clone(),
}));
let doubler = Rc::new(RefCell::new(DoublerNode {
trigger: ticker.clone(),
counted: counted.clone().as_node(),
add_count: 0,
}));
Graph::new(
vec![doubler.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(3),
)
.run()
.unwrap();
assert_eq!(*setup_count.borrow(), 1, "setup called once");
assert_eq!(*start_count.borrow(), 1, "start called once");
}
#[test]
fn layer_resort_after_deep_upstream_addition() {
use std::time::Duration;
let ticker = ticker(Duration::from_nanos(1));
let depth1 = ticker.count(); let depth2 = depth1.map(|x: u64| x * 2);
struct LayerCheckNode {
trigger: Rc<dyn Node>,
deep: Rc<dyn Stream<u64>>,
added: bool,
sum: u64,
}
impl MutableNode for LayerCheckNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
if !self.added {
state.add_upstream(self.deep.clone().as_node(), true, false);
self.added = true;
}
if state.ticked(self.deep.clone().as_node()) {
self.sum += self.deep.peek_value();
}
Ok(true)
}
}
let node = Rc::new(RefCell::new(LayerCheckNode {
trigger: ticker.clone(),
deep: depth2.clone(),
added: false,
sum: 0,
}));
let mut graph = Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(4),
);
graph.run().unwrap();
let node_idx = graph
.state
.node_index(node.clone().as_node())
.expect("node in graph");
let depth2_idx = graph
.state
.node_index(depth2.clone().as_node())
.expect("depth2 in graph");
assert!(
graph.state.nodes[node_idx].layer > graph.state.nodes[depth2_idx].layer,
"aggregation layer should be > depth2 layer after fix_layers"
);
}
#[test]
fn add_and_remove_in_same_cycle_node_is_wired() {
use std::time::Duration;
struct AdderRemoverNode {
trigger: Rc<dyn Node>,
extra: Rc<dyn Node>,
done: bool,
extra_ticks: Rc<RefCell<u64>>,
}
impl MutableNode for AdderRemoverNode {
fn upstreams(&self) -> UpStreams {
UpStreams::new(vec![self.trigger.clone()], vec![])
}
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
if !self.done {
state.add_upstream(self.extra.clone(), true, false);
state.remove_node(self.extra.clone());
self.done = true;
}
if state.ticked(self.extra.clone()) {
*self.extra_ticks.borrow_mut() += 1;
}
Ok(true)
}
}
let ticker = ticker(Duration::from_nanos(1));
let extra: Rc<dyn Stream<u64>> = SimpleCounter {
trigger: ticker.clone(),
n: 0,
}
.into_stream();
let extra_ticks = Rc::new(RefCell::new(0u64));
let node = Rc::new(RefCell::new(AdderRemoverNode {
trigger: ticker.clone(),
extra: extra.clone().as_node(),
done: false,
extra_ticks: extra_ticks.clone(),
}));
Graph::new(
vec![node.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(4),
)
.run()
.unwrap();
assert_eq!(*extra_ticks.borrow(), 3, "extra_ticks");
}
#[test]
fn remove_node_that_never_cycled_calls_lifecycle() {
use std::time::Duration;
let clk = ticker(Duration::from_nanos(1));
let never = Rc::new(RefCell::new(CallBackStream::<i32>::new()));
let cycle_count = Rc::new(RefCell::new(0u64));
let stop_count = Rc::new(RefCell::new(0u64));
let teardown_count = Rc::new(RefCell::new(0u64));
let extra = Rc::new(RefCell::new(LifecycleCounterNode {
trigger: never.clone().as_node(),
cycle_count: cycle_count.clone(),
stop_count: stop_count.clone(),
teardown_count: teardown_count.clone(),
}));
let adder = Rc::new(RefCell::new(DynAdderNode {
trigger: clk.clone(),
extra: extra.clone().as_node(),
add_after: 1,
cycle_count: 0,
extra_ticks: Rc::new(RefCell::new(0u64)),
stop_count: Rc::new(RefCell::new(0u64)),
teardown_count: Rc::new(RefCell::new(0u64)),
}));
let remover = Rc::new(RefCell::new(DynRemoverNode {
trigger: clk.clone(),
target: extra.clone().as_node(),
remove_after: 3,
cycle_count: 0,
}));
Graph::new(
vec![adder.clone().as_node(), remover.clone().as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(5),
)
.run()
.unwrap();
assert_eq!(*cycle_count.borrow(), 0, "extra cycle() never called");
assert_eq!(*stop_count.borrow(), 1, "stop called once on removal");
assert_eq!(
*teardown_count.borrow(),
1,
"teardown called once on removal"
);
}
} }