minotari_app_utilities::deny_non_64_bit_archs!();
#[macro_use]
mod table;
mod bootstrap;
mod builder;
pub mod cli;
mod commands;
pub mod config;
mod consensus_constants_tracker;
mod grpc;
mod grpc_method;
#[cfg(feature = "metrics")]
mod metrics;
mod recovery;
mod utils;
mod http;
mod xmrig_proxy;
use std::{process, sync::Arc};
use commands::{cli_loop::CliLoop, command::CommandContext};
use futures::FutureExt;
pub use grpc_method::GrpcMethod;
pub use http::HttpCacheConfig;
use log::*;
use minotari_app_grpc::{
authentication::ServerAuthenticationInterceptor,
tari_rpc::{self, readiness_status::State as ReadinessState},
tls::identity::read_identity,
};
use minotari_app_utilities::common_cli_args::CommonCliArgs;
use tari_common::{
MAX_GRPC_MESSAGE_SIZE,
configuration::bootstrap::{ApplicationType, grpc_default_port},
exit_codes::{ExitCode, ExitError},
};
use tari_common_types::grpc_authentication::GrpcAuthentication;
use tari_comms::{NodeIdentity, multiaddr::Multiaddr, utils::multiaddr::multiaddr_to_socketaddr};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
task::{self, JoinHandle},
time::timeout,
};
use tonic::{
codegen::InterceptedService,
transport::{Identity, Server, ServerTlsConfig},
};
pub use crate::config::{ApplicationConfig, BaseNodeConfig, DatabaseType};
#[cfg(feature = "metrics")]
pub use crate::metrics::MetricsConfig;
use crate::{cli::Cli, grpc::readiness_grpc_server::ReadinessGrpcServer};
const LOG_TARGET: &str = "minotari::base_node::app";
pub async fn run_base_node(
shutdown: Shutdown,
node_identity: Arc<NodeIdentity>,
config: Arc<ApplicationConfig>,
) -> Result<(), ExitError> {
let data_dir = config.base_node.data_dir.clone();
let data_dir_str = data_dir.clone().into_os_string().into_string().unwrap();
let mut config_path = data_dir.clone();
config_path.push("config.toml");
let cli = Cli {
common: CommonCliArgs {
base_path: data_dir_str,
config: config_path.into_os_string().into_string().unwrap(),
log_config: None,
log_path: None,
network: None,
config_property_overrides: vec![],
},
init: true,
rebuild_db: false,
non_interactive_mode: true,
watch: None,
profile_with_tokio_console: false,
mining_enabled: false,
grpc_enabled: false,
grpc_address: None,
second_layer_grpc_enabled: false,
disable_splash_screen: true,
print_env: false,
libtor_data_dir: None,
};
run_base_node_with_cli(node_identity, config, cli, shutdown).await
}
/// Runs the base node with the given identity, configuration, CLI options and shutdown handle.
///
/// The steps, in order:
/// 1. Install metrics (only when compiled with the `metrics` feature).
/// 2. Resolve the gRPC address, authentication and optional TLS identity.
/// 3. Spawn a readiness gRPC server when both `grpc_enabled` and `grpc_readiness_enabled` are set.
/// 4. If `cli.rebuild_db` is set, run database recovery and return without starting the node.
/// 5. Otherwise build and start the node, report `Ready`, and stop the readiness server.
/// 6. Spawn the main gRPC server (when `grpc_enabled`) and check it for early failure.
/// 7. Spawn the XMRig proxy when enabled and configured with a valid payment address.
/// 8. Run the CLI loop, then wait for the node context to shut down.
///
/// # Errors
/// Returns an `ExitError` when gRPC parameter preparation, recovery, or node initialisation fails, when
/// `ctx.start()` fails (reported as `ExitCode::DatabaseError`), or when the main gRPC server task ends
/// with an error or a join failure within the 500 ms startup window (`ExitCode::GrpcError`).
#[allow(clippy::too_many_lines)]
pub async fn run_base_node_with_cli(
node_identity: Arc<NodeIdentity>,
config: Arc<ApplicationConfig>,
cli: Cli,
shutdown: Shutdown,
) -> Result<(), ExitError> {
// Metrics are only installed when the crate is built with the `metrics` feature.
#[cfg(feature = "metrics")]
{
metrics::install(
ApplicationType::BaseNode,
&node_identity,
&config.metrics,
shutdown.to_signal(),
);
}
// These parameters are shared by the readiness server and the main gRPC server below.
// Note that they are resolved (including reading the TLS identity) even when gRPC is disabled.
let (grpc_address, auth, tls_identity) = prepare_grpc_params(&config).await?;
let (readiness_grpc_server, readiness_handler) = ReadinessGrpcServer::new();
// Separate shutdown handle so the readiness server can be stopped without triggering the node-wide
// `shutdown`.
let mut readiness_grpc_shutdown = Shutdown::new();
let mut readiness_task: Option<JoinHandle<Result<(), anyhow::Error>>> = None;
// The readiness server is only started when gRPC as a whole and the readiness endpoint are both enabled.
if config.base_node.grpc_enabled && config.base_node.grpc_readiness_enabled {
readiness_task = Some(task::spawn(run_grpc(
readiness_grpc_server,
grpc_address.clone(),
auth.clone(),
tls_identity.clone(),
readiness_grpc_shutdown.to_signal(),
)));
} else {
info!(target: LOG_TARGET, "Readiness gRPC server will not be started.");
}
readiness_handler.send_readiness_status(ReadinessState::StartingUp);
// Recovery mode: rebuild the database, reporting progress through the readiness handler, then return
// without starting the node.
if cli.rebuild_db {
info!(target: LOG_TARGET, "Node is in recovery mode, entering recovery");
readiness_handler.send_readiness_status(ReadinessState::RecoveringPreparing);
recovery::initiate_recover_db(&config.base_node)?;
readiness_handler.send_readiness_status(ReadinessState::RecoveringRebuilding);
recovery::run_recovery(&config.base_node, readiness_handler)
.await
.map_err(|e| ExitError::new(ExitCode::RecoveryError, e))?;
return Ok(());
};
let ctx =
builder::configure_and_initialize_node(config.clone(), node_identity, shutdown.to_signal(), &readiness_handler)
.await?;
ctx.start()
.map_err(|e| ExitError::new(ExitCode::DatabaseError, format!("Could not start database.{e:?}")))?;
let context = CommandContext::new(&ctx, shutdown.clone(), cli.common.config_property_overrides.clone());
// The node is up: report `Ready`, then stop the readiness server.
// NOTE(review): the main gRPC server below is given the same `grpc_address`, so the readiness server
// presumably has to release the listener before it starts - confirm.
readiness_handler.send_readiness_status(ReadinessState::Ready);
readiness_grpc_shutdown.trigger();
if let Some(task) = readiness_task {
const READINESS_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
// Taken before `task` is moved into `timeout`, so the task can still be aborted if the wait times out.
let abort_handle = task.abort_handle();
// Nested results: outer = timeout, middle = task join result, inner = `run_grpc` result.
match timeout(READINESS_SHUTDOWN_TIMEOUT, task).await {
Ok(Ok(Ok(()))) => {
info!(target: LOG_TARGET, "Readiness gRPC server shutdown successfully");
},
Ok(Ok(Err(e))) => {
error!(target: LOG_TARGET, "Readiness gRPC server returned an error: {e}");
},
Ok(Err(e)) => {
error!(target: LOG_TARGET, "Readiness gRPC server task failed: {e}");
},
Err(_elapsed) => {
error!(
target: LOG_TARGET,
"Readiness gRPC server shutdown did not complete in {:?}; aborting it to release the listener",
READINESS_SHUTDOWN_TIMEOUT
);
abort_handle.abort();
// NOTE(review): a single yield presumably gives the runtime a chance to process the abort before
// the main gRPC server is spawned - confirm this is sufficient.
tokio::task::yield_now().await;
},
}
}
let grpc = grpc::base_node_grpc_server::BaseNodeGrpcServer::from_base_node_context(&ctx, config.base_node.clone());
if config.base_node.grpc_enabled {
let grpc_handle = task::spawn(run_grpc(
grpc,
grpc_address.clone(),
auth.clone(),
tls_identity,
shutdown.to_signal(),
));
// Give the server 500 ms to fail fast. `timeout` consumes the handle; per the tokio documentation,
// dropping a `JoinHandle` detaches the task rather than cancelling it, so on timeout the server keeps
// running in the background.
match timeout(std::time::Duration::from_millis(500), grpc_handle).await {
// Still running after the startup window: treated as a successful start.
Err(_still_running) => {
},
// The server already returned `Ok(())`; this is only logged and start-up continues.
Ok(Ok(Ok(()))) => {
info!(target: LOG_TARGET, "GRPC server returned during startup window");
},
// `run_grpc` returned an error inside the window: abort start-up.
Ok(Ok(Err(e))) => {
return Err(ExitError::new(
ExitCode::GrpcError,
format!("Failed to start gRPC server on {grpc_address}: {e}"),
));
},
// The spawned task itself failed to join.
Ok(Err(e)) => {
return Err(ExitError::new(
ExitCode::GrpcError,
format!("gRPC server task panicked during startup: {e}"),
));
},
}
}
// Optional XMRig proxy. Every misconfiguration below only logs a warning; none of them aborts start-up.
if config.base_node.xmrig_proxy_enabled {
if config.base_node.xmrig_proxy_wallet_payment_address.is_empty() {
warn!(
target: LOG_TARGET,
"xmrig_proxy_enabled is true but xmrig_proxy_wallet_payment_address is not set. XMRig proxy will not \
start."
);
} else {
// Copy everything the proxy task needs out of `config` and `ctx` so the spawned future owns its inputs.
let proxy_wallet_address = config.base_node.xmrig_proxy_wallet_payment_address.clone();
let proxy_listener = config.base_node.xmrig_proxy_address.clone();
let proxy_extra = config.base_node.xmrig_proxy_coinbase_extra.as_bytes().to_vec();
let proxy_range_proof = config.base_node.xmrig_proxy_range_proof_type;
let signal = shutdown.to_signal();
let network = config.base_node.network;
let node_service = ctx.local_node();
let consensus_rules = ctx.consensus_rules().clone();
let state_machine = ctx.state_machine();
match proxy_wallet_address.parse::<tari_common_types::tari_address::TariAddress>() {
// Valid address on the node's own network: run the proxy as a detached task. Its errors are only
// logged.
Ok(wallet_addr) if wallet_addr.network() == network => {
task::spawn(async move {
if let Err(e) = xmrig_proxy::run_xmrig_proxy(
node_service,
consensus_rules,
state_machine,
proxy_listener,
wallet_addr,
proxy_extra,
proxy_range_proof,
signal,
)
.await
{
error!(target: LOG_TARGET, "XMRig proxy error: {e}");
}
});
},
// The address parsed but belongs to a different network than the node.
Ok(wallet_addr) => {
warn!(
target: LOG_TARGET,
"Invalid xmrig_proxy_wallet_payment_address: address network '{}' does not match node \
network '{network}'. XMRig proxy will not start.",
wallet_addr.network()
);
},
// The address could not be parsed at all.
Err(e) => {
warn!(
target: LOG_TARGET,
"Invalid xmrig_proxy_wallet_payment_address: {e}. XMRig proxy will not start."
);
},
}
}
}
let main_loop = CliLoop::new(context, cli.watch, cli.non_interactive_mode);
// In non-interactive mode the start-up notice goes to stdout together with the process id.
if cli.non_interactive_mode {
println!("Node started in non-interactive mode (pid = {})", process::id());
} else {
info!(
target: LOG_TARGET,
"Node has been successfully configured and initialized. Starting CLI loop."
);
}
if !config.base_node.force_sync_peers.is_empty() {
warn!(
target: LOG_TARGET,
"Force Sync Peers have been set! This node will only sync to the nodes in this set."
);
}
info!(target: LOG_TARGET, "Minotari base node has STARTED");
// Run the CLI loop to completion, then wait for the node context to finish shutting down.
main_loop.cli_loop(cli.disable_splash_screen).await;
ctx.wait_for_shutdown().await;
println!("Goodbye!");
Ok(())
}
/// Serves a `BaseNode` gRPC service implementation on `grpc_address` until `interrupt_signal` resolves.
///
/// # Arguments
/// * `grpc` - the service implementation to expose.
/// * `grpc_address` - multiaddr to listen on; it is converted to a socket address before binding.
/// * `auth_config` - authentication settings used to build the request interceptor.
/// * `tls_identity` - when `Some`, the server is configured for TLS with this identity.
/// * `interrupt_signal` - shutdown signal handed to the server for shutdown.
///
/// # Errors
/// Fails when the multiaddr cannot be converted to a socket address, when the authentication
/// interceptor cannot be constructed, when the TLS configuration is rejected, or when the server itself
/// returns an error (which is also logged).
async fn run_grpc<T: tari_rpc::base_node_server::BaseNode>(
    grpc: T,
    grpc_address: Multiaddr,
    auth_config: GrpcAuthentication,
    tls_identity: Option<Identity>,
    interrupt_signal: ShutdownSignal,
) -> Result<(), anyhow::Error> {
    info!(target: LOG_TARGET, "Starting GRPC on {grpc_address}");
    let socket_addr = multiaddr_to_socketaddr(&grpc_address)?;

    let Some(interceptor) = ServerAuthenticationInterceptor::new(auth_config) else {
        return Err(anyhow::anyhow!("Unable to prepare server gRPC authentication"));
    };

    // Apply the shared message-size limit in both directions before wrapping with authentication.
    let base_node_service = tari_rpc::base_node_server::BaseNodeServer::new(grpc)
        .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE);
    let guarded_service = InterceptedService::new(base_node_service, interceptor);

    // TLS is optional: only configure it when an identity was supplied.
    let mut builder = match tls_identity {
        Some(identity) => Server::builder().tls_config(ServerTlsConfig::new().identity(identity))?,
        None => Server::builder(),
    };

    let serve_result = builder
        .add_service(guarded_service)
        .serve_with_shutdown(socket_addr, interrupt_signal.map(|_| ()))
        .await;
    if let Err(err) = serve_result {
        error!(target: LOG_TARGET, "GRPC encountered an error: {err:?}");
        return Err(err.into());
    }

    info!(target: LOG_TARGET, "Stopping GRPC");
    Ok(())
}
/// Derives the parameters needed to start a gRPC server from the application configuration.
///
/// Returns, in order:
/// * the listen address - the configured `grpc_address`, or `/ip4/127.0.0.1/tcp/<port>` using the
///   default base-node gRPC port for the configured network;
/// * a clone of the configured gRPC authentication settings;
/// * the TLS identity read from `config_dir` when `grpc_tls_enabled` is set, otherwise `None`.
///
/// # Errors
/// Returns an `ExitError` with `ExitCode::TlsConfigurationError` when TLS is enabled but the identity
/// cannot be read.
async fn prepare_grpc_params(
    config: &ApplicationConfig,
) -> Result<(Multiaddr, GrpcAuthentication, Option<Identity>), ExitError> {
    let node_config = &config.base_node;

    let grpc_address = match node_config.grpc_address.clone() {
        Some(address) => address,
        None => {
            let port = grpc_default_port(ApplicationType::BaseNode, node_config.network);
            // The string is built from a fixed template plus a port, so parsing is expected to succeed.
            format!("/ip4/127.0.0.1/tcp/{port}").parse().unwrap()
        },
    };

    let auth = node_config.grpc_authentication.clone();

    let tls_identity = if node_config.grpc_tls_enabled {
        let identity = read_identity(node_config.config_dir.clone())
            .await
            .map_err(|e| ExitError::new(ExitCode::TlsConfigurationError, e.to_string()))?;
        Some(identity)
    } else {
        None
    };

    Ok((grpc_address, auth, tls_identity))
}