#[macro_use]
extern crate tracing;
mod log;
mod rpc_service;
mod subcommands;
use crate::log::{reset_critical_failure, set_critical_failure};
use crate::subcommands::EvmNetworkCommand;
use ant_bootstrap::{BootstrapCacheStore, InitialPeersConfig};
use ant_evm::{get_evm_network, EvmNetwork, RewardsAddress};
use ant_logging::metrics::init_metrics;
use ant_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use ant_node::utils::get_root_dir_and_keypair;
use ant_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use ant_protocol::{
node::get_antnode_root_dir,
node_rpc::{NodeCtrl, StopResult},
version,
};
use clap::{command, Parser};
use color_eyre::{eyre::eyre, Result};
use const_hex::traits::FromHex;
use libp2p::PeerId;
use std::{
env,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
process::Command,
time::Duration,
};
use tokio::{
runtime::Runtime,
sync::{broadcast::error::RecvError, mpsc},
time::sleep,
};
use tracing_appender::non_blocking::WorkerGuard;
#[derive(Debug, Clone)]
pub enum LogOutputDestArg {
Stdout,
DataDir,
Path(PathBuf),
}
impl std::fmt::Display for LogOutputDestArg {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogOutputDestArg::Stdout => write!(f, "stdout"),
LogOutputDestArg::DataDir => write!(f, "data-dir"),
LogOutputDestArg::Path(path) => write!(f, "{}", path.display()),
}
}
}
pub fn parse_log_output(val: &str) -> Result<LogOutputDestArg> {
match val {
"stdout" => Ok(LogOutputDestArg::Stdout),
"data-dir" => Ok(LogOutputDestArg::DataDir),
value => Ok(LogOutputDestArg::Path(PathBuf::from(value))),
}
}
#[derive(Parser, Debug)]
#[command(disable_version_flag = true)]
#[clap(name = "antnode cli", version = env!("CARGO_PKG_VERSION"))]
struct Opt {
#[clap(long)]
alpha: bool,
#[clap(long)]
crate_version: bool,
#[cfg(feature = "open-metrics")]
#[clap(
long,
default_value_t = false,
required_if_eq("metrics_server_port", "0")
)]
enable_metrics_server: bool,
#[command(subcommand)]
evm_network: Option<EvmNetworkCommand>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::UNSPECIFIED))]
ip: IpAddr,
#[expect(rustdoc::invalid_html_tags)]
#[clap(long, default_value_t = LogOutputDestArg::DataDir, value_parser = parse_log_output, verbatim_doc_comment)]
log_output_dest: LogOutputDestArg,
#[clap(long, value_parser = LogFormat::parse_from_str, verbatim_doc_comment)]
log_format: Option<LogFormat>,
#[clap(long, verbatim_doc_comment)]
max_log_files: Option<usize>,
#[clap(long, verbatim_doc_comment)]
max_archived_log_files: Option<usize>,
#[cfg(feature = "open-metrics")]
#[clap(long, default_value_t = 0)]
metrics_server_port: u16,
#[clap(long, verbatim_doc_comment)]
network_id: Option<u8>,
#[clap(long, default_value_t = false)]
no_upnp: bool,
#[cfg(not(feature = "nightly"))]
#[clap(long)]
package_version: bool,
#[command(flatten)]
peers: InitialPeersConfig,
#[clap(long)]
protocol_version: bool,
#[clap(long, default_value_t = 0)]
port: u16,
#[clap(long)]
rewards_address: Option<String>,
#[clap(long, default_value_t = false)]
relay: bool,
#[expect(rustdoc::invalid_html_tags)]
#[clap(long, verbatim_doc_comment)]
root_dir: Option<PathBuf>,
#[clap(long)]
rpc: Option<SocketAddr>,
#[clap(long)]
version: bool,
}
fn main() -> Result<()> {
color_eyre::install()?;
let opt = Opt::parse();
let network_id = if let Some(network_id) = opt.network_id {
network_id
} else if opt.alpha {
2
} else {
1
};
version::set_network_id(network_id);
let identify_protocol_str = version::IDENTIFY_PROTOCOL_STR
.read()
.expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR");
if opt.version {
println!(
"{}",
ant_build_info::version_string(
"Autonomi Node",
env!("CARGO_PKG_VERSION"),
Some(&identify_protocol_str)
)
);
return Ok(());
}
let rewards_address = RewardsAddress::from_hex(opt.rewards_address.as_ref().expect(
"the following required arguments were not provided: --rewards-address <REWARDS_ADDRESS>",
))?;
if opt.crate_version {
println!("Crate version: {}", env!("CARGO_PKG_VERSION"));
return Ok(());
}
if opt.protocol_version {
println!("Network version: {identify_protocol_str}");
return Ok(());
}
#[cfg(not(feature = "nightly"))]
if opt.package_version {
println!("Package version: {}", ant_build_info::package_version());
return Ok(());
}
let evm_network: EvmNetwork = match opt.evm_network.as_ref() {
Some(evm_network) => Ok(evm_network.clone().into()),
None => match get_evm_network(opt.peers.local, Some(network_id)) {
Ok(net) => Ok(net),
Err(_) => Err(eyre!(
"EVM network not specified. Please specify a network using the subcommand or by setting the `EVM_NETWORK` environment variable."
)),
},
}?;
println!("EVM network: {evm_network:?}");
let node_socket_addr = SocketAddr::new(opt.ip, opt.port);
let (root_dir, keypair) = get_root_dir_and_keypair(&opt.root_dir)?;
let (log_output_dest, log_reload_handle, _log_appender_guard) =
init_logging(&opt, keypair.public().to_peer_id())?;
let bootstrap_cache = BootstrapCacheStore::new_from_initial_peers_config(&opt.peers, None)?;
let msg = format!(
"Running {} v{}",
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION")
);
info!("\n{}\n{}", msg, "=".repeat(msg.len()));
ant_build_info::log_version_info(env!("CARGO_PKG_VERSION"), &identify_protocol_str);
debug!(
"antnode built with git version: {}",
ant_build_info::git_info()
);
let rt = Runtime::new()?;
if opt.peers.local {
rt.spawn(init_metrics(std::process::id()));
}
let initial_peers = rt.block_on(opt.peers.get_addrs(None, Some(100)))?;
info!("Initial peers len: {:?}", initial_peers.len());
let restart_options = rt.block_on(async move {
let mut node_builder = NodeBuilder::new(
keypair,
initial_peers,
rewards_address,
evm_network,
node_socket_addr,
root_dir,
);
node_builder.local(opt.peers.local);
node_builder.no_upnp(opt.no_upnp);
node_builder.bootstrap_cache(bootstrap_cache);
node_builder.relay_client(opt.relay);
#[cfg(feature = "open-metrics")]
let mut node_builder = node_builder;
#[cfg(feature = "open-metrics")]
let metrics_server_port = if opt.enable_metrics_server || opt.metrics_server_port != 0 {
Some(opt.metrics_server_port)
} else {
None
};
#[cfg(feature = "open-metrics")]
node_builder.metrics_server_port(metrics_server_port);
let restart_options =
run_node(node_builder, opt.rpc, &log_output_dest, log_reload_handle).await?;
Ok::<_, eyre::Report>(restart_options)
})?;
rt.shutdown_timeout(Duration::from_secs(2));
if let Some((retain_peer_id, root_dir, port)) = restart_options {
start_new_node_process(retain_peer_id, root_dir, port);
println!("A new node process has been started successfully.");
} else {
println!("The node process has been stopped.");
}
Ok(())
}
async fn run_node(
node_builder: NodeBuilder,
rpc: Option<SocketAddr>,
log_output_dest: &str,
log_reload_handle: ReloadHandle,
) -> Result<Option<(bool, PathBuf, u16)>> {
let started_instant = std::time::Instant::now();
reset_critical_failure(log_output_dest);
info!("Starting node ...");
let running_node = node_builder.build_and_run()?;
println!(
"
Node started
PeerId is {}
You can check your reward balance by running:
`safe wallet balance --peer-id={}`
",
running_node.peer_id(),
running_node.peer_id()
);
let pid = std::process::id();
let pid_file = running_node.root_dir_path().join("antnode.pid");
std::fs::write(pid_file, pid.to_string().as_bytes())?;
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<NodeCtrl>(5);
let node_events_rx = running_node.node_events_channel().subscribe();
monitor_node_events(node_events_rx, ctrl_tx.clone());
let ctrl_tx_clone = ctrl_tx.clone();
tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
warn!("Listening to ctrl-c error: {err}");
}
if let Err(err) = ctrl_tx_clone
.send(NodeCtrl::Stop {
delay: Duration::from_secs(1),
result: StopResult::Error(eyre!("Ctrl-C received!")),
})
.await
{
error!("Failed to send node control msg to antnode bin main thread: {err}");
}
});
if let Some(addr) = rpc {
rpc_service::start_rpc_service(
addr,
log_output_dest,
running_node.clone(),
ctrl_tx,
started_instant,
log_reload_handle,
);
}
loop {
match ctrl_rx.recv().await {
Some(NodeCtrl::Restart {
delay,
retain_peer_id,
}) => {
let root_dir = running_node.root_dir_path();
let node_port = running_node.get_node_listening_port().await?;
let msg = format!("Node is restarting in {delay:?}...");
info!("{msg}");
println!("{msg} Node path: {log_output_dest}");
sleep(delay).await;
return Ok(Some((retain_peer_id, root_dir, node_port)));
}
Some(NodeCtrl::Stop { delay, result }) => {
let msg = format!("Node is stopping in {delay:?}...");
info!("{msg}");
println!("{msg} Node log path: {log_output_dest}");
sleep(delay).await;
match result {
StopResult::Success(message) => {
info!("Node stopped successfully: {}", message);
return Ok(None);
}
StopResult::Error(cause) => {
error!("Node stopped with error: {}", cause);
set_critical_failure(log_output_dest, &cause.to_string());
return Err(cause);
}
}
}
Some(NodeCtrl::Update(_delay)) => {
println!("No self-update supported yet.");
}
None => {
info!("Internal node ctrl cmds channel has been closed, restarting node");
break Err(eyre!("Internal node ctrl cmds channel has been closed"));
}
}
}
}
fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Sender<NodeCtrl>) {
let _handle = tokio::spawn(async move {
loop {
match node_events_rx.recv().await {
Ok(NodeEvent::ConnectedToNetwork) => Marker::NodeConnectedToNetwork.log(),
Ok(NodeEvent::ChannelClosed) | Err(RecvError::Closed) => {
if let Err(err) = ctrl_tx
.send(NodeCtrl::Stop {
delay: Duration::from_secs(1),
result: StopResult::Error(eyre!("Node events channel closed!")),
})
.await
{
error!("Failed to send node control msg to antnode bin main thread: {err}");
break;
}
}
Ok(NodeEvent::TerminateNode(reason)) => {
if let Err(err) = ctrl_tx
.send(NodeCtrl::Stop {
delay: Duration::from_secs(1),
result: StopResult::Error(eyre!("Node terminated due to: {reason:?}")),
})
.await
{
error!("Failed to send node control msg to antnode bin main thread: {err}");
break;
}
}
Ok(event) => {
debug!("Currently ignored node event {event:?}");
}
Err(RecvError::Lagged(n)) => {
warn!("Skipped {n} node events!");
continue;
}
}
}
});
}
fn init_logging(opt: &Opt, peer_id: PeerId) -> Result<(String, ReloadHandle, Option<WorkerGuard>)> {
let logging_targets = vec![
("ant_bootstrap".to_string(), Level::INFO),
("ant_build_info".to_string(), Level::DEBUG),
("ant_evm".to_string(), Level::DEBUG),
("ant_logging".to_string(), Level::DEBUG),
("ant_networking".to_string(), Level::INFO),
("ant_node".to_string(), Level::DEBUG),
("ant_protocol".to_string(), Level::DEBUG),
("antnode".to_string(), Level::DEBUG),
("evmlib".to_string(), Level::DEBUG),
];
let output_dest = match &opt.log_output_dest {
LogOutputDestArg::Stdout => LogOutputDest::Stdout,
LogOutputDestArg::DataDir => {
let path = get_antnode_root_dir(peer_id)?.join("logs");
LogOutputDest::Path(path)
}
LogOutputDestArg::Path(path) => LogOutputDest::Path(path.clone()),
};
#[cfg(not(feature = "otlp"))]
let (reload_handle, log_appender_guard) = {
let mut log_builder = ant_logging::LogBuilder::new(logging_targets);
log_builder.output_dest(output_dest.clone());
log_builder.format(opt.log_format.unwrap_or(LogFormat::Default));
if let Some(files) = opt.max_log_files {
log_builder.max_log_files(files);
}
if let Some(files) = opt.max_archived_log_files {
log_builder.max_archived_log_files(files);
}
log_builder.initialize()?
};
#[cfg(feature = "otlp")]
let (_rt, reload_handle, log_appender_guard) = {
let rt = Runtime::new()?;
let (reload_handle, log_appender_guard) = rt.block_on(async {
let mut log_builder = ant_logging::LogBuilder::new(logging_targets);
log_builder.output_dest(output_dest.clone());
log_builder.format(opt.log_format.unwrap_or(LogFormat::Default));
if let Some(files) = opt.max_log_files {
log_builder.max_log_files(files);
}
if let Some(files) = opt.max_archived_log_files {
log_builder.max_archived_log_files(files);
}
log_builder.initialize()
})?;
(rt, reload_handle, log_appender_guard)
};
Ok((output_dest.to_string(), reload_handle, log_appender_guard))
}
fn start_new_node_process(retain_peer_id: bool, root_dir: PathBuf, port: u16) {
let current_exe = env::current_exe().expect("could not get current executable path");
let mut args: Vec<String> = env::args().collect();
info!("Original args are: {args:?}");
info!("Current exe is: {current_exe:?}");
args.retain(|arg| arg != "--first");
let current_exe = match current_exe.to_str() {
Some(s) => {
if s.contains(" (deleted)") {
warn!("The current executable path contains ' (deleted)', which may lead to unexpected behavior. This has been removed from the exe location string");
s.replace(" (deleted)", "")
} else {
s.to_string()
}
}
None => {
error!("Failed to convert current executable path to string");
return;
}
};
let mut cmd = Command::new(current_exe);
cmd.args(&args[1..]);
if retain_peer_id {
cmd.arg("--root-dir");
cmd.arg(format!("{root_dir:?}"));
cmd.arg("--port");
cmd.arg(port.to_string());
}
warn!(
"Attempting to start a new process as node process loop has been broken: {:?}",
cmd
);
let _handle = match cmd.spawn() {
Ok(status) => status,
Err(e) => {
eprintln!("Failed to execute hard-restart command: {e:?}");
error!("Failed to execute hard-restart command: {e:?}");
return;
}
};
}