use std::sync::Arc;
use log::*;
use minotari_app_grpc::tari_rpc::readiness_status::State as ReadinessState;
use tari_common::{
configuration::Network,
exit_codes::{ExitCode, ExitError},
};
use tari_comms::{CommsNode, peer_manager::NodeIdentity, protocol::rpc::RpcServerHandle};
use tari_comms_dht::Dht;
use tari_core::{
base_node::{
LocalNodeCommsInterface,
StateMachineHandle,
state_machine_service::states::StatusInfo,
tari_pulse_service::TariPulseHandle,
},
chain_storage::{
BlockchainDatabase,
ChainStorageError,
LMDBDatabase,
Validators,
create_lmdb_database_with_stats_channel,
},
consensus::BaseNodeConsensusManager,
mempool::{Mempool, service::LocalMempoolService},
proof_of_work::randomx_factory::RandomXFactory,
validation::{
DifficultyCalculator,
block_body::{BlockBodyFullValidator, BlockBodyInternalConsistencyValidator},
header::HeaderFullValidator,
transaction::TransactionFullValidator,
},
};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tari_transaction_components::crypto_factories::CryptoFactories;
use tokio::sync::watch;
use crate::{
ApplicationConfig,
DatabaseType,
bootstrap::BaseNodeBootstrapper,
consensus_constants_tracker::ConsensusConstantsTracker,
grpc::readiness_grpc_server::ReadinessStatusHandler,
};
/// Log target attached to the log statements emitted from this file.
const LOG_TARGET: &str = "c::bn::initialization";
/// Shared state and service handles of a bootstrapped base node.
///
/// Instances are assembled by `build_node_context`. All fields are private and are read through
/// the methods implemented on this type.
pub struct BaseNodeContext {
/// Shared application configuration the node was built from.
config: Arc<ApplicationConfig>,
/// Consensus manager built for the configured network.
consensus_rules: BaseNodeConsensusManager,
/// Blockchain database backed by LMDB.
blockchain_db: BlockchainDatabase<LMDBDatabase>,
/// Comms node handle taken from the bootstrapped service handles.
base_node_comms: CommsNode,
/// DHT handle taken from the bootstrapped service handles.
base_node_dht: Dht,
/// Service handles returned by the bootstrapper; individual handles are looked up on demand.
base_node_handles: ServiceHandles,
}
/// Lifecycle helpers and accessors for the components held by a [`BaseNodeContext`].
///
/// Accessors that look a handle up in the service handles use `expect_handle`; see that method's
/// documentation for what happens when the requested handle type was never registered.
impl BaseNodeContext {
    /// Calls `start` on the blockchain database and returns its result unchanged.
    pub fn start(&self) -> Result<(), ChainStorageError> {
        self.blockchain_db.start()
    }

    /// Consumes the context, waiting first for the state machine's shutdown signal and then for
    /// the comms node to finish shutting down.
    pub async fn wait_for_shutdown(self) {
        {
            // The handle is only needed to obtain the signal, so it is released as soon as the
            // signal has resolved.
            let state_machine = self.state_machine();
            state_machine.shutdown_signal().wait().await;
        }
        info!(target: LOG_TARGET, "Waiting for communications stack shutdown");
        self.base_node_comms.wait_until_shutdown().await;
        info!(target: LOG_TARGET, "Communications stack has shutdown");
    }

    /// Returns another reference-counted pointer to the application configuration.
    pub fn config(&self) -> Arc<ApplicationConfig> {
        Arc::clone(&self.config)
    }

    /// Returns the local node comms interface registered in the service handles.
    pub fn local_node(&self) -> LocalNodeCommsInterface {
        self.base_node_handles.expect_handle::<LocalNodeCommsInterface>()
    }

    /// Returns the local mempool service registered in the service handles.
    pub fn local_mempool(&self) -> LocalMempoolService {
        self.base_node_handles.expect_handle::<LocalMempoolService>()
    }

    /// Borrows the comms node.
    pub fn base_node_comms(&self) -> &CommsNode {
        &self.base_node_comms
    }

    /// Returns the liveness handle registered in the service handles.
    pub fn liveness(&self) -> LivenessHandle {
        self.base_node_handles.expect_handle::<LivenessHandle>()
    }

    /// Returns the state machine handle registered in the service handles.
    pub fn state_machine(&self) -> StateMachineHandle {
        self.base_node_handles.expect_handle::<StateMachineHandle>()
    }

    /// Returns the node identity reported by the comms node.
    pub fn base_node_identity(&self) -> Arc<NodeIdentity> {
        self.base_node_comms.node_identity()
    }

    /// Borrows the DHT.
    pub fn base_node_dht(&self) -> &Dht {
        &self.base_node_dht
    }

    /// Returns the software updater handle registered in the service handles.
    pub fn software_updater(&self) -> SoftwareUpdaterHandle {
        self.base_node_handles.expect_handle::<SoftwareUpdaterHandle>()
    }

    /// Returns the Tari pulse handle registered in the service handles.
    pub fn tari_pulse(&self) -> TariPulseHandle {
        self.base_node_handles.expect_handle::<TariPulseHandle>()
    }

    /// Returns the RPC server handle registered in the service handles.
    pub fn rpc_server(&self) -> RpcServerHandle {
        self.base_node_handles.expect_handle::<RpcServerHandle>()
    }

    /// Returns a clone of the blockchain database handle.
    pub fn blockchain_db(&self) -> BlockchainDatabase<LMDBDatabase> {
        self.blockchain_db.clone()
    }

    /// Returns the network named in the base node configuration.
    pub fn network(&self) -> Network {
        self.config.base_node.network
    }

    /// Borrows the consensus manager.
    pub fn consensus_rules(&self) -> &BaseNodeConsensusManager {
        &self.consensus_rules
    }

    /// Returns the state machine's status-info watch receiver.
    pub fn get_state_machine_info_channel(&self) -> watch::Receiver<StatusInfo> {
        self.state_machine().get_status_info_watch()
    }

    /// Returns the `report_grpc_error` flag from the base node configuration.
    pub fn get_report_grpc_error(&self) -> bool {
        self.config.base_node.report_grpc_error
    }
}
/// Opens the configured database backend and builds a [`BaseNodeContext`] on top of it.
///
/// Readiness is reported through `readiness_status_handler` before the backend is opened
/// (`BuildingContextBlockchain`) and again before the services are bootstrapped
/// (`BuildingContextBootstrap`). The handler's LMDB migration status sender is passed to the
/// database constructor.
///
/// # Errors
///
/// * `ExitCode::UnknownError` if the consensus manager for the configured network cannot be built.
/// * `ExitCode::DatabaseError` if the LMDB database cannot be created.
/// * Any error returned by `build_node_context`.
pub async fn configure_and_initialize_node(
    app_config: Arc<ApplicationConfig>,
    node_identity: Arc<NodeIdentity>,
    interrupt_signal: ShutdownSignal,
    readiness_status_handler: &ReadinessStatusHandler,
) -> Result<BaseNodeContext, ExitError> {
    match &app_config.base_node.db_type {
        DatabaseType::Lmdb => {
            readiness_status_handler.send_readiness_status(ReadinessState::BuildingContextBlockchain);

            let consensus_manager = BaseNodeConsensusManager::builder(app_config.base_node.network)
                .build()
                .map_err(|err| ExitError::new(ExitCode::UnknownError, err))?;

            // Arguments are prepared in the order they are handed to the constructor.
            let lmdb_path = app_config.base_node.lmdb_path.as_path();
            let lmdb_config = app_config.base_node.lmdb.clone();
            let migration_status_tx = Some(readiness_status_handler.lmdb_migration_status_tx.clone());
            let lmdb_backend = create_lmdb_database_with_stats_channel(
                lmdb_path,
                lmdb_config,
                consensus_manager,
                migration_status_tx,
            )
            .map_err(|err| ExitError::new(ExitCode::DatabaseError, err))?;

            readiness_status_handler.send_readiness_status(ReadinessState::BuildingContextBootstrap);

            build_node_context(lmdb_backend, app_config, node_identity, interrupt_signal).await
        },
    }
}
async fn build_node_context(
backend: LMDBDatabase,
app_config: Arc<ApplicationConfig>,
base_node_identity: Arc<NodeIdentity>,
interrupt_signal: ShutdownSignal,
) -> Result<BaseNodeContext, ExitError> {
trace!(
target: LOG_TARGET,
"Building base node context for {} network", app_config.base_node.network
);
let rules = BaseNodeConsensusManager::builder(app_config.base_node.network)
.build()
.map_err(|e| ExitError::new(ExitCode::UnknownError, e))?;
let factories = CryptoFactories::default();
let randomx_factory = RandomXFactory::new(app_config.base_node.max_randomx_vms);
let difficulty_calculator = DifficultyCalculator::new(rules.clone(), randomx_factory.clone());
let validators = Validators::new(
BlockBodyFullValidator::new(rules.clone(), app_config.base_node.bypass_range_proof_verification),
HeaderFullValidator::new(rules.clone(), difficulty_calculator.clone()),
BlockBodyInternalConsistencyValidator::new(
rules.clone(),
app_config.base_node.bypass_range_proof_verification,
factories.clone(),
),
);
let blockchain_db = BlockchainDatabase::new(
backend,
rules.clone(),
validators,
app_config.base_node.storage,
difficulty_calculator,
)
.map_err(|err| {
if let ChainStorageError::DatabaseResyncRequired(reason) = err {
ExitError::new(
ExitCode::DbInconsistentState,
format!("You may need to re-sync your database because {reason}"),
)
} else {
ExitError::new(ExitCode::DatabaseError, err)
}
})?;
let consensus_tracker = ConsensusConstantsTracker::new(&app_config.base_node.data_dir);
let current_constants = rules.consensus_constants_vec();
let current_height = blockchain_db
.get_chain_metadata()
.map(|o| o.best_block_height())
.unwrap_or_default();
if let Err(error_msg) = consensus_tracker.check_for_changes(current_constants, current_height) {
error!(target: LOG_TARGET, "{}", error_msg);
eprintln!("\n{}\n", error_msg);
}
let mempool_validator = TransactionFullValidator::new(
factories.clone(),
app_config.base_node.bypass_range_proof_verification,
blockchain_db.clone(),
rules.clone(),
);
let mempool = Mempool::new(
app_config.base_node.mempool.clone(),
rules.clone(),
Box::new(mempool_validator),
);
trace!(target: LOG_TARGET, "Creating base node state machine.");
let base_node_handles = BaseNodeBootstrapper {
app_config: &app_config,
node_identity: base_node_identity,
db: blockchain_db.clone(),
mempool,
rules: rules.clone(),
factories: factories.clone(),
randomx_factory,
interrupt_signal: interrupt_signal.clone(),
}
.bootstrap()
.await?;
let base_node_comms = base_node_handles.expect_handle::<CommsNode>();
let base_node_dht = base_node_handles.expect_handle::<Dht>();
Ok(BaseNodeContext {
config: app_config,
consensus_rules: rules,
blockchain_db,
base_node_comms,
base_node_dht,
base_node_handles,
})
}