use crate::{
gossip::{gossip_task, GossipData},
scenario::{Scenario, ScenarioConfig},
server::server_task,
};
use async_channel::{bounded, Receiver, Sender};
use clap::Parser;
use lazy_static::lazy_static;
#[doc(hidden)]
pub use linkme::distributed_slice;
use std::{collections::HashMap, net::SocketAddr};
use tracing::{debug, error, info, instrument, Instrument};
lazy_static! {
pub(crate) static ref BALTER_IN: (Sender<ScenarioConfig>, Receiver<ScenarioConfig>) =
bounded(10);
pub(crate) static ref BALTER_OUT: (Sender<ScenarioConfig>, Receiver<ScenarioConfig>) =
bounded(10);
}
#[doc(hidden)]
#[distributed_slice]
pub static BALTER_SCENARIOS: [(&'static str, fn() -> Scenario)];
const DEFAULT_PORT: u16 = 7621;
#[derive(Parser, Debug)]
#[command(version = "0.1")]
struct BalterCli {
#[arg(short, long, default_value_t = DEFAULT_PORT)]
port: u16,
#[arg(short('n'), long)]
peers: Vec<SocketAddr>,
}
pub struct BalterRuntime {
port: u16,
peers: Vec<SocketAddr>,
}
impl BalterRuntime {
pub fn new() -> Self {
BalterRuntime {
port: DEFAULT_PORT,
peers: vec![],
}
}
pub fn with_args(mut self) -> Self {
let args = BalterCli::parse();
self.port = args.port;
self.peers = args.peers;
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn peers(mut self, peers: &[SocketAddr]) -> Self {
self.peers = peers.to_vec();
self
}
pub async fn run(self) {
let scenarios: HashMap<_, _> = BALTER_SCENARIOS
.iter()
.enumerate()
.map(|(idx, (name, _))| (*name, idx))
.collect();
debug!("Scenario mapping: {:?}", &scenarios);
run(scenarios, &self).await.unwrap();
}
}
impl Default for BalterRuntime {
fn default() -> Self {
Self::new()
}
}
#[instrument(name="balter", skip_all, fields(port=balter.port))]
async fn run(scenarios: HashMap<&'static str, usize>, balter: &BalterRuntime) -> Result<(), ()> {
let port = balter.port;
let gossip_data = GossipData::new(&balter.peers, balter.port).shared();
let gd = gossip_data.clone();
tokio::spawn(async move { server_task(port, gd).await }.in_current_span());
let gd = gossip_data.clone();
tokio::spawn(async move { gossip_task(gd).await }.in_current_span());
let (_, ref rx) = *BALTER_IN;
let rx = rx.clone();
loop {
if let Ok(config) = rx.recv().await {
if let Some(idx) = scenarios.get(config.name.as_str()) {
info!("Running scenario {}.", &config.name);
let scenario = BALTER_SCENARIOS[*idx];
tokio::spawn(
async move {
scenario.1().set_config(config).await;
}
.in_current_span(),
);
} else {
error!("No scenario with name \"{}\" exists.", &config.name);
}
}
}
}