use crate::queue::HashByRef;
use crate::queue::TimeQueue;
use crate::types::{NanoTime, Node};
use crossbeam::channel::{Receiver, SendError, Sender, select};
use lazy_static::lazy_static;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Error, Write};
use std::path::Path;
use std::rc::Rc;
#[cfg(feature = "async")]
use std::sync::Arc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::vec;
lazy_static! {
static ref GRAPH_ID: Mutex<usize> = Mutex::new(0);
}
struct NodeData {
node: Rc<dyn Node>,
upstreams: Vec<(usize, bool)>,
downstreams: Vec<(usize, bool)>,
layer: usize,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RunMode {
RealTime,
HistoricalFrom(NanoTime),
}
impl RunMode {
pub fn start_time(&self) -> NanoTime {
match self {
RunMode::RealTime => NanoTime::now(),
RunMode::HistoricalFrom(start_time) => *start_time,
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum RunFor {
Duration(Duration),
Cycles(u32),
Forever,
}
impl RunFor {
pub fn done(&self, cycle: u32, elapsed: NanoTime) -> bool {
match self {
RunFor::Cycles(cycles) => cycle > *cycles,
RunFor::Duration(duration) => elapsed > NanoTime::from(*duration),
RunFor::Forever => false,
}
}
}
fn average_duration(duration: Duration, n: u32) -> Duration {
let avg_nanos = if n == 0 {
0
} else {
duration.as_nanos() / n as u128
};
Duration::from_nanos(avg_nanos as u64)
}
#[derive(Clone, Debug)]
pub(crate) struct ReadyNotifier {
pub node_index: usize,
pub sender: Sender<usize>,
}
impl ReadyNotifier {
pub fn notify(&self) -> anyhow::Result<(), SendError<usize>> {
self.sender.send(self.node_index)
}
}
pub struct GraphState {
time: NanoTime,
is_last_cycle: bool,
stop_requested: bool,
current_node_index: Option<usize>,
scheduled_callbacks: TimeQueue<usize>,
always_callbacks: Vec<usize>,
node_to_index: HashMap<HashByRef<dyn Node>, usize>,
node_ticked: Vec<bool>,
#[cfg(feature = "async")]
run_time: Arc<tokio::runtime::Runtime>,
run_mode: RunMode,
run_for: RunFor,
ready_notifier: Sender<usize>,
ready_callbacks: Receiver<usize>,
start_time: NanoTime,
id: usize,
nodes: Vec<NodeData>,
dirty_nodes_by_layer: Vec<Vec<usize>>,
node_dirty: Vec<bool>,
}
impl GraphState {
pub fn new(
#[cfg(feature = "async")] run_time: Arc<tokio::runtime::Runtime>,
run_mode: RunMode,
run_for: RunFor,
start_time: NanoTime,
) -> Self {
let (ready_notifier, ready_callbacks) = crossbeam::channel::unbounded();
let mut id = GRAPH_ID.lock().unwrap();
let slf = Self {
time: NanoTime::ZERO,
is_last_cycle: false,
stop_requested: false,
current_node_index: None,
scheduled_callbacks: TimeQueue::new(),
always_callbacks: Vec::new(),
node_to_index: HashMap::new(),
node_ticked: Vec::new(),
#[cfg(feature = "async")]
run_time,
ready_notifier,
run_mode,
run_for,
ready_callbacks,
start_time,
id: *id,
nodes: Vec::new(),
dirty_nodes_by_layer: Vec::new(),
node_dirty: Vec::new(),
};
*id += 1;
slf
}
pub fn time(&self) -> NanoTime {
self.time
}
pub fn elapsed(&self) -> NanoTime {
self.time - self.start_time
}
pub fn start_time(&self) -> NanoTime {
self.start_time
}
pub(crate) fn ready_notifier(&self) -> ReadyNotifier {
ReadyNotifier {
node_index: self.current_node_index.unwrap(),
sender: self.ready_notifier.clone(),
}
}
#[cfg(feature = "async")]
pub fn tokio_runtime(&self) -> Arc<tokio::runtime::Runtime> {
self.run_time.clone()
}
pub fn add_callback(&mut self, time: NanoTime) {
self.add_callback_for_node(self.current_node_index.unwrap(), time);
}
pub(crate) fn current_node_id(&self) -> usize {
self.current_node_index.unwrap()
}
pub fn always_callback(&mut self) {
let ix = self.current_node_index.unwrap();
self.always_callbacks.push(ix);
}
pub fn is_last_cycle(&self) -> bool {
self.is_last_cycle
}
pub fn request_stop(&mut self) {
self.stop_requested = true;
}
pub fn ticked(&self, node: Rc<dyn Node>) -> bool {
self.node_ticked[self.node_index(node).unwrap()]
}
#[allow(dead_code)]
pub(crate) fn node_index_ticked(&self, node_index: usize) -> bool {
self.node_ticked[node_index]
}
fn has_scheduled_callbacks(&self) -> bool {
!self.scheduled_callbacks.is_empty()
}
fn next_scheduled_time(&self) -> NanoTime {
if self.scheduled_callbacks.is_empty() {
NanoTime::MAX
} else {
self.scheduled_callbacks.next_time()
}
}
pub(crate) fn add_callback_for_node(&mut self, node_index: usize, time: NanoTime) {
self.scheduled_callbacks.push(node_index, time);
}
fn has_pending_scheduled_callbacks(&self) -> bool {
self.scheduled_callbacks.pending(self.time)
}
fn wait_ready_callback(&mut self, end_time: NanoTime) -> Option<usize> {
let now = NanoTime::now();
if now > end_time {
None
} else {
let timeout = u64::from(end_time - now);
select! {
recv(self.ready_callbacks) -> msg => {
Some(msg.unwrap())
},
default(Duration::from_nanos(timeout)) => {
None
}
}
}
}
pub fn node_index(&self, node: Rc<dyn Node>) -> Option<usize> {
let key = HashByRef::new(node.clone());
self.node_to_index.get(&key).copied()
}
fn reset(&mut self) {
for i in self.node_ticked.iter_mut() {
*i = false;
}
}
fn push_node(&mut self, node: Rc<dyn Node>) {
let index = self.node_ticked.len();
self.node_ticked.push(false);
self.node_to_index
.insert(HashByRef::new(node.clone()), index);
}
fn seen(&self, node: Rc<dyn Node>) -> bool {
self.node_to_index.contains_key(&HashByRef::new(node))
}
fn set_ticked(&mut self, index: usize) {
self.node_ticked[index] = true;
}
pub fn run_mode(&self) -> RunMode {
self.run_mode
}
pub fn run_for(&self) -> RunFor {
self.run_for
}
pub fn log(&self, level: log::Level, msg: &str) {
if log_enabled!(level)
&& let Some(ix) = self.current_node_index
{
let id = self.id;
let type_name = &self.nodes[ix].node.type_name();
log!(target: type_name, level, "[{id:},{ix:}]{msg:}");
}
}
pub(crate) fn mark_dirty(&mut self, index: usize) {
if !self.node_dirty[index] {
let layer = self.nodes[index].layer;
self.dirty_nodes_by_layer[layer].push(index);
self.node_dirty[index] = true;
}
}
}
pub struct Graph {
pub(crate) state: GraphState,
}
impl Graph {
pub fn new(root_nodes: Vec<Rc<dyn Node>>, run_mode: RunMode, run_for: RunFor) -> Graph {
#[cfg(feature = "async")]
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let start_time = run_mode.start_time();
let state = GraphState::new(
#[cfg(feature = "async")]
Arc::new(tokio_runtime),
run_mode,
run_for,
start_time,
);
let mut graph = Graph { state };
graph.initialise(root_nodes);
graph
}
#[cfg(feature = "async")]
pub fn new_with(
root_nodes: Vec<Rc<dyn Node>>,
tokio_runtime: Arc<tokio::runtime::Runtime>,
run_mode: RunMode,
run_for: RunFor,
start_time: NanoTime,
) -> Graph {
let state = GraphState::new(tokio_runtime, run_mode, run_for, start_time);
let mut graph = Graph { state };
graph.initialise(root_nodes);
graph
}
pub(crate) fn setup_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("setup", |node, state| node.setup(state))
}
pub(crate) fn start_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("start", |node, state| node.start(state))
}
pub(crate) fn stop_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("stop", |node, state| node.stop(state))
}
pub(crate) fn teardown_nodes(&mut self) -> anyhow::Result<()> {
self.apply_nodes("teardown", |node, state| node.teardown(state))
}
fn apply_nodes(
&mut self,
desc: &str,
func: impl Fn(Rc<dyn Node>, &mut GraphState) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let timer = Instant::now();
for ix in 0..self.state.nodes.len() {
let node = &self.state.nodes[ix];
self.state.current_node_index = Some(ix);
func(node.node.clone(), &mut self.state).map_err(|e| {
let context = self.format_context(ix, 3);
e.context(format!("Error during {desc} in node [{ix}]:\n{context}"))
})?;
self.state.current_node_index = None;
}
debug!(
"graph {:?}, {:?} took {:?} for {:?} nodes",
self.state.id,
desc,
timer.elapsed(),
self.state.nodes.len()
);
Ok(())
}
fn resolve_start_end(
&self,
start_time: &mut NanoTime,
end_time: &mut NanoTime,
end_cycle: &mut u32,
is_realtime: &mut bool,
) {
*end_time = NanoTime::MAX; *end_cycle = u32::MAX; match self.state.run_mode() {
RunMode::RealTime => {
*is_realtime = true;
*start_time = NanoTime::now();
}
RunMode::HistoricalFrom(t) => {
*is_realtime = false;
*start_time = t;
}
};
match self.state.run_for {
RunFor::Duration(duration) => {
*end_time = *start_time + duration.as_nanos() as u64;
debug!("end_time = {end_time}",);
}
RunFor::Cycles(cycle) => {
*end_cycle = cycle;
debug!("end_cycle = {end_cycle}",);
}
RunFor::Forever => {}
}
}
pub(crate) fn run_nodes(&mut self) -> anyhow::Result<()> {
let run_timer = Instant::now();
let mut cycles: u32 = 0;
let mut empty_cycles: u32 = 0;
let mut end_time = NanoTime::MAX;
let mut end_cycle = u32::MAX;
let mut is_realtime = false;
let mut start_time = NanoTime::ZERO;
self.resolve_start_end(
&mut start_time,
&mut end_time,
&mut end_cycle,
&mut is_realtime,
);
self.state.start_time = start_time;
loop {
if self.state.is_last_cycle && (self.state.time >= end_time || cycles >= end_cycle) {
debug!(
"Finished. {:}, {:}, {:}, {:}",
self.state.time >= end_time,
cycles >= end_cycle,
self.state.time,
end_time
);
break;
}
if !self.state.is_last_cycle && (cycles >= end_cycle - 1 || self.state.time >= end_time)
{
debug!("last cycle");
self.state.is_last_cycle = true;
}
if is_realtime {
let progressed = self.process_callbacks_realtime(end_time);
if !progressed {
empty_cycles += 1;
continue;
}
} else {
let progressed = self.process_callbacks_historical();
if !progressed {
debug!("Terminating early.");
break;
}
}
self.cycle()?;
cycles += 1;
debug!("cycles={cycles}");
if self.state.stop_requested {
debug!("Stop requested by node, terminating early.");
break;
}
}
let elapsed = run_timer.elapsed();
debug!("{empty_cycles} empty cycles");
debug!(
"Completed {:} cycles in {:?}. {:?} average.",
cycles,
run_timer.elapsed(),
average_duration(elapsed, cycles)
);
Ok(())
}
pub fn run(&mut self) -> anyhow::Result<()> {
self.setup_nodes()?;
self.start_nodes()?;
self.run_nodes()?;
self.stop_nodes()?;
self.teardown_nodes()?;
Ok(())
}
fn initialise(&mut self, root_nodes: Vec<Rc<dyn Node>>) -> &mut Graph {
let timer = Instant::now();
for node in root_nodes {
if !self.state.seen(node.clone()) {
self.initialise_node(&node);
}
}
let mut max_layer: i32 = -1;
for i in 0..self.state.nodes.len() {
max_layer = max(max_layer, self.state.nodes[i].layer.try_into().unwrap());
self.state.node_dirty.push(false);
for j in 0..self.state.nodes[i].upstreams.len() {
let (up_index, active) = self.state.nodes[i].upstreams[j];
self.state.nodes[up_index].downstreams.push((i, active));
}
}
for _ in 0..max_layer + 1 {
self.state.dirty_nodes_by_layer.push(vec![]);
}
debug!(
"{:} nodes wired in {:?}",
self.state.nodes.len(),
timer.elapsed()
);
self
}
fn initialise_upstreams(
&mut self,
upstreams: &[Rc<dyn Node>],
is_active: bool,
layer: &mut usize,
upstream_indexes: &mut Vec<(usize, bool)>,
) {
for upstream_node in upstreams {
let upstream_index = self.initialise_node(upstream_node);
upstream_indexes.push((upstream_index, is_active));
*layer = max(*layer, self.state.nodes[upstream_index].layer + 1);
}
}
fn initialise_node(&mut self, node: &Rc<dyn Node>) -> usize {
if self.state.seen(node.clone()) {
self.state.node_index(node.clone()).unwrap()
} else {
let mut layer = 0;
let mut upstream_indexes = vec![];
let upstreams = node.upstreams();
self.initialise_upstreams(&upstreams.active, true, &mut layer, &mut upstream_indexes);
self.initialise_upstreams(&upstreams.passive, false, &mut layer, &mut upstream_indexes);
let node_data = NodeData {
node: node.clone(),
upstreams: upstream_indexes,
downstreams: vec![],
layer,
};
let index = self.state.nodes.len();
self.state.push_node(node.clone());
self.state.nodes.push(node_data);
index
}
}
fn mark_dirty(&mut self, index: usize) {
if !self.state.node_dirty[index] {
let layer = self.state.nodes[index].layer;
self.state.dirty_nodes_by_layer[layer].push(index);
self.state.node_dirty[index] = true;
}
}
fn process_scheduled_callbacks(&mut self) -> bool {
let mut progressed = false;
for i in 0..self.state.always_callbacks.len() {
let ix = self.state.always_callbacks[i];
self.mark_dirty(ix);
progressed = true;
}
while self.state.has_pending_scheduled_callbacks() {
let ix = self.state.scheduled_callbacks.pop();
self.mark_dirty(ix);
progressed = true;
}
progressed
}
fn process_callbacks_historical(&mut self) -> bool {
if !self.state.ready_callbacks.is_empty() {
panic!("ready_callbacks are not supported in realtime mode.");
}
if self.state.has_scheduled_callbacks() {
self.state.time = self.state.next_scheduled_time();
}
self.process_scheduled_callbacks()
}
fn process_ready_callbacks(&mut self) -> bool {
let mut progressed = false;
while !self.state.ready_callbacks.is_empty() {
let ix = self.state.ready_callbacks.recv().unwrap();
self.mark_dirty(ix);
progressed = true;
}
progressed
}
fn process_callbacks_realtime(&mut self, end_time: NanoTime) -> bool {
let mut progressed = self.process_ready_callbacks();
if self.process_scheduled_callbacks() {
progressed = true;
}
if !progressed {
let wait_until = min(end_time, self.state.next_scheduled_time());
if let Some(ix) = self.state.wait_ready_callback(wait_until) {
self.mark_dirty(ix);
progressed = true;
}
}
self.state.time = NanoTime::now();
progressed
}
fn cycle(&mut self) -> anyhow::Result<()> {
for lyr in 0..self.state.dirty_nodes_by_layer.len() {
for i in 0..self.state.dirty_nodes_by_layer[lyr].len() {
let ix = self.state.dirty_nodes_by_layer[lyr][i];
self.cycle_node(ix)?;
}
}
self.reset();
Ok(())
}
fn cycle_node(&mut self, index: usize) -> anyhow::Result<()> {
let node = &self.state.nodes[index].node;
self.state.current_node_index = Some(index);
let result = node.clone().cycle(&mut self.state);
self.state.current_node_index = None;
let ticked = result.map_err(|e| {
let context = self.format_context(index, 3);
e.context(format!("Error in node [{index}]:\n{context}"))
})?;
if ticked {
self.state.set_ticked(index);
for i in 0..self.state.nodes[index].downstreams.len() {
let (downstream_index, active) = self.state.nodes[index].downstreams[i];
if active {
self.mark_dirty(downstream_index)
}
}
}
Ok(())
}
fn reset(&mut self) {
self.state.reset();
for layer in self.state.dirty_nodes_by_layer.iter_mut() {
layer.clear();
}
for i in self.state.node_dirty.iter_mut() {
*i = false;
}
}
fn format_context(&self, target_index: usize, range: usize) -> String {
let mut output = String::new();
let start = target_index.saturating_sub(range);
let end = (target_index + range + 1).min(self.state.nodes.len());
for i in start..end {
let node_data = &self.state.nodes[i];
let marker = if i == target_index { ">>> " } else { " " };
output.push_str(&format!("{marker}[{i:02}] "));
for _ in 0..node_data.layer {
output.push_str(" ");
}
output.push_str(&format!("{}\n", node_data.node));
}
output
}
pub fn print(&mut self) -> &mut Graph {
for (i, node_data) in self.state.nodes.iter().enumerate() {
print!("[{i:02}] ");
for _ in 0..node_data.layer {
print!(" ");
}
println!("{:}", node_data.node);
}
self
}
pub fn export(&self, path: &str) -> Result<(), Error> {
let path = Path::new(&path);
let mut output = File::create(path)?;
writeln!(output, "graph [")?;
for i in 0..self.state.nodes.len() {
writeln!(output, " node [")?;
writeln!(output, " id {i}")?;
writeln!(
output,
" label \"[{i}] {}\"",
self.state.nodes[i].node
)?;
writeln!(output, " graphics")?;
writeln!(output, " [")?;
writeln!(output, " w 200.0")?;
writeln!(output, " h 30.0")?;
writeln!(output, " ]")?;
writeln!(output, " ]")?;
}
for (i, node) in self.state.nodes.iter().enumerate() {
for downstream in node.downstreams.iter() {
let downstream_index = downstream.0;
writeln!(output, " edge [")?;
writeln!(output, " source {i}")?;
writeln!(output, " target {downstream_index}")?;
writeln!(output, " ]")?;
}
}
writeln!(output, "]")
}
}
#[cfg(test)]
mod tests {
use crate::graph::*;
use crate::nodes::*;
use crate::queue::ValueAt;
use crate::types::*;
use std::cell::RefCell;
use itertools::Itertools;
#[test]
fn historical_mode_works() {
let num_inputs = 7;
let inputs: Vec<Rc<RefCell<CallBackStream<i32>>>> = (0..num_inputs)
.map(|_| Rc::new(RefCell::new(CallBackStream::new())))
.collect();
let captured = inputs
.iter()
.map(|stream| stream.clone().as_stream().distinct())
.tree_reduce(
|a, b| add(&a, &b),
)
.unwrap()
.collect();
let mut expected: Vec<ValueAt<i32>> = vec![];
push_all(
&inputs,
ValueAt {
value: 1,
time: NanoTime::new(100),
},
);
expected.push(ValueAt {
value: 7,
time: NanoTime::new(100),
});
push_all(
&inputs,
ValueAt {
value: 1,
time: NanoTime::new(200),
},
);
push_first(
&inputs,
ValueAt {
value: 2,
time: NanoTime::new(300),
},
);
expected.push(ValueAt {
value: 8,
time: NanoTime::new(300),
});
push_first(
&inputs,
ValueAt {
value: 2,
time: NanoTime::new(400),
},
);
let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
Graph::new(vec![captured.clone().as_node()], run_mode, RunFor::Forever)
.print()
.run()
.unwrap();
let captured_data = captured.peek_value();
println!();
println!("captured_data {:?}", captured_data);
println!("expected {:?}", expected);
println!();
assert_eq!(captured_data, expected);
}
#[test]
fn error_context_shows_graph_structure() {
use std::time::Duration;
let mut stream: Rc<dyn Stream<u64>> = ticker(Duration::from_nanos(100)).count();
for _ in 0..10 {
stream = stream.map(|x: u64| x);
}
stream = stream.try_map(|x: u64| {
if x == 3 {
anyhow::bail!("intentional failure at count 3")
} else {
Ok(x)
}
});
for _ in 0..10 {
stream = stream.map(|x: u64| x);
}
let mut graph = Graph::new(
vec![stream.as_node()],
RunMode::HistoricalFrom(NanoTime::ZERO),
RunFor::Cycles(10),
);
graph.print();
let result = graph.run();
assert!(result.is_err(), "Expected error but got: {:?}", result);
let err_msg = format!("{:?}", result.unwrap_err());
let expected = r#"Error in node [14]:
[11] MapStream<u64, u64>
[12] MapStream<u64, u64>
[13] MapStream<u64, u64>
>>> [14] TryMapStream<u64, u64>
[15] MapStream<u64, u64>
[16] MapStream<u64, u64>
[17] MapStream<u64, u64>
Caused by:
intentional failure at count 3"#;
assert!(
err_msg.contains(expected),
"Error message mismatch.\n\nExpected to contain:\n{expected}\n\nActual:\n{err_msg}"
);
}
fn push_all(inputs: &[Rc<RefCell<CallBackStream<i32>>>], value_at: ValueAt<i32>) {
inputs
.iter()
.for_each(|input| input.borrow_mut().push(value_at.clone()));
}
fn push_first(inputs: &[Rc<RefCell<CallBackStream<i32>>>], value_at: ValueAt<i32>) {
inputs[0].borrow_mut().push(value_at);
}
}