use std::sync::Arc;
use amaru_consensus::{
effects::{ResourceBlockValidation, ResourceHeaderValidation},
headers_tree::data_generation::Action,
};
use amaru_kernel::{BlockHeight, GlobalParameters, IsHeader, Tip, Transaction};
use amaru_ouroboros::{
ChainStore, ConnectionsResource, ResourceMempool,
can_validate_blocks::mock::{MockCanValidateBlocks, MockCanValidateHeaders},
};
use amaru_protocols::{manager::ManagerMessage, store_effects::Store};
use anyhow::anyhow;
use pure_stage::{
Effects, StageGraph, StageRef, TryInStage,
simulation::{RandStdRng, SimulationBuilder},
};
use tracing_subscriber::EnvFilter;
use crate::{
stages::build_node::build_node,
tests::{
configuration::NodeTestConfig, in_memory_connection_provider::InMemoryConnectionProvider, node::Node,
nodes::Nodes,
},
};
pub fn create_nodes(rng: &mut RandStdRng, configs: Vec<NodeTestConfig>) -> anyhow::Result<Nodes> {
let connections: ConnectionsResource = Arc::new(InMemoryConnectionProvider::default());
let mut nodes = vec![];
for config in configs {
let _span = config.enter_span();
let mut stage_graph = SimulationBuilder::default()
.with_seed(config.seed)
.with_mailbox_size(10000)
.with_trace_buffer(config.trace_buffer.clone());
let config = config.with_connections(connections.clone());
let (manager_stage, actions_stage) = create_node(&config, &mut stage_graph)?;
nodes.push(Node::new(config, stage_graph.run(), manager_stage, actions_stage));
}
tracing::info!("Initializing nodes");
let mut nodes = Nodes::new(nodes);
nodes.initialize(rng);
Ok(nodes)
}
pub fn create_node(
node_config: &NodeTestConfig,
stage_graph: &mut impl StageGraph,
) -> anyhow::Result<(StageRef<ManagerMessage>, StageRef<Action>)> {
let config = node_config.make_node_configuration()?;
let global_parameters: &GlobalParameters = config.network.into();
let mut global_parameters = global_parameters.clone();
global_parameters.consensus_security_param = node_config.chain_length;
let manager_stage = build_node(&config, &global_parameters, None, stage_graph)
.map_err(|e| anyhow!("Cannot build node.\nThe node config is\n{:?}\n\nThe error is {e:?}", node_config))?;
let actions_stage = stage_graph.stage("actions", actions_stage);
let actions_stage = stage_graph.wire_up(actions_stage, (manager_stage.clone(), node_config.seed));
set_resources(node_config, stage_graph)?;
Ok((manager_stage, actions_stage.without_state()))
}
pub fn start_responder(
simulation_builder: &mut impl StageGraph,
connections: ConnectionsResource,
) -> anyhow::Result<()> {
let configuration = NodeTestConfig::responder().with_connections(connections);
create_node(&configuration, simulation_builder)?;
Ok(())
}
pub fn start_initiator(
simulation_builder: &mut SimulationBuilder,
connections: ConnectionsResource,
) -> anyhow::Result<()> {
let configuration = NodeTestConfig::initiator().with_connections(connections);
create_node(&configuration, simulation_builder)?;
Ok(())
}
type ActionsState = (StageRef<ManagerMessage>, u64);
async fn actions_stage(state: ActionsState, msg: Action, eff: Effects<Action>) -> ActionsState {
let (manager_stage, seed) = &state;
tracing::info!("Received action: {msg:?}");
let store = Store::new(eff.clone());
let tip = match &msg {
Action::RollForward { header, .. } => {
tracing::info!(point = %header.point(), "rollforward");
store
.store_header(header)
.or_terminate(&eff, |e| async move {
tracing::error!("Cannot store the header {}: {e:?}. The seed is {seed}", &header);
})
.await;
store
.roll_forward_chain(&header.point())
.or_terminate(&eff, |e| async move {
tracing::error!("Cannot rollforward chain: {e:?}. The seed is {seed}");
})
.await;
Tip::new(header.point(), BlockHeight::from(header.slot().as_u64()))
}
Action::Rollback { rollback_point, .. } => {
tracing::info!(point = %rollback_point, "rollback");
store
.rollback_chain(rollback_point)
.or_terminate(&eff, |e| async move {
tracing::error!("Cannot rollback the chain to {}: {e:?}. The seed is {seed}", &rollback_point,);
})
.await;
Tip::new(*rollback_point, BlockHeight::from(rollback_point.slot_or_default().as_u64()))
}
};
store
.set_best_chain_hash(&msg.hash())
.or_terminate(&eff, |e| async move {
tracing::error!("Cannot set the best chain: {e:?}. The seed is {seed}");
})
.await;
eff.send(manager_stage, ManagerMessage::NewTip(tip)).await;
state
}
fn set_resources(node_config: &NodeTestConfig, stage_graph: &mut impl StageGraph) -> anyhow::Result<()> {
stage_graph.resources().put::<ResourceBlockValidation>(Arc::new(MockCanValidateBlocks));
stage_graph.resources().put::<ResourceHeaderValidation>(Arc::new(MockCanValidateHeaders));
stage_graph.resources().put::<ResourceMempool<Transaction>>(node_config.mempool.clone());
stage_graph.resources().put(node_config.connections.clone());
Ok(())
}
pub fn setup_logging(enable: bool) {
if !enable {
return;
};
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).with_test_writer().try_init();
}