use std::sync::Arc;
use amaru_consensus::{
effects::{ResourceBlockValidation, ResourceHeaderValidation, ResourceMeter},
errors::ConsensusError,
headers_tree::HeadersTreeState,
stages::{pull::SyncTracker, select_chain::SelectChain, validate_header::ValidateHeader},
};
use amaru_kernel::{
BlockHeader, ConsensusParameters, EraHistory, GlobalParameters, HeaderHash, ORIGIN_HASH, Peer, Transaction,
};
use amaru_mempool::InMemoryMempool;
use amaru_metrics::METRICS_METER_NAME;
use amaru_network::connection::TokioConnections;
use amaru_ouroboros::{ChainStore, ConnectionsResource, HasStakeDistribution, ResourceMempool};
use amaru_protocols::{
manager::ManagerMessage,
store_effects::{ResourceHeaderStore, ResourceParameters},
};
use amaru_stores::rocksdb::consensus::RocksDBStore;
use anyhow::{Context, anyhow};
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use pure_stage::{
StageGraph, StageRef,
tokio::{TokioBuilder, TokioRunning},
};
use tokio::runtime::Handle;
use tracing::info;
use crate::stages::{
build_stage_graph::build_stage_graph,
config::{Config, StoreType},
ledger::Ledger,
};
/// Builds the node's stage graph on a fresh [`TokioBuilder`] and starts running it
/// on the current Tokio runtime.
///
/// The global parameters handed to [`build_node`] are derived from `config.network`;
/// `meter_provider` is forwarded to [`build_node`] unchanged.
///
/// # Errors
///
/// Returns any error reported by [`build_node`].
///
/// # Panics
///
/// `Handle::current()` panics when it is called outside of a Tokio runtime context.
pub fn build_and_run_node(config: Config, meter_provider: Option<SdkMeterProvider>) -> anyhow::Result<TokioRunning> {
    let mut stage_builder = TokioBuilder::default();
    build_node(&config, config.network.into(), meter_provider, &mut stage_builder)?;
    // `Handle::current()` already returns an owned handle, so cloning it is redundant.
    Ok(stage_builder.run(Handle::current()))
}
/// Wires up the resources and stages of a node on the given stage graph.
///
/// In order, this function:
/// 1. creates the ledger and logs its tip;
/// 2. initializes the chain store at the ledger tip and loads that tip from it;
/// 3. builds the chain selector and the header validation for the configured upstream peers;
/// 4. registers the shared resources on `stage_builder`;
/// 5. builds the stage graph and preloads the manager stage with a `Listen` message
///    followed by one `AddPeer` message per configured upstream peer.
///
/// Returns a reference to the manager stage.
///
/// # Errors
///
/// Fails when the ledger cannot be created, the chain store cannot be initialized,
/// the ledger tip cannot be loaded from the chain store, the chain selector cannot be
/// initialized, the stake distribution cannot be obtained, the listen address cannot
/// be determined, or the `Listen` message cannot be preloaded.
///
/// A failure to preload an `AddPeer` message is not an error: a warning is logged and
/// the remaining peers are skipped.
pub fn build_node(
    config: &Config,
    global_parameters: &GlobalParameters,
    meter_provider: Option<SdkMeterProvider>,
    stage_builder: &mut impl StageGraph,
) -> anyhow::Result<StageRef<ManagerMessage>> {
    let era_history: &EraHistory = config.network.into();

    let ledger = Ledger::new(config, era_history.clone(), global_parameters.clone())
        .context("Failed to create ledger. Have you bootstrapped your node?")?;
    let ledger_tip = ledger.get_tip();
    info!(
        tip.hash = %ledger_tip.hash(),
        tip.slot = u64::from(ledger_tip.slot_or_default()),
        "build_node"
    );

    // The chain store is anchored at the ledger tip, then the tip is read back from it.
    let ledger_tip_hash = ledger_tip.hash();
    let chain_store = initialize_chain_store(config, &ledger_tip_hash)?;
    let chain_tip =
        chain_store.load_tip(&ledger_tip_hash).context("the ledger tip must exist in the chain store")?;

    let peers = config.upstream_peers.iter().map(|address| Peer::new(address)).collect();
    let chain_selector = make_chain_selector(chain_store.clone(), &peers, global_parameters.consensus_security_param)?;

    let stake_distribution = ledger.get_stake_distribution()?;
    let validate_header = make_validate_header(global_parameters, era_history, chain_store.clone(), stake_distribution);

    register_resources(stage_builder, chain_store, global_parameters, ledger, validate_header, meter_provider);

    let sync_tracker = SyncTracker::new(&peers);
    let manager_stage = build_stage_graph(config, era_history, chain_selector, sync_tracker, chain_tip, stage_builder);

    // Failing to preload the `Listen` message aborts the node construction.
    let listen = ManagerMessage::Listen(config.listen_address()?);
    stage_builder.preload(&manager_stage, [listen]).map_err(|e| anyhow!("{e:?}"))?;

    // Peers are preloaded on a best-effort basis: stop at the first message that is rejected.
    for address in &config.upstream_peers {
        let add_peer = ManagerMessage::AddPeer(Peer::new(address));
        if stage_builder.preload(&manager_stage, [add_peer]).is_err() {
            tracing::warn!("supplied more peers than can be initially connected");
            break;
        }
    }

    Ok(manager_stage)
}
/// Registers the resources shared by the stages on the stage graph.
///
/// The following resources are always registered: the header store, the global
/// parameters, block validation (obtained from the ledger), header validation,
/// network connections and an in-memory mempool. A meter is registered only when a
/// `meter_provider` is supplied.
fn register_resources(
    stage_graph: &mut impl StageGraph,
    chain_store: Arc<dyn ChainStore<BlockHeader>>,
    global_parameters: &GlobalParameters,
    ledger: Ledger,
    validate_header: ValidateHeader,
    meter_provider: Option<SdkMeterProvider>,
) {
    // Stores and parameters.
    stage_graph.resources().put::<ResourceHeaderStore>(chain_store);
    stage_graph.resources().put::<ResourceParameters>(global_parameters.clone());

    // Validation.
    stage_graph.resources().put::<ResourceBlockValidation>(ledger.get_block_validation());
    stage_graph.resources().put::<ResourceHeaderValidation>(Arc::new(validate_header));

    // Networking and mempool.
    // NOTE(review): the meaning of 65535 is defined by `TokioConnections::new` — confirm there.
    stage_graph.resources().put::<ConnectionsResource>(Arc::new(TokioConnections::new(65535)));
    stage_graph.resources().put::<ResourceMempool<Transaction>>(Arc::new(InMemoryMempool::default()));

    // Metrics are optional: without a provider no meter resource is registered.
    let Some(provider) = meter_provider else {
        return;
    };
    stage_graph.resources().put::<ResourceMeter>(Arc::new(provider.meter(METRICS_METER_NAME)));
}
/// Opens the chain store selected by `config.chain_store` and anchors it at `tip`.
///
/// - `StoreType::InMem`: the store carried by the configuration is cloned.
/// - `StoreType::RocksDb`: the store is opened with `RocksDBStore::open_and_migrate`
///   when `config.migrate_chain_db` is set, and with `RocksDBStore::open` otherwise.
///
/// Unless `tip` is `ORIGIN_HASH`, its header must already be present in the store.
/// Both the anchor hash and the best chain hash of the store are then set to `tip`.
///
/// # Errors
///
/// Fails when the RocksDB store cannot be opened, when the header of a non-origin
/// `tip` is missing from the store, or when setting the anchor hash or the best
/// chain hash fails.
fn initialize_chain_store(config: &Config, tip: &HeaderHash) -> anyhow::Result<Arc<dyn ChainStore<BlockHeader>>> {
    let store: Arc<dyn ChainStore<BlockHeader>> = match &config.chain_store {
        StoreType::InMem(in_memory) => in_memory.clone(),
        StoreType::RocksDb(rocks_db_config) if config.migrate_chain_db => {
            Arc::new(RocksDBStore::open_and_migrate(rocks_db_config)?)
        }
        StoreType::RocksDb(rocks_db_config) => Arc::new(RocksDBStore::open(rocks_db_config)?),
    };

    // The origin has no header to look up; any other tip must be known to the store.
    anyhow::ensure!(
        *tip == ORIGIN_HASH || store.load_header(tip).is_some(),
        "Tip {} not found in chain database '{}'",
        tip,
        config.chain_store
    );

    store.set_anchor_hash(tip)?;
    store.set_best_chain_hash(tip)?;
    Ok(store)
}
/// Creates the chain selection stage state for the given peers.
///
/// A new `HeadersTreeState` is created with `consensus_security_parameter`, and every
/// peer is initialized in it against the anchor hash read from `chain_store`.
///
/// # Errors
///
/// Returns the first error reported while initializing a peer.
fn make_chain_selector(
    chain_store: Arc<dyn ChainStore<BlockHeader>>,
    peers: &Vec<Peer>,
    consensus_security_parameter: usize,
) -> Result<SelectChain, ConsensusError> {
    let mut headers_tree = HeadersTreeState::new(consensus_security_parameter);
    let anchor_hash = chain_store.get_anchor_hash();

    // Each peer gets its own handle on the shared chain store.
    for peer in peers.iter() {
        headers_tree.initialize_peer(Arc::clone(&chain_store), peer, &anchor_hash)?;
    }

    Ok(SelectChain::new(headers_tree))
}
/// Creates the header validation from the global parameters, the era history, the
/// chain store and the stake distribution.
///
/// The consensus parameters are built from a clone of `global_parameters`,
/// `era_history` and a default value for their third argument, then shared through
/// an `Arc`.
fn make_validate_header(
    global_parameters: &GlobalParameters,
    era_history: &EraHistory,
    chain_store: Arc<dyn ChainStore<BlockHeader>>,
    stake_distribution: Arc<dyn HasStakeDistribution>,
) -> ValidateHeader {
    let parameters = ConsensusParameters::new(global_parameters.clone(), era_history, Default::default());
    ValidateHeader::new(Arc::new(parameters), chain_store, stake_distribution)
}