use std::{
fs,
fs::OpenOptions,
io::Write,
path::PathBuf,
process,
time::{Duration, Instant},
};
use anyhow::Error;
use async_trait::async_trait;
use chrono::Local;
use clap::Parser;
use minotari_app_utilities::utilities::UniPublicKey;
use tari_common_types::types::CompressedPublicKey;
use tari_comms::{
multiaddr::Multiaddr,
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags},
};
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle};
use tokio::{sync::watch, task};
use super::{CommandContext, HandleCommand};
// Command-line arguments for the peer liveness test command.
//
// NOTE: plain `//` comments are used here on purpose. `clap`'s derive macro turns `///` doc
// comments on a `Parser` struct and its fields into user-visible help text, so adding them would
// change the command's help output.
#[derive(Debug, Parser)]
pub struct ArgsTestPeerLiveness {
// Public key of the peer to test; the peer's node id is derived from it.
public_key: UniPublicKey,
// Address stored for the peer before it is dialed.
address: Multiaddr,
// When `Some(true)`, the process exits once the test finishes (status 0 on success, 1 otherwise).
exit: Option<bool>,
// When `Some(true)`, the result is appended to `peer_liveness_test.csv`.
output_to_file: Option<bool>,
// When `Some(true)`, an existing result file is removed before the new result is written.
refresh_file: Option<bool>,
// Directory for the result file; a path relative to the current directory is used when this is
// absent or the directory cannot be created.
output_directory: Option<PathBuf>,
}
/// Outcome of a peer liveness test as published over the result channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PingResult {
    /// Starting value of the result channel; no outcome has been published yet.
    Initial,
    /// The peer answered the ping with a matching pong.
    Success,
    /// The test finished without the peer proving responsive.
    Fail,
}
#[async_trait]
impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
    /// Tests whether a single peer can be dialed and answers a liveness ping.
    ///
    /// The command:
    /// 1. rejects a test against this node's own public key;
    /// 2. calls `soft_delete_peer` for the node id (result ignored) and stores a fresh peer entry
    ///    that carries only the supplied address;
    /// 3. dials the peer, retrying up to five times with a five second pause after each failure.
    ///    On success a background task pings the peer; if every attempt fails the test is
    ///    reported as failed straight away instead of waiting for a ping that was never sent;
    /// 4. waits (about 180 seconds at most) for the outcome, prints it, optionally appends it to
    ///    the CSV result file and optionally terminates the process with status 0 (success) or 1.
    ///
    /// # Errors
    /// Returns an error when the supplied public key is this node's own key, or when the peer
    /// cannot be added to the peer manager.
    async fn handle_command(&mut self, args: ArgsTestPeerLiveness) -> Result<(), Error> {
        const DIAL_ATTEMPTS: usize = 5;
        const DIAL_RETRY_DELAY: Duration = Duration::from_secs(5);
        const RESULT_TIMEOUT_SECS: u32 = 180;

        println!("\nTesting peer liveness...\n");
        let peer_manager = self.comms.peer_manager();

        let public_key = args.public_key.into();
        if *self.comms.node_identity().public_key() == public_key {
            return Err(Error::msg("Self liveness test not supported"));
        }

        let node_id = NodeId::from_public_key(&public_key);
        // The originals are moved into the ping task, so keep copies for reporting.
        let node_id_clone = node_id.clone();
        let public_key_clone = public_key.clone();
        let address_clone = args.address.clone();

        // Replace whatever is stored for this peer with an entry holding only the given address.
        // The delete result is deliberately ignored.
        let _res = peer_manager.soft_delete_peer(&node_id).await;
        let peer = Peer::new(
            public_key.clone(),
            node_id.clone(),
            MultiaddressesWithStats::from_addresses_with_source(vec![args.address], &PeerAddressSource::Config),
            PeerFlags::empty(),
            PeerFeatures::COMMUNICATION_NODE,
            vec![],
            String::new(),
        );
        peer_manager.add_or_update_peer(peer).await?;

        let (tx, mut rx) = watch::channel(PingResult::Initial);
        let start = Instant::now();

        let mut dialed = false;
        for _ in 0..DIAL_ATTEMPTS {
            if self.dial_peer(node_id.clone()).await.is_ok() {
                dialed = true;
                break;
            }
            tokio::time::sleep(DIAL_RETRY_DELAY).await;
        }

        if dialed {
            println!("🏓 Peer ({node_id}, {public_key}) dialed successfully");
            let liveness = self.liveness.clone();
            task::spawn(async move {
                ping_peer_liveness(liveness, node_id, public_key, tx).await;
            });
        } else {
            // No ping task exists, so nothing else would ever publish a result; without this the
            // loop below would idle until its timeout and the failure would not be recorded.
            println!("❌ Peer ({node_id}, {public_key}) could not be dialed");
            let _ = tx.send(PingResult::Fail);
        }

        let mut count = 0;
        loop {
            tokio::select! {
                // Resolves when a result is published or when the sender is dropped; in the latter
                // case the channel still holds its last value (`Initial` if nothing was sent).
                _ = rx.changed() => {
                    let test_duration = start.elapsed();
                    let responsive = *rx.borrow();
                    let date_time = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
                    print_results_to_console(
                        &date_time,
                        responsive,
                        &public_key_clone,
                        &node_id_clone,
                        &address_clone,
                        test_duration,
                    );
                    if let Some(true) = args.output_to_file {
                        print_to_file(
                            &date_time,
                            responsive,
                            args.output_directory,
                            args.refresh_file,
                            public_key_clone,
                            address_clone,
                            test_duration
                        ).await;
                    }
                    if let Some(true) = args.exit {
                        println!("The liveness test is complete and base node will now exit\n");
                        self.shutdown.trigger();
                        // Short pause after triggering shutdown before the process is terminated.
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        match responsive {
                            PingResult::Success => process::exit(0),
                            PingResult::Initial | PingResult::Fail => process::exit(1),
                        }
                    }
                    break;
                },
                // One-second tick used to count towards the overall timeout.
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    count += 1;
                    if count >= RESULT_TIMEOUT_SECS {
                        if let Some(true) = args.exit {
                            println!(" >> The liveness test failed to complete and base node will now exit\n");
                            self.shutdown.trigger();
                            tokio::time::sleep(Duration::from_secs(1)).await;
                            process::exit(1)
                        } else {
                            println!(" >> The liveness test failed to complete\n");
                            break;
                        }
                    }
                },
            }
        }

        Ok(())
    }
}
/// Prints a human-readable summary of a liveness test to standard output.
///
/// The summary starts with a headline stating whether the peer is responsive (only
/// `PingResult::Success` counts as responsive), followed by the timestamp, peer identity,
/// address, raw result and the test duration, framed by blank lines.
fn print_results_to_console(
    date_time: &str,
    responsive: PingResult,
    public_key: &CompressedPublicKey,
    node_id: &NodeId,
    address: &Multiaddr,
    test_duration: Duration,
) {
    let headline = match responsive {
        PingResult::Success => "✅ Peer is responsive",
        PingResult::Initial | PingResult::Fail => "❌ Peer is unresponsive",
    };

    println!();
    println!("{headline}");
    println!(" Date Time: {date_time}");
    println!(" Public Key: {public_key}");
    println!(" Node ID: {node_id}");
    println!(" Address: {address}");
    println!(" Result: {responsive:?}");
    println!(" Test Duration: {test_duration:.2?}");
    println!();
}
/// Sends one liveness ping to `node_id` and publishes the outcome on `tx`.
///
/// The event stream is obtained before the ping is sent. After a successful send, up to five
/// events from the stream are inspected (receive errors count towards the five); a
/// `ReceivedPong` whose node id and nonce match the ping publishes `PingResult::Success`.
/// `PingResult::Fail` is published when no matching pong shows up within those events or when
/// the ping could not be sent at all.
///
/// `public_key` is only used in the printed messages. Send errors on `tx` are ignored, since
/// they only mean that nobody is listening for the result any more.
async fn ping_peer_liveness(
    mut liveness: LivenessHandle,
    node_id: NodeId,
    public_key: CompressedPublicKey,
    tx: watch::Sender<PingResult>,
) {
    let mut liveness_events = liveness.get_event_stream();

    let nonce = match liveness.send_ping(node_id.clone()).await {
        Ok(nonce) => nonce,
        Err(e) => {
            // Report the failure explicitly rather than dropping `tx` without a result, which
            // would leave the waiting side with nothing but a closed channel.
            println!("🏓 Ping peer ({node_id}, {public_key}) could not be sent: {e}");
            let _ = tx.send(PingResult::Fail);
            return;
        },
    };
    println!("🏓 Pinging peer ({node_id}, {public_key}) with nonce {nonce} ...");

    for _ in 0..5 {
        match liveness_events.recv().await {
            Ok(event) => {
                if let LivenessEvent::ReceivedPong(pong) = &*event &&
                    pong.node_id == node_id &&
                    pong.nonce == nonce
                {
                    println!(
                        "🏓️ Pong: peer ({}, {}) responded with nonce {}, round-trip-time is {:.2?}!",
                        pong.node_id,
                        public_key,
                        pong.nonce,
                        pong.latency.unwrap_or_default()
                    );
                    let _ = tx.send(PingResult::Success);
                    return;
                }
            },
            Err(e) => {
                println!("🏓 Ping peer ({node_id}, {public_key}) gave error: {e}");
            },
        }
    }

    let _ = tx.send(PingResult::Fail);
}
/// Appends one liveness test result as a CSV row to `peer_liveness_test.csv`.
///
/// * `date_time` - timestamp text written to the first column.
/// * `responsive` - written as `PASS` for `PingResult::Success`, otherwise `FAIL`.
/// * `output_directory` - directory for the file; it is created if missing. When it is `None` or
///   cannot be created, the bare file name (relative to the current directory) is used instead.
/// * `refresh_file` - when `Some(true)`, an existing file is removed first.
/// * `public_key`, `address`, `test_duration` - written to the remaining columns.
///
/// A header row is written when the file does not exist yet. Failures to open or write the file
/// are printed to standard output; the function never returns an error.
async fn print_to_file(
    date_time: &str,
    responsive: PingResult,
    output_directory: Option<PathBuf>,
    refresh_file: Option<bool>,
    public_key: CompressedPublicKey,
    address: Multiaddr,
    test_duration: Duration,
) {
    let test_result = if responsive == PingResult::Success {
        "PASS"
    } else {
        "FAIL"
    };

    let file_name = "peer_liveness_test.csv";
    // Use the requested directory when it exists or can be created; otherwise fall back to the
    // bare file name.
    let file_path = output_directory
        .filter(|dir| matches!(fs::exists(dir), Ok(true)) || fs::create_dir_all(dir).is_ok())
        .map_or_else(|| PathBuf::from(file_name), |dir| dir.join(file_name));

    if let Some(true) = refresh_file {
        // Best effort: the file may not exist yet, so the result is ignored.
        let _unused = fs::remove_file(&file_path);
        // NOTE(review): the reason for this pause is not evident from this file; kept as is.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    let write_header = !file_path.exists();
    let mut file = match OpenOptions::new().append(true).create(true).open(&file_path) {
        Ok(file) => file,
        Err(e) => {
            // Previously this failure was silent and the result was lost without any notice.
            println!("❌ Error opening test result file {}: {e}", file_path.display());
            return;
        },
    };

    let mut file_content = String::new();
    if write_header {
        file_content.push_str("Date Time,Public Key,Address,Result,Test Duration\n");
    }
    file_content.push_str(&format!(
        "{date_time},{public_key},{address},{test_result},{test_duration:.2?}"
    ));

    match writeln!(file, "{file_content}") {
        Ok(_) => {
            println!("📝 Test result written to file: {}", file_path.display());
        },
        Err(e) => {
            println!("❌ Error writing test result to file: {e}");
        },
    }
}