use crate::{
error::RuntimeError,
gossip::{gossip_task, peer_stream, Gossip},
server::server_task,
DistributedScenario,
};
use async_channel::{bounded, Receiver, Sender};
use balter_core::{RunStatistics, ScenarioConfig};
use clap::Parser;
use lazy_static::lazy_static;
#[doc(hidden)]
pub use linkme::distributed_slice;
use std::future::Future;
use std::pin::Pin;
use std::{collections::HashMap, net::SocketAddr};
#[allow(unused)]
use tracing::{debug, error, info, instrument, Instrument};
mod message;
pub use message::RuntimeMessage;
lazy_static! {
/// Process-wide bounded channel (capacity 10) of [`RuntimeMessage`]s, stored as its
/// `(Sender, Receiver)` pair.
///
/// Within this module, `helper_task` clones the receiving half and handles each message
/// that arrives on it. The sending side is used outside of this module's visible code.
pub static ref BALTER_OUT: (Sender<RuntimeMessage>, Receiver<RuntimeMessage>) =
bounded(10);
}
// Link-time registry of runnable scenarios, assembled via `linkme::distributed_slice`.
//
// Each entry pairs a scenario name with a constructor function returning the boxed,
// pinned scenario. `spawn_scenario` looks an entry up by name and calls its constructor.
// NOTE(review): entries are presumably contributed by the scenario attribute macro in
// user crates — confirm against the macro crate.
#[doc(hidden)]
#[distributed_slice]
pub static BALTER_SCENARIOS: [(
&'static str,
fn() -> Pin<Box<dyn DistributedScenario<Output = RunStatistics>>>,
)];
/// Port used when none is given, both as the CLI default and by `BalterRuntime::new`.
const DEFAULT_PORT: u16 = 7621;
// Command-line arguments read by `BalterRuntime::with_args`.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// the generated help/about text, so adding them would change the program's CLI output.
#[derive(Parser, Debug)]
#[command(version = "0.1")]
struct BalterCli {
// `-p` / `--port`: port for this runtime; falls back to `DEFAULT_PORT` when omitted.
#[arg(short, long, default_value_t = DEFAULT_PORT)]
port: u16,
// `-n` / `--peers`: socket addresses of peers, collected into a list.
#[arg(short('n'), long)]
peers: Vec<SocketAddr>,
}
/// Builder-style handle for configuring and starting the Balter distributed runtime.
///
/// Configure it with `port`, `peers` or `with_args`, then call `run`.
pub struct BalterRuntime {
/// Port handed to the gossip state and to the server task when `run` is called.
port: u16,
/// Peer addresses supplied through `peers` or `with_args`.
// NOTE(review): nothing visible in this file reads this field after it is set — confirm
// whether it should be forwarded to the gossip layer.
peers: Vec<SocketAddr>,
}
impl Default for BalterRuntime {
fn default() -> Self {
Self::new()
}
}
impl BalterRuntime {
pub fn new() -> Self {
BalterRuntime {
port: DEFAULT_PORT,
peers: vec![],
}
}
pub fn with_args(mut self) -> Self {
let args = BalterCli::parse();
self.port = args.port;
self.peers = args.peers;
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn peers(mut self, peers: &[SocketAddr]) -> Self {
self.peers = peers.to_vec();
self
}
#[instrument(name="balter", skip_all, fields(port=self.port))]
pub async fn run(self) {
let gossip = Gossip::new(uuid::Uuid::new_v4(), self.port, spawn_scenario);
spawn_or_halt(server_task(self.port, gossip.clone())).await;
spawn_or_halt(gossip_task(gossip.clone())).await;
spawn_or_halt(helper_task(gossip.clone())).await;
}
}
/// Looks up the scenario registered under `config.name` in [`BALTER_SCENARIOS`], applies
/// `config` to it and spawns it as a Tokio task in the current tracing span.
///
/// The spawned task is detached: this function returns as soon as the task is spawned and
/// the scenario's output is discarded when it completes.
///
/// # Errors
///
/// Returns [`RuntimeError::NoScenario`] when no registered scenario has a matching name.
pub(crate) fn spawn_scenario(config: ScenarioConfig) -> Result<(), RuntimeError> {
    // Search the registry directly instead of collecting it into a map on every call.
    // Searching from the back keeps the "last entry wins" result for duplicate names.
    let &(_, constructor) = BALTER_SCENARIOS
        .iter()
        .rfind(|(name, _)| *name == config.name.as_str())
        .ok_or(RuntimeError::NoScenario)?;

    info!("Running scenario {}.", &config.name);

    let fut = constructor().set_config(config);
    tokio::spawn(
        async move {
            fut.await;
        }
        .in_current_span(),
    );
    Ok(())
}
async fn helper_task(gossip: Gossip) -> Result<(), RuntimeError> {
let (_, ref rx) = *BALTER_OUT;
let rx = rx.clone();
loop {
if let Ok(msg) = rx.recv().await {
match msg {
RuntimeMessage::Help(config) => {
let peer = {
let mut data = gossip.data.lock()?;
data.set_state_busy();
data.select_free_peer()
};
if let Some(peer) = peer {
let mut stream = peer_stream(&peer).await?;
let res = gossip.request_help(&mut stream, peer.addr, config).await;
if let Err(error) = res {
error!("Error in gossip protocol: {error:?}");
}
} else {
error!("No Peers available to help.");
}
}
RuntimeMessage::Finished => {
gossip.data.lock()?.set_state_free();
}
}
} else {
return Err(RuntimeError::ChannelClosed);
}
}
}
/// Spawns `fut` as a detached Tokio task, in the current tracing span, and terminates the
/// process with exit code 1 if it resolves to `Err`.
///
/// This function only spawns the task: awaiting it does not wait for `fut` to finish. An
/// `Ok` result is dropped silently.
async fn spawn_or_halt<F, R, E>(fut: F)
where
    F: Future<Output = Result<R, E>> + Send + 'static,
{
    let supervised = async move {
        let outcome = fut.await;
        if outcome.is_ok() {
            return;
        }
        error!("Failure in critical service. Shutting down.");
        std::process::exit(1);
    };

    tokio::spawn(supervised.in_current_span());
}