use crate::controllers::{CompositeController, Controller};
use balter_core::{LatencyConfig, RunStatistics, ScenarioConfig};
#[cfg(feature = "rt")]
use balter_runtime::runtime::{RuntimeMessage, BALTER_OUT};
use std::{
future::Future,
num::NonZeroU32,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
mod sampler;
use sampler::ConcurrentSampler;
/// A load-test scenario wrapping a user-supplied function `func`.
///
/// The scenario is itself a `Future`: the actual run is created lazily on the
/// first poll and stored in `runner_fut`, using a clone of `func` and of the
/// current `config`.
#[pin_project::pin_project]
pub struct Scenario<T> {
// User function that is cloned into the runner on first poll.
func: T,
// Boxed future driving the run; `None` until the scenario is first polled.
runner_fut: Option<Pin<Box<dyn Future<Output = RunStatistics> + Send>>>,
// Settings for this scenario; mutated by the builder-style methods on `Scenario`.
config: ScenarioConfig,
}
impl<T> Scenario<T> {
#[doc(hidden)]
pub fn new(name: &str, func: T) -> Self {
Self {
func,
runner_fut: None,
config: ScenarioConfig::new(name),
}
}
}
impl<T, F> Future for Scenario<T>
where
    T: Fn() -> F + Send + 'static + Clone + Sync,
    F: Future<Output = ()> + Send,
{
    type Output = RunStatistics;

    /// Drives the scenario run.
    ///
    /// On the first poll the runner future is created from clones of the
    /// stored function and configuration; every poll (including the first)
    /// is then forwarded to that runner.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        // Lazily build the runner so that configuration applied through the
        // builder methods before the first poll is what the run sees.
        if this.runner_fut.is_none() {
            let (func, config) = (this.func.clone(), this.config.clone());
            this.runner_fut = Some(Box::pin(async move { run_scenario(func, config).await }));
        }

        match this.runner_fut.as_mut() {
            Some(runner) => runner.as_mut().poll(cx),
            // The branch above guarantees the runner is populated.
            None => unreachable!(),
        }
    }
}
/// Builder-style configuration methods for a scenario future.
///
/// Each method consumes `self` and returns the updated scenario so calls can
/// be chained before the scenario is awaited.
pub trait ConfigurableScenario<T: Send>: Future<Output = T> + Sized + Send {
/// Sets the error-rate setting. The `Scenario` implementation in this file
/// panics unless the value lies in `0.0..=1.0`.
fn error_rate(self, error_rate: f64) -> Self;
/// Sets the maximum TPS setting. The `Scenario` implementation in this file
/// panics when `tps` is zero.
fn tps(self, tps: u32) -> Self;
/// Sets the latency setting as a `latency` value at the given `quantile`.
/// The `Scenario` implementation in this file panics unless `quantile` lies
/// in `0.0..=1.0`.
fn latency(self, latency: Duration, quantile: f64) -> Self;
/// Sets the duration setting for the scenario.
fn duration(self, duration: Duration) -> Self;
}
impl<T, F> ConfigurableScenario<RunStatistics> for Scenario<T>
where
    T: Fn() -> F + Send + 'static + Clone + Sync,
    F: Future<Output = ()> + Send,
{
    /// Stores `tps` as the scenario's maximum TPS.
    ///
    /// # Panics
    ///
    /// Panics if `tps` is zero.
    fn tps(mut self, tps: u32) -> Self {
        // `Option::expect` takes a plain `&str` and does not interpolate
        // `{tps}`, so the message is formatted through `panic!` instead.
        let max_tps = match NonZeroU32::new(tps) {
            Some(value) => value,
            None => panic!("TPS provided must be non-zero. Given: {tps}"),
        };
        self.config.max_tps = Some(max_tps);
        self
    }

    /// Stores `error_rate` in the scenario configuration.
    ///
    /// # Panics
    ///
    /// Panics if `error_rate` is not within `0.0..=1.0` (this includes NaN).
    fn error_rate(mut self, error_rate: f64) -> Self {
        if !(0. ..=1.).contains(&error_rate) {
            panic!(
                "Specified error rate must be between 0 and 1. Value provided was {error_rate}."
            );
        }
        self.config.error_rate = Some(error_rate);
        self
    }

    /// Stores a latency setting built from `latency` and `quantile`.
    ///
    /// # Panics
    ///
    /// Panics if `quantile` is not within `0.0..=1.0` (this includes NaN).
    fn latency(mut self, latency: Duration, quantile: f64) -> Self {
        if !(0. ..=1.).contains(&quantile) {
            panic!("Specified quantile must be between 0 and 1. Value provided was {quantile}.");
        }
        self.config.latency = Some(LatencyConfig::new(latency, quantile));
        self
    }

    /// Stores `duration` in the scenario configuration.
    fn duration(mut self, duration: Duration) -> Self {
        self.config.duration = Some(duration);
        self
    }
}
#[cfg(feature = "rt")]
mod runtime {
    use super::*;
    use balter_runtime::DistributedScenario;

    impl<T, F> DistributedScenario for Scenario<T>
    where
        T: Fn() -> F + Send + 'static + Clone + Sync,
        F: Future<Output = ()> + Send,
    {
        /// Produces a fresh, not-yet-started scenario that reuses this
        /// scenario's function but runs with the supplied `config`.
        #[allow(unused)]
        fn set_config(
            &self,
            config: ScenarioConfig,
        ) -> Pin<Box<dyn DistributedScenario<Output = Self::Output>>> {
            let func = self.func.clone();
            let reconfigured = Scenario {
                func,
                runner_fut: None,
                config,
            };
            Box::pin(reconfigured)
        }
    }
}
/// Runs `scenario` under the limits described by `config` and reports the
/// statistics gathered by the sampler.
///
/// If `config.is_unconfigured()` is true, nothing is run and
/// `RunStatistics::default()` is returned. Otherwise a `CompositeController`
/// and a `ConcurrentSampler` (seeded with the controller's initial TPS) are
/// created and driven in a loop. The loop only exits through the duration
/// check, i.e. once `config.duration` is set and has elapsed.
///
/// After the loop the sampler is shut down and, with the `rt` feature
/// enabled, completion is signalled to the runtime.
#[instrument(name="scenario", skip_all, fields(name=config.name))]
pub(crate) async fn run_scenario<T, F>(scenario: T, config: ScenarioConfig) -> RunStatistics
where
    T: Fn() -> F + Send + Sync + 'static + Clone,
    F: Future<Output = ()> + Send,
{
    if config.is_unconfigured() {
        debug!(
            "Not load testing {} with config {:?}, because it has no work to do.",
            config.name, &config
        );
        return RunStatistics::default();
    }

    info!("Running {} with config {:?}", config.name, &config);

    let start = Instant::now();
    let mut controllers = CompositeController::new(&config);
    let mut sampler = ConcurrentSampler::new(&config.name, scenario, controllers.initial_tps());

    loop {
        let (stable, samples) = sampler.get_samples().await;

        // Check the deadline on every iteration, not only when the sampler
        // produced a sample set; otherwise a run of empty results would let
        // the scenario overshoot its configured duration.
        if let Some(duration) = config.duration {
            if start.elapsed() > duration {
                break;
            }
        }

        if let Some(samples) = samples {
            let new_goal_tps = controllers.limit(samples, stable);

            // Lowering the goal is applied immediately; raising it is only
            // applied once the sampler reports stable measurements.
            if new_goal_tps < sampler.goal_tps() || stable {
                sampler.set_goal_tps(new_goal_tps);
            }
        }
    }

    let sampler_stats = sampler.wait_for_shutdown().await;

    #[cfg(feature = "rt")]
    signal_completion().await;

    info!("Scenario complete");

    RunStatistics {
        concurrency: sampler_stats.concurrency,
        goal_tps: sampler_stats.goal_tps.get(),
        actual_tps: sampler_stats.final_sample_set.mean_tps(),
        latency_p50: sampler_stats.final_sample_set.latency(0.5),
        latency_p90: sampler_stats.final_sample_set.latency(0.9),
        latency_p99: sampler_stats.final_sample_set.latency(0.99),
        error_rate: sampler_stats.final_sample_set.mean_err(),
        tps_limited: sampler_stats.tps_limited,
    }
}
/// Unimplemented stub, compiled only with the `rt` feature.
///
/// The body is `todo!()`, so awaiting the returned future panics. All
/// parameters are currently unused (hence the underscore prefixes).
#[allow(unused)]
#[cfg(feature = "rt")]
async fn distribute_work(_config: &ScenarioConfig, _elapsed: Duration, _self_tps: f64) {
todo!()
}
/// Sends `RuntimeMessage::Finished` on the sending half of `BALTER_OUT`.
///
/// The outcome of the send is deliberately discarded.
#[cfg(feature = "rt")]
async fn signal_completion() {
    let sender = &BALTER_OUT.0;
    let _ = sender.send(RuntimeMessage::Finished).await;
}