balter_runtime/
runtime.rs

//! Default Balter Distributed Runtime
//!
//! This Runtime handles running scenarios and distributing workloads to peers. Currently this
//! involves spinning up an API server, a gossip protocol task, and a helper task which forwards
//! requests for help to free peers.
5use crate::{
6    error::RuntimeError,
7    gossip::{gossip_task, peer_stream, Gossip},
8    server::server_task,
9    DistributedScenario,
10};
11use async_channel::{bounded, Receiver, Sender};
12use balter_core::{RunStatistics, ScenarioConfig};
13use clap::Parser;
14use lazy_static::lazy_static;
15#[doc(hidden)]
16pub use linkme::distributed_slice;
17use std::future::Future;
18use std::pin::Pin;
19use std::{collections::HashMap, net::SocketAddr};
20#[allow(unused)]
21use tracing::{debug, error, info, instrument, Instrument};
22
23mod message;
24
25pub use message::RuntimeMessage;
26
27// TODO: This doesn't need to be a global, and can be threaded into each Scenario via task_local.
28lazy_static! {
29    /// Message queue for sending work to other peers
30    pub static ref BALTER_OUT: (Sender<RuntimeMessage>, Receiver<RuntimeMessage>) =
31        bounded(10);
32}
33
34/// An array created at link-time which stores the names of each scenario and their respective
35/// function pointer.
36#[doc(hidden)]
37#[distributed_slice]
38pub static BALTER_SCENARIOS: [(
39    &'static str,
40    fn() -> Pin<Box<dyn DistributedScenario<Output = RunStatistics>>>,
41)];
42
43const DEFAULT_PORT: u16 = 7621;
44
45#[derive(Parser, Debug)]
46#[command(version = "0.1")]
47struct BalterCli {
48    #[arg(short, long, default_value_t = DEFAULT_PORT)]
49    port: u16,
50
51    #[arg(short('n'), long)]
52    peers: Vec<SocketAddr>,
53}
54
/// Default Balter distributed runtime. (requires `rt` feature)
///
/// Creates a background API server to handle HTTP requests (for kicking off scenarios) as well
/// as a background task for handling the gossip protocol.
///
/// The type is a small builder: configure it with the `port`/`peers`/`with_args` methods and
/// then call `run`. It is `Debug` and `Clone` so a configuration can be logged or reused.
///
/// # Example
///
/// ```ignore
/// use balter::prelude::*;
///
/// #[tokio::main]
/// async fn main() {
///     BalterRuntime::new()
///         .with_args()
///         .run()
///         .await;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct BalterRuntime {
    /// Port handed to the API server and to the gossip layer.
    port: u16,
    /// Addresses of peer servers supplied by the user.
    peers: Vec<SocketAddr>,
}
77
78impl Default for BalterRuntime {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl BalterRuntime {
85    pub fn new() -> Self {
86        BalterRuntime {
87            port: DEFAULT_PORT,
88            peers: vec![],
89        }
90    }
91
92    /// Use the default CLI arguments for Balter.
93    ///
94    /// `-p`, `--port` to set a custom port number (default `7621`)
95    ///
96    /// `-n`, `--peers` to provide addresses to peer servers to enable gossiping.
97    ///
98    /// # Example
99    /// ```ignore
100    /// $ ./my_load_test -p 2742
101    /// $ ./my_load_test -n 127.0.0.1:7621 -n 127.0.0.2:7621
102    /// ```
103    pub fn with_args(mut self) -> Self {
104        let args = BalterCli::parse();
105        self.port = args.port;
106        self.peers = args.peers;
107        self
108    }
109
110    pub fn port(mut self, port: u16) -> Self {
111        self.port = port;
112        self
113    }
114
115    pub fn peers(mut self, peers: &[SocketAddr]) -> Self {
116        self.peers = peers.to_vec();
117        self
118    }
119
120    #[instrument(name="balter", skip_all, fields(port=self.port))]
121    pub async fn run(self) {
122        let gossip = Gossip::new(uuid::Uuid::new_v4(), self.port, spawn_scenario);
123
124        spawn_or_halt(server_task(self.port, gossip.clone())).await;
125        spawn_or_halt(gossip_task(gossip.clone())).await;
126        spawn_or_halt(helper_task(gossip.clone())).await;
127    }
128}
129
130pub(crate) fn spawn_scenario(config: ScenarioConfig) -> Result<(), RuntimeError> {
131    // TODO: We probably don't want to rebuild this every time.
132    let scenarios: HashMap<_, _> = BALTER_SCENARIOS
133        .iter()
134        .enumerate()
135        .map(|(idx, (name, _))| (*name, idx))
136        .collect();
137
138    let idx = scenarios
139        .get(config.name.as_str())
140        .ok_or(RuntimeError::NoScenario)?;
141    info!("Running scenario {}.", &config.name);
142    let scenario = BALTER_SCENARIOS[*idx];
143    let fut = scenario.1().set_config(config);
144    tokio::spawn(
145        async move {
146            fut.await;
147        }
148        .in_current_span(),
149    );
150    Ok(())
151}
152
153async fn helper_task(gossip: Gossip) -> Result<(), RuntimeError> {
154    let (_, ref rx) = *BALTER_OUT;
155    let rx = rx.clone();
156    loop {
157        if let Ok(msg) = rx.recv().await {
158            match msg {
159                RuntimeMessage::Help(config) => {
160                    // TODO: The internal `data` probably shouldn't be exposed like this.
161                    let peer = {
162                        let mut data = gossip.data.lock()?;
163                        data.set_state_busy();
164                        data.select_free_peer()
165                    };
166                    if let Some(peer) = peer {
167                        let mut stream = peer_stream(&peer).await?;
168                        let res = gossip.request_help(&mut stream, peer.addr, config).await;
169                        if let Err(error) = res {
170                            error!("Error in gossip protocol: {error:?}");
171                        }
172                    } else {
173                        error!("No Peers available to help.");
174                        // TODO: Implement some form of retry/auto-scaling
175                    }
176                }
177                RuntimeMessage::Finished => {
178                    gossip.data.lock()?.set_state_free();
179                }
180            }
181        } else {
182            return Err(RuntimeError::ChannelClosed);
183        }
184    }
185}
186
187async fn spawn_or_halt<F, R, E>(fut: F)
188where
189    F: Future<Output = Result<R, E>> + Send + 'static,
190{
191    tokio::spawn(
192        async move {
193            let res = fut.await;
194            if res.is_err() {
195                error!("Failure in critical service. Shutting down.");
196                std::process::exit(1);
197            }
198        }
199        .in_current_span(),
200    );
201}