use std::{
collections::VecDeque,
fmt::{Debug, Formatter},
sync::Arc,
};
use amaru_consensus::headers_tree::data_generation::Action;
use amaru_protocols::{manager::ManagerMessage, mux::HandlerMessage, protocol::PROTO_N2N_CHAIN_SYNC};
use futures_util::FutureExt;
use parking_lot::Mutex;
use pure_stage::{
Effect, Resources, StageGraphRunning, StageRef,
simulation::{Blocked, SimulationRunning},
trace_buffer::TraceBuffer,
};
use crate::tests::configuration::{NodeTestConfig, NodeType};
pub struct Node {
config: NodeTestConfig,
running: SimulationRunning,
manager_stage: StageRef<ManagerMessage>,
actions_stage: StageRef<Action>,
pending_actions: VecDeque<Action>,
initialized: bool,
}
impl Node {
pub fn new(
config: NodeTestConfig,
running: SimulationRunning,
manager_stage: StageRef<ManagerMessage>,
actions_stage: StageRef<Action>,
) -> Self {
let actions = config.actions.clone();
let mut node = Self {
config,
running,
manager_stage,
actions_stage,
pending_actions: VecDeque::from(actions),
initialized: false,
};
node.install_breakpoint_for_initialization();
node
}
fn install_breakpoint_for_initialization(&mut self) {
self.running.breakpoint("chainsync_registered", move |eff| {
if let Effect::Send { msg, .. } = eff {
if let Ok(handler_msg) = msg.cast_ref::<HandlerMessage>() {
matches!(handler_msg, HandlerMessage::Registered(proto) if *proto == PROTO_N2N_CHAIN_SYNC.erase() || *proto == PROTO_N2N_CHAIN_SYNC.responder().erase())
} else {
false
}
} else {
false
}
});
}
#[expect(clippy::panic)]
pub fn run_until_blocked(&mut self) {
let _span = self.enter_span();
match self.running.run_until_blocked() {
Blocked::Breakpoint(name, effect) => {
if name.as_str() == "chainsync_registered" {
tracing::info!("Node {} chainsync registered", self.node_id());
self.initialized = true
}
self.running.clear_breakpoint("chainsync_registered");
self.running.handle_effect(effect);
}
Blocked::Sleeping { .. } => {
panic!("Node {} should not be sleeping", self.node_id());
}
Blocked::Deadlock(_) => {
panic!("Deadlock detected during initialization");
}
Blocked::Idle | Blocked::Busy { .. } => {}
Blocked::Terminated(_) => {}
}
}
#[expect(clippy::panic)]
pub fn run_effect(&mut self) {
let _span = self.enter_span();
match self.running.try_effect() {
Ok(effect) => {
self.running.handle_effect(effect);
}
Err(Blocked::Sleeping { .. }) => {
self.running.skip_to_next_wakeup(None);
}
Err(Blocked::Idle) | Err(Blocked::Busy { .. }) | Err(Blocked::Terminated(_)) => {}
Err(Blocked::Deadlock(_)) => {
panic!("Deadlock detected");
}
Err(Blocked::Breakpoint(name, ..)) => {
tracing::warn!("The breakpoint {name} is not handled");
}
}
}
pub fn advance_inputs(&mut self) {
let _span = self.enter_span();
self.running.await_external_effect().now_or_never();
self.running.receive_inputs();
}
pub fn is_initialized(&self) -> bool {
self.initialized
}
pub fn is_terminated(&self) -> bool {
self.running.is_terminated()
}
pub fn enqueue_pending_action(&mut self) {
if let Some(action) = self.pending_actions.pop_front() {
self.running.enqueue_msg(&self.actions_stage, [action]);
}
}
pub fn enqueue_manager_message(&mut self, msg: ManagerMessage) {
self.running.enqueue_msg(&self.manager_stage, [msg]);
}
pub fn node_id(&self) -> &str {
&self.config.listen_address
}
pub fn resources(&self) -> Resources {
self.running.resources().clone()
}
pub fn trace_buffer(&self) -> Arc<Mutex<TraceBuffer>> {
self.config.trace_buffer.clone()
}
pub fn has_pending_actions(&self) -> bool {
!self.pending_actions.is_empty()
}
pub fn has_runnable_effects(&self) -> bool {
self.running.has_runnable()
}
pub fn is_node_under_test(&self) -> bool {
self.config.node_type == NodeType::NodeUnderTest
}
pub fn is_upstream(&self) -> bool {
self.config.node_type == NodeType::UpstreamNode
}
pub fn is_downstream(&self) -> bool {
self.config.node_type == NodeType::DownstreamNode
}
fn enter_span(&self) -> tracing::span::EnteredSpan {
tracing::info_span!("node", id = %self.node_id()).entered()
}
}
impl Debug for Node {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Node").field("node_id", &self.node_id()).field("config", &self.config).finish()
}
}