// minotari_node 5.4.0-pre.2
//
// The tari full base node implementation
//  Copyright 2022, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod add_peer;
mod ban_peer;
mod block_timing;
mod check_db;
mod check_for_updates;
mod create_tls_certs;
mod dial_peer;
mod discover_peer;
mod fetch_all_orphan_headers;
mod get_block;
mod get_chain_metadata;
mod get_db_stats;
mod get_mempool_state;
mod get_mempool_stats;
mod get_network_stats;
mod get_peer;
mod get_state_info;
mod header_stats;
mod list_bad_blocks;
mod list_banned_peers;
mod list_connections;
mod list_headers;
mod list_peers;
mod list_reorgs;
mod list_validator_nodes;
mod period_stats;
mod ping_peer;
mod print_env;
mod quit;
mod reset_offline_peers;
mod rewind_blockchain;
mod search_kernel;
mod search_output;
mod search_payref;
mod search_utxo;
mod status;
mod test_peer_liveness;
mod unban_all_peers;
mod version;
mod watch_command;
mod whoami;

use std::{
    str::FromStr,
    sync::Arc,
    time::{Duration, Instant},
};

use anyhow::{Error, anyhow};
use async_trait::async_trait;
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use strum::{EnumVariantNames, VariantNames};
use tari_comms::{
    CommsNode,
    NodeIdentity,
    peer_manager::{Peer, PeerManagerError},
    protocol::rpc::RpcServerHandle,
};
use tari_comms_dht::{DhtDiscoveryRequester, MetricsCollectorHandle};
use tari_core::{
    base_node::{LocalNodeCommsInterface, state_machine_service::states::StatusInfo},
    chain_storage::{LMDBDatabase, async_db::AsyncBlockchainDb},
    consensus::BaseNodeConsensusManager,
    mempool::service::LocalMempoolService,
};
use tari_node_components::blocks::ChainHeader;
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_shutdown::Shutdown;
use tokio::{sync::watch, time};
pub use watch_command::WatchCommand;

use crate::{
    ApplicationConfig,
    builder::BaseNodeContext,
    commands::{nom_parser::ParsedCommand, parser::FromHex},
};

// Top-level argument wrapper for one console command line: the line is parsed by clap into exactly one
// `Command` subcommand (see the `FromStr` implementation for `Args` in this file).
// NOTE: plain `//` comments are used on purpose; clap's derive macros read `///` doc comments as help text.
#[derive(Debug, Parser)]
pub struct Args {
    // The subcommand selected by the parsed input.
    #[clap(subcommand)]
    pub command: Command,
}

impl Args {
    /// Reports whether the parsed command asks the console to terminate, i.e. it is either
    /// [`Command::Quit`] or [`Command::Exit`].
    pub fn is_quit(&self) -> bool {
        match &self.command {
            Command::Quit(_) | Command::Exit(_) => true,
            _ => false,
        }
    }
}

// Every command understood by the base node console. Each variant wraps the argument struct declared in the
// submodule that implements the command.
//
// `EnumVariantNames` together with `serialize_all = "kebab-case"` exposes the variant names as kebab-case
// strings through `Command::VARIANTS` (used by `Command::variants`).
//
// NOTE: plain `//` comments are used on purpose; clap's derive macros read `///` doc comments as help text.
#[derive(Debug, Subcommand, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum Command {
    Version(version::Args),
    CheckForUpdates(check_for_updates::Args),
    Status(status::Args),
    GetChainMetadata(get_chain_metadata::Args),
    GetDbStats(get_db_stats::Args),
    GetPeer(get_peer::Args),
    ListPeers(list_peers::Args),
    DialPeer(dial_peer::Args),
    PingPeer(ping_peer::Args),
    ResetOfflinePeers(reset_offline_peers::Args),
    RewindBlockchain(rewind_blockchain::Args),
    AddPeer(add_peer::ArgsAddPeer),
    TestPeerLiveness(test_peer_liveness::ArgsTestPeerLiveness),
    // `BanPeer` and `UnbanPeer` both take their arguments from the `ban_peer` module.
    BanPeer(ban_peer::ArgsBan),
    UnbanPeer(ban_peer::ArgsUnban),
    UnbanAllPeers(unban_all_peers::Args),
    ListBannedPeers(list_banned_peers::Args),
    ListConnections(list_connections::Args),
    ListHeaders(list_headers::Args),
    CheckDb(check_db::Args),
    PeriodStats(period_stats::Args),
    HeaderStats(header_stats::Args),
    BlockTiming(block_timing::Args),
    ListReorgs(list_reorgs::Args),
    FetchAllOrphanHeaders(fetch_all_orphan_headers::Args),
    ListBadBlocks(list_bad_blocks::Args),
    DiscoverPeer(discover_peer::Args),
    GetBlock(get_block::Args),
    SearchUtxo(search_utxo::Args),
    SearchOutput(search_output::Args),
    SearchPayref(search_payref::Args),
    SearchKernel(search_kernel::Args),
    GetMempoolStats(get_mempool_stats::Args),
    // `GetMempoolState` and `GetMempoolTx` both take their arguments from the `get_mempool_state` module.
    GetMempoolState(get_mempool_state::Args),
    GetMempoolTx(get_mempool_state::ArgsTx),
    Whoami(whoami::Args),
    GetStateInfo(get_state_info::Args),
    GetNetworkStats(get_network_stats::Args),
    ListValidatorNodes(list_validator_nodes::Args),
    CreateTlsCerts(create_tls_certs::Args),
    // `Quit` and `Exit` share the same argument type and are dispatched to the same handler.
    Quit(quit::Args),
    Exit(quit::Args),
    // `Watch` is not executed by `handle_command_str`; it is handed back to the caller instead.
    Watch(watch_command::Args),
    PrintEnv(print_env::Args),
}

impl Command {
    pub fn variants() -> Vec<String> {
        Command::VARIANTS.iter().map(|s| s.to_string()).collect()
    }
}

/// Asynchronous handler for one kind of command argument `T`.
///
/// `CommandContext` implements this trait for the top-level [`Command`] enum; that implementation forwards each
/// variant's payload to `handle_command` again, so every payload type needs its own implementation
/// (presumably provided by the matching submodule — not visible in this file).
#[async_trait]
pub trait HandleCommand<T> {
    /// Executes the command described by `args`.
    ///
    /// Returns `Ok(())` on success, or an [`Error`] describing why the command failed.
    async fn handle_command(&mut self, args: T) -> Result<(), Error>;
}

/// State shared by all console command handlers.
///
/// Built by `CommandContext::new` from a `BaseNodeContext`; every field except `last_time_full`, `shutdown` and
/// `config_property_overrides` is obtained from an accessor on that context.
pub struct CommandContext {
    /// Application configuration, as returned by `BaseNodeContext::config`.
    pub config: Arc<ApplicationConfig>,
    /// Consensus manager, cloned from the base node context.
    consensus_rules: BaseNodeConsensusManager,
    /// Async wrapper around the node's LMDB-backed blockchain database.
    blockchain_db: AsyncBlockchainDb<LMDBDatabase>,
    /// Discovery requester obtained from the base node's DHT.
    discovery_service: DhtDiscoveryRequester,
    /// Metrics collector handle obtained from the base node's DHT.
    dht_metrics_collector: MetricsCollectorHandle,
    /// Handle to the node's RPC server.
    rpc_server: RpcServerHandle,
    /// Identity of this base node.
    base_node_identity: Arc<NodeIdentity>,
    /// Comms node handle; used in this file to reach the peer manager.
    comms: CommsNode,
    /// Handle to the liveness service.
    liveness: LivenessHandle,
    /// Local interface to the base node service.
    node_service: LocalNodeCommsInterface,
    /// Local interface to the mempool service.
    mempool_service: LocalMempoolService,
    /// Watch channel receiver carrying the state machine's `StatusInfo`.
    state_machine_info: watch::Receiver<StatusInfo>,
    /// Handle to the software updater service.
    pub software_updater: SoftwareUpdaterHandle,
    /// Initialised to the construction time in `new`.
    // NOTE(review): presumably tracks when a full status line was last printed — confirm against the status command.
    last_time_full: Instant,
    /// Shutdown object supplied by the caller of `new`.
    pub shutdown: Shutdown,
    /// Config property overrides provided via `-p` args on the command line.
    pub config_property_overrides: Vec<(String, String)>,
}

impl CommandContext {
    /// Builds a command context from a running base node.
    ///
    /// Every service handle is taken from the corresponding accessor on `ctx`. `shutdown` and
    /// `config_property_overrides` are stored as given, and `last_time_full` is initialised to the current instant.
    pub fn new(ctx: &BaseNodeContext, shutdown: Shutdown, config_property_overrides: Vec<(String, String)>) -> Self {
        Self {
            config: ctx.config(),
            consensus_rules: ctx.consensus_rules().clone(),
            blockchain_db: ctx.blockchain_db().into(),
            discovery_service: ctx.base_node_dht().discovery_service_requester(),
            dht_metrics_collector: ctx.base_node_dht().metrics_collector(),
            rpc_server: ctx.rpc_server(),
            base_node_identity: ctx.base_node_identity(),
            comms: ctx.base_node_comms().clone(),
            liveness: ctx.liveness(),
            node_service: ctx.local_node(),
            mempool_service: ctx.local_mempool(),
            state_machine_info: ctx.get_state_machine_info_channel(),
            software_updater: ctx.software_updater(),
            last_time_full: Instant::now(),
            shutdown,
            config_property_overrides,
        }
    }

    /// Parses a console input line and executes the resulting command under a per-command time limit.
    ///
    /// # Returns
    /// * `Ok(Some(watch))` if the line is a `Watch` command. It is not executed here; its arguments are handed back
    ///   to the caller.
    /// * `Ok(None)` if any other command ran to completion successfully.
    ///
    /// # Errors
    /// * The line could not be parsed into [`Args`].
    /// * The command itself failed; the message is the command's error followed by the time limit in seconds
    ///   (`"<error> (<limit> s)"`).
    /// * The command did not finish within its time limit; the message states the limit and the underlying
    ///   tokio `Elapsed` error is kept as the cause.
    pub async fn handle_command_str(&mut self, line: &str) -> Result<Option<WatchCommand>, Error> {
        let args: Args = line.parse()?;
        if let Command::Watch(command) = args.command {
            Ok(Some(command))
        } else {
            // Time limit in seconds for each command.
            let time_out = match args.command {
                // These commands should complete quickly; some of them, like 'discover-peer', return immediately
                // although the requested action can take a long time.
                Command::Version(_) |
                Command::Whoami(_) |
                Command::CheckForUpdates(_) |
                Command::AddPeer(_) |
                Command::BanPeer(_) |
                Command::UnbanAllPeers(_) |
                Command::UnbanPeer(_) |
                Command::GetPeer(_) |
                Command::ResetOfflinePeers(_) |
                Command::DialPeer(_) |
                Command::PingPeer(_) |
                Command::DiscoverPeer(_) |
                Command::ListPeers(_) |
                Command::ListBannedPeers(_) |
                Command::ListConnections(_) |
                Command::GetNetworkStats(_) |
                Command::BlockTiming(_) |
                Command::GetChainMetadata(_) |
                Command::GetDbStats(_) |
                Command::GetStateInfo(_) |
                Command::ListReorgs(_) |
                Command::FetchAllOrphanHeaders(_) |
                Command::ListBadBlocks(_) |
                Command::GetBlock(_) |
                Command::ListHeaders(_) |
                Command::HeaderStats(_) |
                Command::SearchPayref(_) |
                Command::SearchKernel(_) |
                Command::GetMempoolStats(_) |
                Command::GetMempoolState(_) |
                Command::GetMempoolTx(_) |
                Command::Status(_) |
                // `Watch` is returned to the caller above and never reaches this match; it is listed only to keep
                // the match exhaustive.
                Command::Watch(_) |
                Command::ListValidatorNodes(_) |
                Command::CreateTlsCerts(_) |
                Command::PrintEnv(_) |
                Command::Quit(_) |
                Command::SearchOutput(_) |
                Command::Exit(_) => 30,
                // This test can potentially take longer and should be allowed to run longer.
                Command::TestPeerLiveness(_) => 240,
                // These commands involve intense blockchain db operations and need a lot of time to complete.
                Command::CheckDb(_) | Command::PeriodStats(_) | Command::RewindBlockchain(_) => 600,
                Command::SearchUtxo(_) => 1200,
            };
            let fut = self.handle_command(args.command);
            match time::timeout(Duration::from_secs(time_out), fut).await {
                Ok(Ok(())) => Ok(None),
                // The command failed on its own; the message format is kept as it was.
                Ok(Err(e)) => Err(Error::msg(format!("{e} ({time_out} s)"))),
                // The time limit expired. Say so explicitly instead of surfacing only "deadline has elapsed";
                // the original `Elapsed` error stays attached as the cause.
                Err(elapsed) => Err(Error::new(elapsed).context(format!("Command timed out after {time_out} s"))),
            }
        }
    }
}

impl FromStr for Args {
    type Err = Error;

    fn from_str(line: &str) -> Result<Self, Self::Err> {
        let args = ParsedCommand::parse(line)?;
        let matches = Args::command().no_binary_name(true).try_get_matches_from(args)?;
        let command = Args::from_arg_matches(&matches)?;
        Ok(command)
    }
}

#[async_trait]
impl HandleCommand<Command> for CommandContext {
    async fn handle_command(&mut self, command: Command) -> Result<(), Error> {
        match command {
            Command::Version(args) => self.handle_command(args).await,
            Command::CheckForUpdates(args) => self.handle_command(args).await,
            Command::Status(args) => self.handle_command(args).await,
            Command::GetChainMetadata(args) => self.handle_command(args).await,
            Command::GetDbStats(args) => self.handle_command(args).await,
            Command::GetPeer(args) => self.handle_command(args).await,
            Command::TestPeerLiveness(args) => self.handle_command(args).await,
            Command::GetStateInfo(args) => self.handle_command(args).await,
            Command::GetNetworkStats(args) => self.handle_command(args).await,
            Command::ListPeers(args) => self.handle_command(args).await,
            Command::DialPeer(args) => self.handle_command(args).await,
            Command::PingPeer(args) => self.handle_command(args).await,
            Command::AddPeer(args) => self.handle_command(args).await,
            Command::BanPeer(args) => self.handle_command(args).await,
            Command::UnbanPeer(args) => self.handle_command(args).await,
            Command::ResetOfflinePeers(args) => self.handle_command(args).await,
            Command::RewindBlockchain(args) => self.handle_command(args).await,
            Command::UnbanAllPeers(args) => self.handle_command(args).await,
            Command::ListHeaders(args) => self.handle_command(args).await,
            Command::CheckDb(args) => self.handle_command(args).await,
            Command::PeriodStats(args) => self.handle_command(args).await,
            Command::HeaderStats(args) => self.handle_command(args).await,
            Command::BlockTiming(args) => self.handle_command(args).await,
            Command::ListReorgs(args) => self.handle_command(args).await,
            Command::FetchAllOrphanHeaders(args) => self.handle_command(args).await,
            Command::ListBadBlocks(args) => self.handle_command(args).await,
            Command::DiscoverPeer(args) => self.handle_command(args).await,
            Command::GetBlock(args) => self.handle_command(args).await,
            Command::SearchUtxo(args) => self.handle_command(args).await,
            Command::SearchOutput(args) => self.handle_command(args).await,
            Command::SearchPayref(args) => self.handle_command(args).await,
            Command::SearchKernel(args) => self.handle_command(args).await,
            Command::ListConnections(args) => self.handle_command(args).await,
            Command::GetMempoolStats(args) => self.handle_command(args).await,
            Command::GetMempoolState(args) => self.handle_command(args).await,
            Command::GetMempoolTx(args) => self.handle_command(args).await,
            Command::Whoami(args) => self.handle_command(args).await,
            Command::ListBannedPeers(args) => self.handle_command(args).await,
            Command::Quit(args) | Command::Exit(args) => self.handle_command(args).await,
            Command::Watch(args) => self.handle_command(args).await,
            Command::ListValidatorNodes(args) => self.handle_command(args).await,
            Command::CreateTlsCerts(args) => self.handle_command(args).await,
            Command::PrintEnv(args) => self.handle_command(args).await,
        }
    }
}

impl CommandContext {
    /// Asks the comms peer manager for the list of banned peers.
    async fn fetch_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        let peer_manager = self.comms.peer_manager();
        peer_manager.get_banned_peers().await
    }

    /// Fetches a run of chain headers from the blockchain database.
    ///
    /// * With `end` given, the inclusive height range `start..=end` is fetched.
    /// * Without `end`, `start` is treated as a count: the last `start` headers up to and including the tip are
    ///   fetched (fewer if the subtraction from the tip height saturates at 0). A count of 0 yields an empty
    ///   list without touching the database.
    async fn get_chain_headers(&self, start: u64, end: Option<u64>) -> Result<Vec<ChainHeader>, Error> {
        let db = &self.blockchain_db;

        if let Some(last) = end {
            let headers = db.fetch_chain_headers(start..=last).await?;
            return Ok(headers);
        }

        // `start` headers counted back from the tip means stepping back `start - 1` heights; a count of zero has
        // nothing to fetch.
        let lookback = match start.checked_sub(1) {
            Some(steps) => steps,
            None => return Ok(Vec::new()),
        };
        let tip_height = db.fetch_tip_header().await?.height();
        let first_height = tip_height.saturating_sub(lookback);
        let headers = db.fetch_chain_headers(first_height..=tip_height).await?;
        Ok(headers)
    }
}

/// A command argument that can be given either as a value of type `T` or as a hex string.
///
/// The `FromStr` implementation in this file tries to parse the input as `T` first and only falls back to hex
/// when that fails.
#[derive(Debug, Clone)]
pub enum TypeOrHex<T> {
    /// The input parsed successfully as `T`.
    Type(T),
    /// The input did not parse as `T` but was accepted by `FromHex` (presumably holding the decoded bytes —
    /// confirm against `commands::parser::FromHex`).
    Hex(FromHex<Vec<u8>>),
}

impl<T> FromStr for TypeOrHex<T>
where T: FromStr
{
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if let Ok(t) = T::from_str(s) {
            Ok(Self::Type(t))
        } else {
            FromHex::from_str(s).map(Self::Hex).map_err(|_| {
                anyhow!(
                    "Argument was not a valid string for {} or hex value",
                    std::any::type_name::<T>()
                )
            })
        }
    }
}