use std::collections::BTreeSet;
use anyhow::anyhow;
use pure_stage::{Resources, simulation::RandStdRng};
use crate::tests::node::Node;
/// An owned collection of [`Node`]s that are driven together.
///
/// The wrapper stores the nodes in a `Vec` and keeps the order in which they
/// were supplied to the constructor.
#[derive(Debug)]
pub struct Nodes {
// Backing storage; nothing in this type adds or removes elements after construction.
nodes: Vec<Node>,
}
/// Shared iteration: `for node in &nodes` yields `&Node` in storage order.
impl<'n> IntoIterator for &'n Nodes {
    type IntoIter = std::slice::Iter<'n, Node>;
    type Item = &'n Node;

    fn into_iter(self) -> Self::IntoIter {
        self.nodes.as_slice().iter()
    }
}
/// Exclusive iteration: `for node in &mut nodes` yields `&mut Node` in storage order.
impl<'n> IntoIterator for &'n mut Nodes {
    type IntoIter = std::slice::IterMut<'n, Node>;
    type Item = &'n mut Node;

    fn into_iter(self) -> Self::IntoIter {
        self.nodes.as_mut_slice().iter_mut()
    }
}
impl Nodes {
pub fn new(nodes: Vec<Node>) -> Self {
Self { nodes }
}
/// Returns the result of `Node::resources` for every node, in storage order.
pub fn resources(&self) -> Vec<Resources> {
    let mut collected = Vec::with_capacity(self.nodes.len());
    for node in &self.nodes {
        collected.push(node.resources());
    }
    collected
}
/// Runs randomly chosen nodes until every node reports it is initialized.
///
/// Each round advances the inputs of all nodes, picks one non-terminated
/// node at random using `rng`, and runs it until it blocks. A node is
/// recorded (by its id) the first time `is_initialized()` returns true
/// after it was run.
///
/// The loop ends when the number of recorded ids equals the number of
/// nodes, or when no non-terminated node is left to pick.
pub fn initialize(&mut self, rng: &mut RandStdRng) {
    // The loop never adds or removes nodes, so the target count is fixed.
    let expected = self.nodes.len();
    let mut ready = BTreeSet::<String>::new();
    loop {
        for node in &mut self.nodes {
            node.advance_inputs();
        }
        let node = match self.pick_random_active_node(rng) {
            Some(node) => node,
            None => {
                tracing::info!("All nodes terminated");
                break;
            }
        };
        node.run_until_blocked();
        if node.is_initialized() {
            let id = node.node_id();
            // Only allocate an owned id the first time this node is seen.
            if !ready.contains(id) {
                ready.insert(id.to_string());
            }
        }
        if ready.len() == expected {
            break;
        }
    }
}
/// Drives the nodes until their pending actions are consumed, then drains.
///
/// For at most 1,000,000 steps, every node enqueues its pending action and
/// advances its inputs; then one non-terminated node, chosen at random with
/// `rng`, runs a single effect.
///
/// * If no node reports pending actions, the main loop stops early.
/// * If no non-terminated node remains, the function returns immediately and
///   the drain phase is skipped.
///
/// In every other case (early stop or step limit reached) the drain phase
/// runs afterwards.
pub fn run(&mut self, rng: &mut RandStdRng) {
    const MAX_STEPS: u32 = 1_000_000;
    for step in 0..MAX_STEPS {
        for node in &mut self.nodes {
            node.enqueue_pending_action();
            node.advance_inputs();
        }
        let any_pending = self.nodes.iter().any(|n| n.has_pending_actions());
        if !any_pending {
            tracing::info!("All actions consumed at step {step}, entering drain phase");
            break;
        }
        match self.pick_random_active_node(rng) {
            Some(node) => {
                node.run_effect();
            }
            None => {
                tracing::info!("All nodes terminated at step {step}");
                return;
            }
        }
    }
    self.drain(rng);
}
/// Keeps running effects on random nodes until the system appears idle.
///
/// Each step advances the inputs of all nodes, picks one non-terminated node
/// at random with `rng`, and runs one effect on it. A step counts as idle
/// when the picked node had no runnable effects both before and after
/// running. The phase ends when:
///
/// * no non-terminated node remains,
/// * 1,000 consecutive idle steps have been observed, or
/// * 1,000,000 steps have been executed.
fn drain(&mut self, rng: &mut RandStdRng) {
    let max_drain_steps = 1_000_000;
    let patience = 1_000;
    let mut idle_streak = 0;
    for step in 0..max_drain_steps {
        for node in &mut self.nodes {
            node.advance_inputs();
        }
        let node = match self.pick_random_active_node(rng) {
            Some(node) => node,
            None => {
                tracing::info!("drain[{step}]: all nodes terminated");
                return;
            }
        };
        let runnable_before = node.has_runnable_effects();
        node.run_effect();
        // The post-run query is only made when nothing was runnable beforehand.
        let made_progress = runnable_before || node.has_runnable_effects();
        idle_streak = if made_progress { 0 } else { idle_streak + 1 };
        if idle_streak >= patience {
            tracing::info!("drain[{step}]: no effects for {patience} steps, drain complete");
            return;
        }
    }
    tracing::info!("Drain phase completed after {max_drain_steps} steps");
}
/// Returns a mutable reference to the first node for which
/// `is_node_under_test()` is true.
///
/// # Errors
///
/// Returns an error with the message `"No node under test found"` when no
/// node matches.
pub fn get_node_under_test(&mut self) -> anyhow::Result<&mut Node> {
    self.nodes
        .iter_mut()
        .find(|node| node.is_node_under_test())
        // Build the error lazily: `ok_or` would construct it on every call,
        // including the common case where a node is found.
        .ok_or_else(|| anyhow!("No node under test found"))
}
pub fn to_vec(self) -> Vec<Node> {
self.nodes
}
pub fn nodes(&self) -> &Vec<Node> {
&self.nodes
}
/// Picks one node for which `is_terminated()` is false, using `rng` to
/// choose among the candidates.
///
/// Returns `None` when every node is terminated (or there are no nodes).
fn pick_random_active_node(&mut self, rng: &mut RandStdRng) -> Option<&mut Node> {
    // Gather the positions of all nodes that can still be run.
    let mut live = Vec::new();
    for (position, node) in self.nodes.iter().enumerate() {
        if !node.is_terminated() {
            live.push(position);
        }
    }
    match live.len() {
        // Nothing to choose from; also avoids sampling from an empty range.
        0 => None,
        count => {
            let chosen = live[rng.random_range(0..count)];
            Some(&mut self.nodes[chosen])
        }
    }
}
}