mod add_peer;
mod ban_peer;
mod block_timing;
mod check_db;
mod check_for_updates;
mod create_tls_certs;
mod dial_peer;
mod discover_peer;
mod fetch_all_orphan_headers;
mod get_block;
mod get_chain_metadata;
mod get_db_stats;
mod get_mempool_state;
mod get_mempool_stats;
mod get_network_stats;
mod get_peer;
mod get_state_info;
mod header_stats;
mod list_bad_blocks;
mod list_banned_peers;
mod list_connections;
mod list_headers;
mod list_peers;
mod list_reorgs;
mod list_validator_nodes;
mod period_stats;
mod ping_peer;
mod print_env;
mod quit;
mod reset_offline_peers;
mod rewind_blockchain;
mod search_kernel;
mod search_output;
mod search_payref;
mod search_utxo;
mod status;
mod test_peer_liveness;
mod unban_all_peers;
mod version;
mod watch_command;
mod whoami;
use std::{
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use anyhow::{Error, anyhow};
use async_trait::async_trait;
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use strum::{EnumVariantNames, VariantNames};
use tari_comms::{
CommsNode,
NodeIdentity,
peer_manager::{Peer, PeerManagerError},
protocol::rpc::RpcServerHandle,
};
use tari_comms_dht::{DhtDiscoveryRequester, MetricsCollectorHandle};
use tari_core::{
base_node::{LocalNodeCommsInterface, state_machine_service::states::StatusInfo},
chain_storage::{LMDBDatabase, async_db::AsyncBlockchainDb},
consensus::BaseNodeConsensusManager,
mempool::service::LocalMempoolService,
};
use tari_node_components::blocks::ChainHeader;
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_shutdown::Shutdown;
use tokio::{sync::watch, time};
pub use watch_command::WatchCommand;
use crate::{
ApplicationConfig,
builder::BaseNodeContext,
commands::{nom_parser::ParsedCommand, parser::FromHex},
};
// Parsed form of one line of interactive input; it wraps the sub-command that was selected.
//
// NOTE(review): plain `//` comments are used here on purpose. With clap's derive macros, `///`
// doc comments are picked up as help/about text, so adding them would change program output.
#[derive(Debug, Parser)]
pub struct Args {
// The sub-command (together with its own arguments) that the line resolved to.
#[clap(subcommand)]
pub command: Command,
}
impl Args {
    /// Reports whether the parsed command asks to leave the session, i.e. whether it is
    /// [`Command::Quit`] or [`Command::Exit`].
    pub fn is_quit(&self) -> bool {
        let Self { command } = self;
        matches!(command, Command::Quit(..) | Command::Exit(..))
    }
}
// Every sub-command understood by the interactive command line. Each variant carries the
// argument struct declared in the sub-module of the same purpose.
//
// `EnumVariantNames` together with `serialize_all = "kebab-case"` exposes the variant names in
// kebab-case through `Command::VARIANTS` (see `Command::variants`).
//
// NOTE(review): plain `//` comments are used on purpose. With clap's derive macros, `///` doc
// comments on variants are picked up as help text, so adding them would change program output.
// The declaration order is also left untouched because `VARIANTS` follows it.
#[derive(Debug, Subcommand, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum Command {
Version(version::Args),
CheckForUpdates(check_for_updates::Args),
Status(status::Args),
GetChainMetadata(get_chain_metadata::Args),
GetDbStats(get_db_stats::Args),
GetPeer(get_peer::Args),
ListPeers(list_peers::Args),
DialPeer(dial_peer::Args),
PingPeer(ping_peer::Args),
ResetOfflinePeers(reset_offline_peers::Args),
RewindBlockchain(rewind_blockchain::Args),
AddPeer(add_peer::ArgsAddPeer),
TestPeerLiveness(test_peer_liveness::ArgsTestPeerLiveness),
// Banning and unbanning share the `ban_peer` module but use distinct argument structs.
BanPeer(ban_peer::ArgsBan),
UnbanPeer(ban_peer::ArgsUnban),
UnbanAllPeers(unban_all_peers::Args),
ListBannedPeers(list_banned_peers::Args),
ListConnections(list_connections::Args),
ListHeaders(list_headers::Args),
CheckDb(check_db::Args),
PeriodStats(period_stats::Args),
HeaderStats(header_stats::Args),
BlockTiming(block_timing::Args),
ListReorgs(list_reorgs::Args),
FetchAllOrphanHeaders(fetch_all_orphan_headers::Args),
ListBadBlocks(list_bad_blocks::Args),
DiscoverPeer(discover_peer::Args),
GetBlock(get_block::Args),
SearchUtxo(search_utxo::Args),
SearchOutput(search_output::Args),
SearchPayref(search_payref::Args),
SearchKernel(search_kernel::Args),
GetMempoolStats(get_mempool_stats::Args),
GetMempoolState(get_mempool_state::Args),
// Declared in the `get_mempool_state` module alongside `GetMempoolState`.
GetMempoolTx(get_mempool_state::ArgsTx),
Whoami(whoami::Args),
GetStateInfo(get_state_info::Args),
GetNetworkStats(get_network_stats::Args),
ListValidatorNodes(list_validator_nodes::Args),
CreateTlsCerts(create_tls_certs::Args),
// `Quit` and `Exit` carry the same argument type and are dispatched to the same handler.
Quit(quit::Args),
Exit(quit::Args),
// `handle_command_str` does not execute this variant; it returns its payload to the caller.
Watch(watch_command::Args),
PrintEnv(print_env::Args),
}
impl Command {
pub fn variants() -> Vec<String> {
Command::VARIANTS.iter().map(|s| s.to_string()).collect()
}
}
/// Asynchronous handler for one kind of command argument `T`.
///
/// The trait is generic over the argument type, so a single receiver can implement it once per
/// argument struct; in this file `CommandContext` implements it for the whole [`Command`] enum
/// and forwards to the implementation for each variant's payload.
#[async_trait]
pub trait HandleCommand<T> {
/// Executes the command described by `args`, returning an error if it fails.
async fn handle_command(&mut self, args: T) -> Result<(), Error>;
}
/// Bundle of node handles and settings that the command handlers operate on.
///
/// `CommandContext::new` fills every handle from the matching accessor on `BaseNodeContext`.
pub struct CommandContext {
/// Shared application configuration.
pub config: Arc<ApplicationConfig>,
/// Consensus manager, cloned from the node context.
consensus_rules: BaseNodeConsensusManager,
/// Async wrapper around the LMDB-backed blockchain database.
blockchain_db: AsyncBlockchainDb<LMDBDatabase>,
/// Requester handle of the DHT discovery service.
discovery_service: DhtDiscoveryRequester,
/// Handle to the DHT metrics collector.
dht_metrics_collector: MetricsCollectorHandle,
/// Handle to the RPC server.
rpc_server: RpcServerHandle,
/// Identity of this base node.
base_node_identity: Arc<NodeIdentity>,
/// Comms node; used in this file to reach the peer manager.
comms: CommsNode,
/// Handle to the liveness service.
liveness: LivenessHandle,
/// Local interface to the base node service.
node_service: LocalNodeCommsInterface,
/// Local interface to the mempool service.
mempool_service: LocalMempoolService,
/// Watch channel receiver carrying the state machine's `StatusInfo`.
state_machine_info: watch::Receiver<StatusInfo>,
/// Handle to the software updater.
pub software_updater: SoftwareUpdaterHandle,
/// Set to the construction time by `new`; how it is updated afterwards is not visible in this
/// file. NOTE(review): presumably tracks when the full status was last shown; confirm in the
/// status handler.
last_time_full: Instant,
/// Shutdown handle passed in by the caller of `new`.
pub shutdown: Shutdown,
/// String pairs passed in by the caller of `new`. NOTE(review): presumably (key, value)
/// configuration overrides; confirm against the caller.
pub config_property_overrides: Vec<(String, String)>,
}
impl CommandContext {
pub fn new(ctx: &BaseNodeContext, shutdown: Shutdown, config_property_overrides: Vec<(String, String)>) -> Self {
Self {
config: ctx.config(),
consensus_rules: ctx.consensus_rules().clone(),
blockchain_db: ctx.blockchain_db().into(),
discovery_service: ctx.base_node_dht().discovery_service_requester(),
dht_metrics_collector: ctx.base_node_dht().metrics_collector(),
rpc_server: ctx.rpc_server(),
base_node_identity: ctx.base_node_identity(),
comms: ctx.base_node_comms().clone(),
liveness: ctx.liveness(),
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
state_machine_info: ctx.get_state_machine_info_channel(),
software_updater: ctx.software_updater(),
last_time_full: Instant::now(),
shutdown,
config_property_overrides,
}
}
pub async fn handle_command_str(&mut self, line: &str) -> Result<Option<WatchCommand>, Error> {
let args: Args = line.parse()?;
if let Command::Watch(command) = args.command {
Ok(Some(command))
} else {
let time_out = match args.command {
Command::Version(_) |
Command::Whoami(_) |
Command::CheckForUpdates(_) |
Command::AddPeer(_) |
Command::BanPeer(_) |
Command::UnbanAllPeers(_) |
Command::UnbanPeer(_) |
Command::GetPeer(_) |
Command::ResetOfflinePeers(_) |
Command::DialPeer(_) |
Command::PingPeer(_) |
Command::DiscoverPeer(_) |
Command::ListPeers(_) |
Command::ListBannedPeers(_) |
Command::ListConnections(_) |
Command::GetNetworkStats(_) |
Command::BlockTiming(_) |
Command::GetChainMetadata(_) |
Command::GetDbStats(_) |
Command::GetStateInfo(_) |
Command::ListReorgs(_) |
Command::FetchAllOrphanHeaders(_) |
Command::ListBadBlocks(_) |
Command::GetBlock(_) |
Command::ListHeaders(_) |
Command::HeaderStats(_) |
Command::SearchPayref(_) |
Command::SearchKernel(_) |
Command::GetMempoolStats(_) |
Command::GetMempoolState(_) |
Command::GetMempoolTx(_) |
Command::Status(_) |
Command::Watch(_) |
Command::ListValidatorNodes(_) |
Command::CreateTlsCerts(_) |
Command::PrintEnv(_) |
Command::Quit(_) |
Command::SearchOutput(_) |
Command::Exit(_) => 30,
Command::TestPeerLiveness(_) => 240,
Command::CheckDb(_) | Command::PeriodStats(_) | Command::RewindBlockchain(_) => 600,
Command::SearchUtxo(_) => 1200,
};
let fut = self.handle_command(args.command);
if let Err(e) = time::timeout(Duration::from_secs(time_out), fut).await? {
return Err(Error::msg(format!("{e} ({time_out} s)")));
};
Ok(None)
}
}
}
impl FromStr for Args {
    type Err = Error;

    /// Turns a raw input line into [`Args`].
    ///
    /// The line is first split by `ParsedCommand::parse`, then matched by clap with
    /// `no_binary_name(true)` so the first token is read as the sub-command rather than as a
    /// program name. Failures from either step are returned as the error.
    fn from_str(line: &str) -> Result<Self, Self::Err> {
        let tokens = ParsedCommand::parse(line)?;
        let matches = Self::command().no_binary_name(true).try_get_matches_from(tokens)?;
        Ok(Self::from_arg_matches(&matches)?)
    }
}
#[async_trait]
impl HandleCommand<Command> for CommandContext {
async fn handle_command(&mut self, command: Command) -> Result<(), Error> {
match command {
Command::Version(args) => self.handle_command(args).await,
Command::CheckForUpdates(args) => self.handle_command(args).await,
Command::Status(args) => self.handle_command(args).await,
Command::GetChainMetadata(args) => self.handle_command(args).await,
Command::GetDbStats(args) => self.handle_command(args).await,
Command::GetPeer(args) => self.handle_command(args).await,
Command::TestPeerLiveness(args) => self.handle_command(args).await,
Command::GetStateInfo(args) => self.handle_command(args).await,
Command::GetNetworkStats(args) => self.handle_command(args).await,
Command::ListPeers(args) => self.handle_command(args).await,
Command::DialPeer(args) => self.handle_command(args).await,
Command::PingPeer(args) => self.handle_command(args).await,
Command::AddPeer(args) => self.handle_command(args).await,
Command::BanPeer(args) => self.handle_command(args).await,
Command::UnbanPeer(args) => self.handle_command(args).await,
Command::ResetOfflinePeers(args) => self.handle_command(args).await,
Command::RewindBlockchain(args) => self.handle_command(args).await,
Command::UnbanAllPeers(args) => self.handle_command(args).await,
Command::ListHeaders(args) => self.handle_command(args).await,
Command::CheckDb(args) => self.handle_command(args).await,
Command::PeriodStats(args) => self.handle_command(args).await,
Command::HeaderStats(args) => self.handle_command(args).await,
Command::BlockTiming(args) => self.handle_command(args).await,
Command::ListReorgs(args) => self.handle_command(args).await,
Command::FetchAllOrphanHeaders(args) => self.handle_command(args).await,
Command::ListBadBlocks(args) => self.handle_command(args).await,
Command::DiscoverPeer(args) => self.handle_command(args).await,
Command::GetBlock(args) => self.handle_command(args).await,
Command::SearchUtxo(args) => self.handle_command(args).await,
Command::SearchOutput(args) => self.handle_command(args).await,
Command::SearchPayref(args) => self.handle_command(args).await,
Command::SearchKernel(args) => self.handle_command(args).await,
Command::ListConnections(args) => self.handle_command(args).await,
Command::GetMempoolStats(args) => self.handle_command(args).await,
Command::GetMempoolState(args) => self.handle_command(args).await,
Command::GetMempoolTx(args) => self.handle_command(args).await,
Command::Whoami(args) => self.handle_command(args).await,
Command::ListBannedPeers(args) => self.handle_command(args).await,
Command::Quit(args) | Command::Exit(args) => self.handle_command(args).await,
Command::Watch(args) => self.handle_command(args).await,
Command::ListValidatorNodes(args) => self.handle_command(args).await,
Command::CreateTlsCerts(args) => self.handle_command(args).await,
Command::PrintEnv(args) => self.handle_command(args).await,
}
}
}
impl CommandContext {
    /// Asks the comms peer manager for the peers it currently reports as banned.
    async fn fetch_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        let peer_manager = self.comms.peer_manager();
        peer_manager.get_banned_peers().await
    }

    /// Fetches chain headers for a height range.
    ///
    /// * With `end = Some(e)`, the requested heights are `start..=e`.
    /// * With `end = None`, `start` is read as a count back from the tip: the requested heights
    ///   are the last `start` heights ending at the tip, clamped at height 0. A count of `0`
    ///   yields an empty vector without touching the database.
    ///
    /// Database errors are converted into the returned [`Error`].
    async fn get_chain_headers(&self, start: u64, end: Option<u64>) -> Result<Vec<ChainHeader>, Error> {
        let db = &self.blockchain_db;
        let heights = if let Some(last) = end {
            start..=last
        } else {
            let count = start;
            if count == 0 {
                return Ok(Vec::new());
            }
            let tip = db.fetch_tip_header().await?.height();
            // `count - 1` cannot underflow here; `saturating_sub` clamps when count exceeds tip + 1.
            tip.saturating_sub(count - 1)..=tip
        };
        let headers = db.fetch_chain_headers(heights).await?;
        Ok(headers)
    }
}
/// An argument that is either a successfully parsed `T` or, failing that, raw bytes given as hex.
///
/// See the `FromStr` implementation for the order in which the two forms are tried.
#[derive(Debug, Clone)]
pub enum TypeOrHex<T> {
/// The input parsed as `T`.
Type(T),
/// The input did not parse as `T` but was accepted by `FromHex<Vec<u8>>`.
Hex(FromHex<Vec<u8>>),
}
impl<T> FromStr for TypeOrHex<T>
where T: FromStr
{
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(t) = T::from_str(s) {
Ok(Self::Type(t))
} else {
FromHex::from_str(s).map(Self::Hex).map_err(|_| {
anyhow!(
"Argument was not a valid string for {} or hex value",
std::any::type_name::<T>()
)
})
}
}
}