balter_runtime/
runtime.rs1use crate::{
6 error::RuntimeError,
7 gossip::{gossip_task, peer_stream, Gossip},
8 server::server_task,
9 DistributedScenario,
10};
11use async_channel::{bounded, Receiver, Sender};
12use balter_core::{RunStatistics, ScenarioConfig};
13use clap::Parser;
14use lazy_static::lazy_static;
15#[doc(hidden)]
16pub use linkme::distributed_slice;
17use std::future::Future;
18use std::pin::Pin;
19use std::{collections::HashMap, net::SocketAddr};
20#[allow(unused)]
21use tracing::{debug, error, info, instrument, Instrument};
22
23mod message;
24
25pub use message::RuntimeMessage;
26
27lazy_static! {
29 pub static ref BALTER_OUT: (Sender<RuntimeMessage>, Receiver<RuntimeMessage>) =
31 bounded(10);
32}
33
34#[doc(hidden)]
37#[distributed_slice]
38pub static BALTER_SCENARIOS: [(
39 &'static str,
40 fn() -> Pin<Box<dyn DistributedScenario<Output = RunStatistics>>>,
41)];
42
43const DEFAULT_PORT: u16 = 7621;
44
45#[derive(Parser, Debug)]
46#[command(version = "0.1")]
47struct BalterCli {
48 #[arg(short, long, default_value_t = DEFAULT_PORT)]
49 port: u16,
50
51 #[arg(short('n'), long)]
52 peers: Vec<SocketAddr>,
53}
54
/// Builder-style configuration for the Balter distributed runtime.
///
/// Construct it with `BalterRuntime::new()`, optionally adjust it with the
/// `with_args`, `port` and `peers` builder methods, then start it with `run`.
#[derive(Debug, Clone)]
pub struct BalterRuntime {
    /// Port handed to the server and gossip components when the runtime starts.
    port: u16,
    /// Peer addresses supplied through the CLI or the `peers` builder method.
    peers: Vec<SocketAddr>,
}
77
78impl Default for BalterRuntime {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl BalterRuntime {
85 pub fn new() -> Self {
86 BalterRuntime {
87 port: DEFAULT_PORT,
88 peers: vec![],
89 }
90 }
91
92 pub fn with_args(mut self) -> Self {
104 let args = BalterCli::parse();
105 self.port = args.port;
106 self.peers = args.peers;
107 self
108 }
109
110 pub fn port(mut self, port: u16) -> Self {
111 self.port = port;
112 self
113 }
114
115 pub fn peers(mut self, peers: &[SocketAddr]) -> Self {
116 self.peers = peers.to_vec();
117 self
118 }
119
120 #[instrument(name="balter", skip_all, fields(port=self.port))]
121 pub async fn run(self) {
122 let gossip = Gossip::new(uuid::Uuid::new_v4(), self.port, spawn_scenario);
123
124 spawn_or_halt(server_task(self.port, gossip.clone())).await;
125 spawn_or_halt(gossip_task(gossip.clone())).await;
126 spawn_or_halt(helper_task(gossip.clone())).await;
127 }
128}
129
130pub(crate) fn spawn_scenario(config: ScenarioConfig) -> Result<(), RuntimeError> {
131 let scenarios: HashMap<_, _> = BALTER_SCENARIOS
133 .iter()
134 .enumerate()
135 .map(|(idx, (name, _))| (*name, idx))
136 .collect();
137
138 let idx = scenarios
139 .get(config.name.as_str())
140 .ok_or(RuntimeError::NoScenario)?;
141 info!("Running scenario {}.", &config.name);
142 let scenario = BALTER_SCENARIOS[*idx];
143 let fut = scenario.1().set_config(config);
144 tokio::spawn(
145 async move {
146 fut.await;
147 }
148 .in_current_span(),
149 );
150 Ok(())
151}
152
153async fn helper_task(gossip: Gossip) -> Result<(), RuntimeError> {
154 let (_, ref rx) = *BALTER_OUT;
155 let rx = rx.clone();
156 loop {
157 if let Ok(msg) = rx.recv().await {
158 match msg {
159 RuntimeMessage::Help(config) => {
160 let peer = {
162 let mut data = gossip.data.lock()?;
163 data.set_state_busy();
164 data.select_free_peer()
165 };
166 if let Some(peer) = peer {
167 let mut stream = peer_stream(&peer).await?;
168 let res = gossip.request_help(&mut stream, peer.addr, config).await;
169 if let Err(error) = res {
170 error!("Error in gossip protocol: {error:?}");
171 }
172 } else {
173 error!("No Peers available to help.");
174 }
176 }
177 RuntimeMessage::Finished => {
178 gossip.data.lock()?.set_state_free();
179 }
180 }
181 } else {
182 return Err(RuntimeError::ChannelClosed);
183 }
184 }
185}
186
187async fn spawn_or_halt<F, R, E>(fut: F)
188where
189 F: Future<Output = Result<R, E>> + Send + 'static,
190{
191 tokio::spawn(
192 async move {
193 let res = fut.await;
194 if res.is_err() {
195 error!("Failure in critical service. Shutting down.");
196 std::process::exit(1);
197 }
198 }
199 .in_current_span(),
200 );
201}