mod stress;
use std::{env, net::Ipv4Addr, path::Path, process, sync::Arc, time::Duration};
use futures::{future, future::Either};
use stress::{error::Error, prompt::user_prompt};
use tari_shutdown::Shutdown;
use tari_utilities::message_format::MessageFormat;
use tempfile::Builder;
use tokio::{sync::oneshot, time};
use crate::stress::{node, prompt::parse_from_short_str, service, service::StressTestServiceRequest};
#[tokio::main]
async fn main() {
env_logger::init();
match run().await {
Ok(_) | Err(Error::UserQuit) => {},
Err(err) => {
println!("{err:?}: {err}");
process::exit(1);
},
}
}
fn remove_arg(args: &mut Vec<String>, item: &str) -> Option<usize> {
if let Some(pos) = args.iter().position(|s| s == item) {
args.remove(pos);
Some(pos)
} else {
None
}
}
async fn run() -> Result<(), Error> {
let mut args = env::args().skip(1).collect::<Vec<_>>();
let is_tcp = remove_arg(&mut args, "--tcp").is_some();
let mut public_ip = None;
if let Some(pos) = remove_arg(&mut args, "--public-ip") {
public_ip = Some(args.remove(pos).parse::<Ipv4Addr>().unwrap());
}
let mut tor_identity_path = None;
if let Some(pos) = remove_arg(&mut args, "--tor-identity") {
tor_identity_path = Some(args.remove(pos));
}
let mut node_identity_path = None;
if let Some(pos) = remove_arg(&mut args, "--identity") {
node_identity_path = Some(args.remove(pos));
}
let mut peer = None;
if let Some(pos) = remove_arg(&mut args, "--peer") {
peer = Some(args.remove(pos));
}
let mut port = 9098;
if let Some(pos) = remove_arg(&mut args, "--port") {
port = args.remove(pos).parse().expect("Unable to parse port");
}
println!("Initializing...",);
let tor_identity = tor_identity_path.as_ref().and_then(load_json);
let node_identity = node_identity_path.as_ref().and_then(load_json).map(Arc::new);
let shutdown = Shutdown::new();
let temp_dir = Builder::new().prefix("stress-test").tempdir().unwrap();
let (comms_node, protocol_notif, inbound_rx, outbound_tx) = node::create(
node_identity,
temp_dir.as_ref(),
public_ip,
port,
tor_identity.clone(),
is_tcp,
shutdown.to_signal(),
)
.await?;
if let Some(node_identity_path) = node_identity_path.as_ref() {
save_json(comms_node.node_identity_ref(), node_identity_path)?;
}
if !is_tcp && let Some(tor_identity_path) = tor_identity_path.as_ref() {
save_json(&tor_identity.unwrap(), tor_identity_path)?;
}
println!("Stress test service started!");
let (handle, requester) = service::start_service(comms_node, protocol_notif, inbound_rx, outbound_tx, shutdown);
let mut last_peer = peer.as_ref().and_then(parse_from_short_str);
loop {
let (peer, selected_protocol) = user_prompt(last_peer.take())?;
last_peer = Some(peer.clone());
let (reply_tx, reply_rx) = oneshot::channel();
requester
.send(StressTestServiceRequest::BeginProtocol(
peer,
selected_protocol,
reply_tx,
))
.await
.unwrap();
let ctrl_c = tokio::signal::ctrl_c();
futures::pin_mut!(ctrl_c);
match future::select(reply_rx, ctrl_c).await {
Either::Left((result, _)) => {
println!("Stress test complete: {result:?}");
},
Either::Right((_, _)) => {
println!("SIGINT caught. Waiting for service to exit");
break;
},
}
}
println!("Stress test service is shutting down...");
requester.send(StressTestServiceRequest::Shutdown).await.unwrap();
time::timeout(Duration::from_secs(2), handle).await???;
Ok(())
}
fn save_json<P: AsRef<Path>, T: MessageFormat>(obj: &T, path: P) -> Result<(), Error> {
let json = obj.to_json()?;
std::fs::write(path, json).map_err(Into::into)
}
fn load_json<P: AsRef<Path>, T: MessageFormat>(path: P) -> Option<T> {
if path.as_ref().exists() {
let contents = std::fs::read_to_string(path).unwrap();
Some(T::from_json(&contents).unwrap())
} else {
None
}
}