use std::{cmp, str::FromStr, sync::Arc};
use log::*;
use minotari_app_utilities::{consts, identity_management, identity_management::load_from_json};
use tari_common::{
configuration::bootstrap::ApplicationType,
exit_codes::{ExitCode, ExitError},
};
use tari_comms::{
NodeIdentity,
UnspawnedCommsNode,
multiaddr::{Error as MultiaddrError, Multiaddr},
peer_manager::{NodeId, Peer},
protocol::rpc::RpcServer,
tor::TorIdentity,
};
use tari_comms_dht::Dht;
use tari_core::{
base_node::{
self,
LocalNodeCommsInterface,
StateMachineHandle,
chain_metadata_service::ChainMetadataServiceInitializer,
service::BaseNodeServiceInitializer,
state_machine_service::initializer::BaseNodeStateMachineInitializer,
tari_pulse_service::TariPulseServiceInitializer,
},
chain_storage::{BlockchainBackend, BlockchainDatabase, async_db::AsyncBlockchainDb},
consensus::BaseNodeConsensusManager,
mempool::{self, Mempool, MempoolServiceInitializer, MempoolSyncInitializer, service::MempoolHandle},
proof_of_work::randomx_factory::RandomXFactory,
};
use tari_p2p::{
P2pConfig,
TransportType,
auto_update::SoftwareUpdaterService,
comms_connector::pubsub_connector,
initialization,
initialization::P2pInitializer,
peer_seeds::SeedPeer,
services::{
liveness::{LivenessInitializer, config::LivenessConfig},
monitor_peers::MonitorPeersInitializer,
},
};
use tari_service_framework::{ServiceHandles, StackBuilder};
use tari_shutdown::ShutdownSignal;
use tari_transaction_components::crypto_factories::CryptoFactories;
use crate::{
ApplicationConfig,
HttpCacheConfig,
config::WalletHttpServiceConfig,
http::create_base_node_wallet_http_server,
};
/// Log target passed as `target:` to the log macros in this file.
const LOG_TARGET: &str = "c::bn::initialization";
/// Lower bound for the pubsub connector buffer size: a configured `buffer_size` smaller than this is raised to
/// this value before the connector is created.
const BASE_NODE_BUFFER_MIN_SIZE: usize = 30;
/// Collects everything needed to start the base node services; consumed by `bootstrap`.
pub struct BaseNodeBootstrapper<'a, B> {
/// Application configuration; `bootstrap` reads its `base_node`, `peer_seeds` and `auto_update` sections.
pub app_config: &'a ApplicationConfig,
/// Identity of this node, handed to the P2P initializer.
pub node_identity: Arc<NodeIdentity>,
/// Blockchain database shared with the base node service, the state machine and the RPC/HTTP services.
pub db: BlockchainDatabase<B>,
/// Mempool shared with the mempool service, the mempool sync protocol and the base node service.
pub mempool: Mempool,
/// Consensus manager passed to the base node service and the state machine.
pub rules: BaseNodeConsensusManager,
/// Crypto factories passed to the state machine initializer.
pub factories: CryptoFactories,
/// RandomX factory passed to the base node service and the state machine.
pub randomx_factory: RandomXFactory,
/// Shutdown signal given to the service stack builder and to the wallet HTTP server.
pub interrupt_signal: ShutdownSignal,
}
impl<B> BaseNodeBootstrapper<'_, B>
where B: BlockchainBackend + 'static
{
#[allow(clippy::too_many_lines)]
pub async fn bootstrap(self) -> Result<ServiceHandles, ExitError> {
let mut base_node_config = self.app_config.base_node.clone();
let mut p2p_config = self.app_config.base_node.p2p.clone();
let http_cache_cfg = self.app_config.base_node.http_wallet_query_service.http_cache.clone();
let peer_seeds = &self.app_config.peer_seeds;
let buf_size = cmp::max(BASE_NODE_BUFFER_MIN_SIZE, base_node_config.buffer_size);
let (publisher, peer_message_subscriptions) = pubsub_connector(buf_size);
let peer_message_subscriptions = Arc::new(peer_message_subscriptions);
let mempool_config = base_node_config.mempool.service.clone();
let force_sync_peers = base_node_config
.force_sync_peers
.iter()
.map(|s| SeedPeer::from_str(s))
.map(|r| r.map(Peer::from))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
let force_sync_node_ids: Vec<NodeId> = force_sync_peers.clone().into_iter().map(|p| p.node_id).collect();
let monitored_peers = base_node_config
.monitored_peers
.iter()
.map(|s| SeedPeer::from_str(s))
.map(|r| r.map(Peer::from))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
let mut monitored_node_ids: Vec<NodeId> = monitored_peers.clone().into_iter().map(|p| p.node_id).collect();
let mut total_monitored_ids = force_sync_node_ids.clone();
total_monitored_ids.append(&mut monitored_node_ids);
base_node_config.state_machine.blockchain_sync_config.forced_sync_peers = force_sync_node_ids.clone();
debug!(target: LOG_TARGET, "{} sync peer(s) configured", force_sync_node_ids.len());
let mempool_sync = MempoolSyncInitializer::new(mempool_config, self.mempool.clone());
let mempool_protocol = mempool_sync.get_protocol_extension();
let tor_identity = load_from_json(&base_node_config.tor_identity_file)
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
p2p_config.transport.tor.identity = tor_identity;
let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER);
let mut handles = StackBuilder::new(self.interrupt_signal.clone())
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
user_agent,
peer_seeds.clone(),
base_node_config.network,
self.node_identity.clone(),
publisher,
))
.add_initializer(SoftwareUpdaterService::new(
ApplicationType::BaseNode,
consts::APP_VERSION_NUMBER
.parse()
.expect("Unable to parse application version. Not valid semver"),
self.app_config.auto_update.clone(),
))
.add_initializer(BaseNodeServiceInitializer::new(
peer_message_subscriptions.clone(),
self.db.clone().into(),
self.mempool.clone(),
self.rules.clone(),
base_node_config.messaging_request_timeout,
self.randomx_factory.clone(),
base_node_config.state_machine.clone(),
))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
peer_message_subscriptions.clone(),
))
.add_initializer(mempool_sync)
.add_initializer(LivenessInitializer::new(
LivenessConfig {
auto_ping_interval: Some(base_node_config.metadata_auto_ping_interval),
monitored_peers: total_monitored_ids,
..Default::default()
},
peer_message_subscriptions,
))
.add_initializer(MonitorPeersInitializer::new(
base_node_config.metadata_auto_ping_interval,
))
.add_initializer(ChainMetadataServiceInitializer)
.add_initializer(BaseNodeStateMachineInitializer::new(
self.db.clone().into(),
base_node_config.state_machine.clone(),
self.rules,
self.factories,
self.randomx_factory,
self.app_config.base_node.bypass_range_proof_verification,
))
.add_initializer(TariPulseServiceInitializer::new(
base_node_config.tari_pulse_interval,
base_node_config.tari_pulse_health_check,
base_node_config.network,
))
.build()
.await?;
let comms = handles
.take_handle::<UnspawnedCommsNode>()
.expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode");
let comms = comms.add_protocol_extension(mempool_protocol);
let comms = Self::setup_rpc_services(
comms,
&handles,
self.db.into(),
&p2p_config,
&self.app_config.base_node.http_wallet_query_service,
self.interrupt_signal.clone(),
http_cache_cfg,
)
.await;
let comms = if p2p_config.transport.transport_type == TransportType::Tor {
let tor_id_path = base_node_config.tor_identity_file.clone();
let node_id_path = base_node_config.identity_file.clone();
let node_id = comms.node_identity();
let after_comms = move |identity: TorIdentity| {
let address_string = format!("/onion3/{}:{}", identity.service_id, identity.onion_port);
if let Err(e) = identity_management::save_as_json(&tor_id_path, &identity) {
error!(target: LOG_TARGET, "Failed to save tor identity{e:?}");
}
trace!(target: LOG_TARGET, "resave the tor identity {identity:?}");
let result: Result<Multiaddr, MultiaddrError> = address_string.parse();
if result.is_err() {
error!(target: LOG_TARGET, "Failed to parse tor identity as multiaddr{result:?}");
return;
}
let address = result.unwrap();
if !node_id.public_addresses().contains(&address) {
node_id.add_public_address(address);
}
if let Err(e) = identity_management::save_as_json(&node_id_path, &*node_id) {
error!(target: LOG_TARGET, "Failed to save node identity identity{e:?}");
}
};
initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await
} else {
let after_comms = |_identity| {};
initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await
};
let comms = comms.map_err(|e| e.to_exit_error())?;
match p2p_config.transport.transport_type {
TransportType::Tcp => {}, _ => {
identity_management::save_as_json(&base_node_config.identity_file, &*comms.node_identity())
.map_err(|e| ExitError::new(ExitCode::IdentityError, e))?;
},
};
for peer in force_sync_peers {
comms
.peer_manager()
.add_or_update_peer(peer)
.await
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
}
for peer in monitored_peers {
comms
.peer_manager()
.add_or_update_peer(peer)
.await
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
}
handles.register(comms);
Ok(handles)
}
async fn setup_rpc_services(
comms: UnspawnedCommsNode,
handles: &ServiceHandles,
db: AsyncBlockchainDb<B>,
p2p_config: &P2pConfig,
wallet_query_service_config: &WalletHttpServiceConfig,
shutdown_signal: ShutdownSignal,
http_cache_cfg: HttpCacheConfig,
) -> UnspawnedCommsNode {
let dht = handles.expect_handle::<Dht>();
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
.with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
.with_cull_oldest_peer_rpc_connection_on_full(p2p_config.cull_oldest_peer_rpc_connection_on_full)
.finish();
let rpc_server = rpc_server
.add_service(dht.rpc_service())
.add_service(base_node::create_base_node_sync_rpc_service(
db.clone(),
base_node_service,
))
.add_service(mempool::create_mempool_rpc_service(
handles.expect_handle::<MempoolHandle>(),
))
.add_service(base_node::rpc::create_base_node_wallet_rpc_service(
db.clone(),
handles.expect_handle::<MempoolHandle>(),
handles.expect_handle::<StateMachineHandle>(),
wallet_query_service_config.external_address.clone(),
));
handles.register(rpc_server.get_handle());
let wallet_http_server = create_base_node_wallet_http_server(
wallet_query_service_config.port,
wallet_query_service_config
.listen_ip
.unwrap_or("0.0.0.0".parse().expect("should not fail")),
db.clone(),
handles.expect_handle::<StateMachineHandle>(),
handles.expect_handle::<MempoolHandle>(),
shutdown_signal.clone(),
http_cache_cfg,
);
match wallet_http_server.start::<B>().await {
Ok(_) => {
handles.register(wallet_http_server);
},
Err(error) => {
error!(
target: LOG_TARGET,
"Failed to start wallet http server: {error:?}"
);
},
}
comms.add_protocol_extension(rpc_server)
}
}