use crate::{
gossip::{gossip_task, GossipData},
scenario::{ScenarioConfig, DistributedScenario},
server::server_task,
stats::RunStatistics,
};
use async_channel::{bounded, Receiver, Sender};
use clap::Parser;
use lazy_static::lazy_static;
#[doc(hidden)]
pub use linkme::distributed_slice;
use std::{collections::HashMap, net::SocketAddr};
#[allow(unused)]
use tracing::{debug, error, info, instrument, Instrument};
use std::pin::Pin;
use std::future::Future;
lazy_static! {
/// Process-wide bounded channel (capacity 10) of scenario configs.
///
/// The runtime loop in `run` clones the receiver half and starts the
/// scenario named by each config it receives. Both halves live in this
/// static for the lifetime of the process.
pub(crate) static ref BALTER_IN: (Sender<ScenarioConfig>, Receiver<ScenarioConfig>) =
bounded(10);
/// Second process-wide bounded channel (capacity 10) of scenario configs.
///
/// NOTE(review): presumably the outbound counterpart of `BALTER_IN`; no use
/// of it is visible in this part of the file -- confirm against its users.
pub(crate) static ref BALTER_OUT: (Sender<ScenarioConfig>, Receiver<ScenarioConfig>) =
bounded(10);
}
// Link-time registry of scenarios, collected through `linkme`'s
// `distributed_slice`. Each entry pairs a scenario name with a constructor
// function returning a boxed, pinned `DistributedScenario`.
// The runtime indexes this slice by position after looking a name up, so the
// name -> index map built in `BalterRuntime::run` must come from this slice.
// (Plain comments on purpose: the item is `#[doc(hidden)]`.)
#[doc(hidden)]
#[distributed_slice]
pub static BALTER_SCENARIOS: [(&'static str, fn() -> Pin<Box<dyn DistributedScenario<Output=RunStatistics>>>)];
/// Port used when none is supplied: it is the initial value in
/// `BalterRuntime::new` and the default for the `--port` CLI argument.
const DEFAULT_PORT: u16 = 7621;
// Command-line arguments consumed by `BalterRuntime::with_args`.
//
// NOTE: plain `//` comments are used deliberately. clap's derive macro turns
// `///` doc comments on the struct and its fields into `--help` text, so doc
// comments here would change the program's output.
#[derive(Parser, Debug)]
#[command(version = "0.1")]
struct BalterCli {
// Port for this runtime; falls back to `DEFAULT_PORT` when not given.
// `short, long` without values derive the flag names from the field name.
#[arg(short, long, default_value_t = DEFAULT_PORT)]
port: u16,
// Peer socket addresses; the short flag is overridden to `-n`.
#[arg(short('n'), long)]
peers: Vec<SocketAddr>,
}
/// Builder-style configuration for a Balter runtime.
///
/// Create one with [`BalterRuntime::new`], adjust it with the chained
/// setters (or [`BalterRuntime::with_args`] to read the command line), then
/// consume it with [`BalterRuntime::run`].
pub struct BalterRuntime {
/// Port handed to the server task and to the gossip data on `run`.
port: u16,
/// Peer addresses handed to `GossipData::new` on `run`.
peers: Vec<SocketAddr>,
}
impl BalterRuntime {
pub fn new() -> Self {
BalterRuntime {
port: DEFAULT_PORT,
peers: vec![],
}
}
pub fn with_args(mut self) -> Self {
let args = BalterCli::parse();
self.port = args.port;
self.peers = args.peers;
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn peers(mut self, peers: &[SocketAddr]) -> Self {
self.peers = peers.to_vec();
self
}
pub async fn run(self) {
let scenarios: HashMap<_, _> = BALTER_SCENARIOS
.iter()
.enumerate()
.map(|(idx, (name, _))| (*name, idx))
.collect();
run(scenarios, &self).await.unwrap();
}
}
impl Default for BalterRuntime {
fn default() -> Self {
Self::new()
}
}
#[instrument(name="balter", skip_all, fields(port=balter.port))]
async fn run(scenarios: HashMap<&'static str, usize>, balter: &BalterRuntime) -> Result<(), ()> {
let port = balter.port;
let gossip_data = GossipData::new(&balter.peers, balter.port).shared();
let gd = gossip_data.clone();
spawn_or_halt(server_task(port, gd)).await;
let gd = gossip_data.clone();
spawn_or_halt(gossip_task(gd)).await;
let (_, ref rx) = *BALTER_IN;
let rx = rx.clone();
loop {
if let Ok(config) = rx.recv().await {
if let Some(idx) = scenarios.get(config.name.as_str()) {
info!("Running scenario {}.", &config.name);
let scenario = BALTER_SCENARIOS[*idx];
let fut = scenario.1().set_config(config);
tokio::spawn(
async move {
fut.await;
}
.in_current_span(),
);
} else {
error!("No scenario with name \"{}\" exists.", &config.name);
}
}
}
}
/// Spawns `fut` as a detached tokio task in the current tracing span and
/// terminates the whole process with exit code 1 if it resolves to `Err`.
///
/// The function returns as soon as the task has been spawned; it does not
/// wait for `fut` to finish. A future that resolves to `Ok` ends silently.
async fn spawn_or_halt<F, R, E>(fut: F)
where
    F: Future<Output = Result<R, E>> + Send + 'static,
{
    let supervised = async move {
        if fut.await.is_err() {
            error!("Failure in critical service. Shutting down.");
            std::process::exit(1);
        }
    };

    tokio::spawn(supervised.in_current_span());
}