minotari_node 5.4.0-pre.0

The tari full base node implementation
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp, str::FromStr, sync::Arc};

use log::*;
use minotari_app_utilities::{consts, identity_management, identity_management::load_from_json};
use tari_common::{
    configuration::bootstrap::ApplicationType,
    exit_codes::{ExitCode, ExitError},
};
use tari_comms::{
    NodeIdentity,
    UnspawnedCommsNode,
    multiaddr::{Error as MultiaddrError, Multiaddr},
    peer_manager::{NodeId, Peer},
    protocol::rpc::RpcServer,
    tor::TorIdentity,
};
use tari_comms_dht::Dht;
use tari_core::{
    base_node::{
        self,
        LocalNodeCommsInterface,
        StateMachineHandle,
        chain_metadata_service::ChainMetadataServiceInitializer,
        service::BaseNodeServiceInitializer,
        state_machine_service::initializer::BaseNodeStateMachineInitializer,
        tari_pulse_service::TariPulseServiceInitializer,
    },
    chain_storage::{BlockchainBackend, BlockchainDatabase, async_db::AsyncBlockchainDb},
    consensus::BaseNodeConsensusManager,
    mempool::{self, Mempool, MempoolServiceInitializer, MempoolSyncInitializer, service::MempoolHandle},
    proof_of_work::randomx_factory::RandomXFactory,
};
use tari_p2p::{
    P2pConfig,
    TransportType,
    auto_update::SoftwareUpdaterService,
    comms_connector::pubsub_connector,
    initialization,
    initialization::P2pInitializer,
    peer_seeds::SeedPeer,
    services::{
        liveness::{LivenessInitializer, config::LivenessConfig},
        monitor_peers::MonitorPeersInitializer,
    },
};
use tari_service_framework::{ServiceHandles, StackBuilder};
use tari_shutdown::ShutdownSignal;
use tari_transaction_components::crypto_factories::CryptoFactories;

use crate::{
    ApplicationConfig,
    HttpCacheConfig,
    config::WalletHttpServiceConfig,
    http::create_base_node_wallet_http_server,
};

/// Log target passed to every `log` macro invocation in this module.
const LOG_TARGET: &str = "c::bn::initialization";
/// The minimum buffer size for the base node pubsub_connector channel.
///
/// `bootstrap` uses the larger of this value and the configured `buffer_size`.
const BASE_NODE_BUFFER_MIN_SIZE: usize = 30;

/// Collects everything needed to build the base node service stack and start comms.
///
/// The value is consumed by `bootstrap`, which hands the individual fields to the service initializers.
pub struct BaseNodeBootstrapper<'a, B> {
    /// Borrowed application configuration; the `base_node`, `peer_seeds` and `auto_update` sections are read during
    /// bootstrap.
    pub app_config: &'a ApplicationConfig,
    /// Identity of this node, passed to the P2P initializer.
    pub node_identity: Arc<NodeIdentity>,
    /// Blockchain database, shared with the base node service, the state machine and the RPC/HTTP services.
    pub db: BlockchainDatabase<B>,
    /// Mempool, shared with the base node service, the mempool service and the mempool sync protocol.
    pub mempool: Mempool,
    /// Consensus manager handed to the base node service and the state machine.
    pub rules: BaseNodeConsensusManager,
    /// Crypto factories handed to the state machine initializer.
    pub factories: CryptoFactories,
    /// RandomX factory handed to the base node service and the state machine.
    pub randomx_factory: RandomXFactory,
    /// Shutdown signal given to the service stack builder and the wallet HTTP server.
    pub interrupt_signal: ShutdownSignal,
}

impl<B> BaseNodeBootstrapper<'_, B>
where B: BlockchainBackend + 'static
{
    #[allow(clippy::too_many_lines)]
    pub async fn bootstrap(self) -> Result<ServiceHandles, ExitError> {
        let mut base_node_config = self.app_config.base_node.clone();
        let mut p2p_config = self.app_config.base_node.p2p.clone();
        let http_cache_cfg = self.app_config.base_node.http_wallet_query_service.http_cache.clone();
        let peer_seeds = &self.app_config.peer_seeds;

        let buf_size = cmp::max(BASE_NODE_BUFFER_MIN_SIZE, base_node_config.buffer_size);
        let (publisher, peer_message_subscriptions) = pubsub_connector(buf_size);
        let peer_message_subscriptions = Arc::new(peer_message_subscriptions);
        let mempool_config = base_node_config.mempool.service.clone();

        let force_sync_peers = base_node_config
            .force_sync_peers
            .iter()
            .map(|s| SeedPeer::from_str(s))
            .map(|r| r.map(Peer::from))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
        let force_sync_node_ids: Vec<NodeId> = force_sync_peers.clone().into_iter().map(|p| p.node_id).collect();

        let monitored_peers = base_node_config
            .monitored_peers
            .iter()
            .map(|s| SeedPeer::from_str(s))
            .map(|r| r.map(Peer::from))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
        let mut monitored_node_ids: Vec<NodeId> = monitored_peers.clone().into_iter().map(|p| p.node_id).collect();
        let mut total_monitored_ids = force_sync_node_ids.clone();
        total_monitored_ids.append(&mut monitored_node_ids);

        base_node_config.state_machine.blockchain_sync_config.forced_sync_peers = force_sync_node_ids.clone();

        debug!(target: LOG_TARGET, "{} sync peer(s) configured", force_sync_node_ids.len());

        let mempool_sync = MempoolSyncInitializer::new(mempool_config, self.mempool.clone());
        let mempool_protocol = mempool_sync.get_protocol_extension();

        let tor_identity = load_from_json(&base_node_config.tor_identity_file)
            .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
        p2p_config.transport.tor.identity = tor_identity;

        let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER);
        let mut handles = StackBuilder::new(self.interrupt_signal.clone())
            .add_initializer(P2pInitializer::new(
                p2p_config.clone(),
                user_agent,
                peer_seeds.clone(),
                base_node_config.network,
                self.node_identity.clone(),
                publisher,
            ))
            .add_initializer(SoftwareUpdaterService::new(
                ApplicationType::BaseNode,
                consts::APP_VERSION_NUMBER
                    .parse()
                    .expect("Unable to parse application version. Not valid semver"),
                self.app_config.auto_update.clone(),
            ))
            .add_initializer(BaseNodeServiceInitializer::new(
                peer_message_subscriptions.clone(),
                self.db.clone().into(),
                self.mempool.clone(),
                self.rules.clone(),
                base_node_config.messaging_request_timeout,
                self.randomx_factory.clone(),
                base_node_config.state_machine.clone(),
            ))
            .add_initializer(MempoolServiceInitializer::new(
                self.mempool.clone(),
                peer_message_subscriptions.clone(),
            ))
            .add_initializer(mempool_sync)
            .add_initializer(LivenessInitializer::new(
                LivenessConfig {
                    auto_ping_interval: Some(base_node_config.metadata_auto_ping_interval),
                    monitored_peers: total_monitored_ids,
                    ..Default::default()
                },
                peer_message_subscriptions,
            ))
            .add_initializer(MonitorPeersInitializer::new(
                base_node_config.metadata_auto_ping_interval,
            ))
            .add_initializer(ChainMetadataServiceInitializer)
            .add_initializer(BaseNodeStateMachineInitializer::new(
                self.db.clone().into(),
                base_node_config.state_machine.clone(),
                self.rules,
                self.factories,
                self.randomx_factory,
                self.app_config.base_node.bypass_range_proof_verification,
            ))
            .add_initializer(TariPulseServiceInitializer::new(
                base_node_config.tari_pulse_interval,
                base_node_config.tari_pulse_health_check,
                base_node_config.network,
            ))
            .build()
            .await?;

        let comms = handles
            .take_handle::<UnspawnedCommsNode>()
            .expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode");

        let comms = comms.add_protocol_extension(mempool_protocol);
        let comms = Self::setup_rpc_services(
            comms,
            &handles,
            self.db.into(),
            &p2p_config,
            &self.app_config.base_node.http_wallet_query_service,
            self.interrupt_signal.clone(),
            http_cache_cfg,
        )
        .await;

        let comms = if p2p_config.transport.transport_type == TransportType::Tor {
            let tor_id_path = base_node_config.tor_identity_file.clone();
            let node_id_path = base_node_config.identity_file.clone();
            let node_id = comms.node_identity();
            let after_comms = move |identity: TorIdentity| {
                let address_string = format!("/onion3/{}:{}", identity.service_id, identity.onion_port);
                if let Err(e) = identity_management::save_as_json(&tor_id_path, &identity) {
                    error!(target: LOG_TARGET, "Failed to save tor identity{e:?}");
                }
                trace!(target: LOG_TARGET, "resave the tor identity {identity:?}");
                let result: Result<Multiaddr, MultiaddrError> = address_string.parse();
                if result.is_err() {
                    error!(target: LOG_TARGET, "Failed to parse tor identity as multiaddr{result:?}");
                    return;
                }
                let address = result.unwrap();
                if !node_id.public_addresses().contains(&address) {
                    node_id.add_public_address(address);
                }
                if let Err(e) = identity_management::save_as_json(&node_id_path, &*node_id) {
                    error!(target: LOG_TARGET, "Failed to save node identity identity{e:?}");
                }
            };
            initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await
        } else {
            let after_comms = |_identity| {};
            initialization::spawn_comms_using_transport(comms, p2p_config.transport.clone(), after_comms).await
        };

        let comms = comms.map_err(|e| e.to_exit_error())?;
        // Save final node identity after comms has initialized. This is required because the public_address can be
        // changed by comms during initialization when using tor.
        match p2p_config.transport.transport_type {
            TransportType::Tcp => {}, // Do not overwrite TCP public_address in the base_node_id!
            _ => {
                identity_management::save_as_json(&base_node_config.identity_file, &*comms.node_identity())
                    .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?;
            },
        };
        for peer in force_sync_peers {
            comms
                .peer_manager()
                .add_or_update_peer(peer)
                .await
                .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
        }
        for peer in monitored_peers {
            comms
                .peer_manager()
                .add_or_update_peer(peer)
                .await
                .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
        }
        handles.register(comms);

        Ok(handles)
    }

    async fn setup_rpc_services(
        comms: UnspawnedCommsNode,
        handles: &ServiceHandles,
        db: AsyncBlockchainDb<B>,
        p2p_config: &P2pConfig,
        wallet_query_service_config: &WalletHttpServiceConfig,
        shutdown_signal: ShutdownSignal,
        http_cache_cfg: HttpCacheConfig,
    ) -> UnspawnedCommsNode {
        let dht = handles.expect_handle::<Dht>();
        let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
        let rpc_server = RpcServer::builder()
            .with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
            .with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
            .with_cull_oldest_peer_rpc_connection_on_full(p2p_config.cull_oldest_peer_rpc_connection_on_full)
            .finish();

        // Add your RPC services here ‍🏴‍☠️️☮️🌊
        let rpc_server = rpc_server
            .add_service(dht.rpc_service())
            .add_service(base_node::create_base_node_sync_rpc_service(
                db.clone(),
                base_node_service,
            ))
            .add_service(mempool::create_mempool_rpc_service(
                handles.expect_handle::<MempoolHandle>(),
            ))
            .add_service(base_node::rpc::create_base_node_wallet_rpc_service(
                db.clone(),
                handles.expect_handle::<MempoolHandle>(),
                handles.expect_handle::<StateMachineHandle>(),
                wallet_query_service_config.external_address.clone(),
            ));

        handles.register(rpc_server.get_handle());

        // wallet http server
        let wallet_http_server = create_base_node_wallet_http_server(
            wallet_query_service_config.port,
            wallet_query_service_config
                .listen_ip
                .unwrap_or("0.0.0.0".parse().expect("should not fail")),
            db.clone(),
            handles.expect_handle::<StateMachineHandle>(),
            handles.expect_handle::<MempoolHandle>(),
            shutdown_signal.clone(),
            http_cache_cfg,
        );
        match wallet_http_server.start::<B>().await {
            Ok(_) => {
                handles.register(wallet_http_server);
            },
            Err(error) => {
                error!(
                    target: LOG_TARGET,
                    "Failed to start wallet http server: {error:?}"
                );
            },
        }

        comms.add_protocol_extension(rpc_server)
    }
}