use chrono::{DateTime, Utc};
use feldera_rest_api::Client;
use feldera_rest_api::types::{CompilationProfile, Configuration, ProgramConfig};
use feldera_types::error::ErrorResponse;
use log::{error, info, warn};
use progenitor_client::Error as ProgenitorError;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{cmp, collections::HashMap, error::Error};
use tabled::builder::Builder;
use tabled::settings::Style;
use tokio::time::{Duration, Instant, sleep};
mod api;
use crate::bench::api::types::JsonUpdateStartPoint;
use crate::handle_errors_fatal;
use crate::{
bench::api::types::{Adapter, GitHash, JsonNewRun, JsonReportSettings, NameId, ResourceId},
cli::{BenchmarkArgs, OutputFormat, PipelineAction},
};
use api::Client as BenchClient;
/// Formats a byte count using binary (1024-based) units, e.g. `1536` ->
/// `"1.5 KiB"`.
///
/// Values are rounded to at most two decimal places; trailing zeros are
/// trimmed by round-tripping the formatted string through an `f64` parse.
/// Units cap at `TiB` (larger values are still expressed in `TiB`).
///
/// # Panics
/// Panics if `n` is negative.
pub fn human_readable_bytes(n: i64) -> String {
    const DELIMITER: f64 = 1024_f64;
    const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"];
    assert!(n >= 0, "No negative bytes");
    let n = n as f64;
    // After the assert, n < 1 can only mean n == 0: plain bytes.
    if n < 1_f64 {
        return format!("{n} B");
    }
    // Largest unit that keeps the scaled value >= 1, capped at the table end.
    let exp = cmp::min(
        (n.ln() / DELIMITER.ln()).floor() as i32,
        (UNITS.len() - 1) as i32,
    );
    // Round to two decimals, then parse back so e.g. "1.50" prints as "1.5".
    let scaled = format!("{:.2}", n / DELIMITER.powi(exp))
        .parse::<f64>()
        .unwrap();
    format!("{} {}", scaled, UNITS[exp as usize])
}
/// One per-second sample of the pipeline metrics the benchmark cares about,
/// flattened from a `ControllerStatus` (see the `From` impl below).
#[derive(Debug)]
struct RawMetrics {
    /// Resident set size copied from the pipeline's global metrics, in bytes.
    rss_bytes: i64,
    /// Pipeline uptime copied from the global metrics, in milliseconds.
    uptime_msecs: i64,
    /// Stringified incarnation UUID; used to detect a pipeline restart
    /// mid-benchmark (all samples must share the same value).
    incarnation_uuid: String,
    /// Storage usage copied from the global metrics, in bytes.
    storage_bytes: i64,
    /// Records buffered at the inputs, copied from the global metrics.
    buffered_input_records: i64,
    /// Total records processed so far, copied from the global metrics.
    total_processed_records: i64,
    /// Sum of `total_bytes` across all input connectors.
    input_bytes: i64,
    /// True when any input connector reported a fatal, parse, or transport
    /// error in this sample.
    input_errors: bool,
}
/// Per-connector input metrics as they appear in the raw stats JSON.
/// Currently unused (`#[allow(dead_code)]`) — the benchmark reads the typed
/// `feldera_rest_api` stats instead; presumably kept for ad-hoc JSON parsing.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct InputMetrics {
    /// Records received but not yet processed by the pipeline.
    buffered_records: i64,
    /// Whether the connector has reached end of input.
    end_of_input: bool,
    /// Count of records that failed to parse.
    num_parse_errors: i64,
    /// Count of transport-level errors.
    num_transport_errors: i64,
    /// Total bytes ingested by this connector.
    total_bytes: i64,
    /// Total records ingested by this connector.
    total_records: i64,
}
/// One input connector entry from the raw stats JSON.
/// Currently unused (`#[allow(dead_code)]`) — see `InputMetrics`.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct InputConnector {
    /// Connector barrier flag as reported by the stats endpoint.
    barrier: bool,
    /// Connector configuration, kept as untyped JSON.
    config: Map<String, Value>,
    /// Name of the endpoint this connector is attached to.
    endpoint_name: String,
    /// Set when the connector hit an unrecoverable error.
    fatal_error: Option<String>,
    /// Ingestion counters for this connector.
    metrics: InputMetrics,
    /// Whether the connector is currently paused.
    paused: bool,
}
/// Top-level shape of the pipeline stats JSON document.
/// Currently unused (`#[allow(dead_code)]`) — see `InputMetrics`.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct PipelineStats {
    /// Pipeline-wide metrics, kept as untyped JSON.
    global_metrics: Map<String, Value>,
    /// Per-input-connector stats.
    inputs: Vec<InputConnector>,
    /// Per-output-connector stats, untyped.
    outputs: Vec<Value>,
    /// Error raised during suspend, if any.
    suspend_error: Option<Value>,
}
impl From<&feldera_rest_api::types::ControllerStatus> for RawMetrics {
fn from(stats: &feldera_rest_api::types::ControllerStatus) -> Self {
let global_metrics = &stats.global_metrics;
let input_errors = stats.inputs.iter().any(|input| {
input.fatal_error.is_some()
|| input.metrics.num_parse_errors > 0
|| input.metrics.num_transport_errors > 0
});
let input_bytes = stats
.inputs
.iter()
.map(|input| input.metrics.total_bytes)
.sum();
Self {
rss_bytes: global_metrics.rss_bytes,
uptime_msecs: global_metrics.uptime_msecs,
incarnation_uuid: global_metrics.incarnation_uuid.to_string(),
storage_bytes: global_metrics.storage_bytes,
buffered_input_records: global_metrics.buffered_input_records,
total_processed_records: global_metrics.total_processed_records,
input_bytes,
input_errors,
}
}
}
/// A reported measurement with optional lower/upper bounds; absent bounds
/// are omitted from the serialized JSON entirely.
#[derive(Debug, Serialize)]
struct Metric<T> {
    /// The primary (point) value of the metric.
    value: T,
    /// Optional lower bound for the measurement.
    #[serde(skip_serializing_if = "Option::is_none")]
    lower_value: Option<T>,
    /// Optional upper bound for the measurement.
    #[serde(skip_serializing_if = "Option::is_none")]
    upper_value: Option<T>,
}
impl<T> Metric<T> {
fn simple(value: T) -> Self {
Self {
value,
lower_value: None,
upper_value: None,
}
}
}
/// Aggregated benchmark metrics derived from a series of `RawMetrics`
/// samples; serialized field names follow the BMF-style kebab-case keys.
#[derive(Debug, Serialize)]
struct PipelineMetrics {
    /// Records per second, derived from the final sample's totals.
    throughput: Metric<i64>,
    /// RSS in bytes: max observed, with the min as the lower bound.
    memory: Metric<i64>,
    /// Storage in bytes: max observed, with the min as the lower bound.
    storage: Metric<i64>,
    /// Pipeline uptime of the final sample, in milliseconds.
    uptime: Metric<i64>,
    /// Storage bytes per ingested input byte; omitted when no input bytes
    /// were observed.
    #[serde(
        rename = "state-amplification",
        skip_serializing_if = "Option::is_none"
    )]
    state_amplification: Option<Metric<f64>>,
    /// Mean buffered input records across samples, bounded by min/max.
    #[serde(rename = "buffered-input-records")]
    buffered_input_records: Metric<i64>,
}
impl PipelineMetrics {
    /// Aggregates a time series of raw samples into summary metrics.
    ///
    /// - `throughput`: records/s over the whole run, from the final sample.
    /// - `memory` / `storage`: max observed value, min as the lower bound.
    /// - `buffered-input-records`: mean over all samples, min/max as bounds.
    /// - `state-amplification`: storage bytes per input byte, omitted when
    ///   no input bytes were observed.
    ///
    /// Exits the process when no samples were collected.
    fn new(metrics: Vec<RawMetrics>) -> Self {
        if metrics.is_empty() {
            eprintln!("No measurements were recorded? Maybe try to increase `duration`.");
            std::process::exit(1);
        }
        let last = metrics.last().unwrap();
        // Lifetime-average rate, not an instantaneous one. Guard against a
        // zero uptime, which would otherwise saturate to i64::MAX.
        let throughput = if last.uptime_msecs > 0 {
            (last.total_processed_records as f64 / (last.uptime_msecs as f64 / 1000.0)) as i64
        } else {
            0
        };
        let memory = Metric {
            value: metrics.iter().map(|m| m.rss_bytes).max().unwrap(),
            lower_value: Some(metrics.iter().map(|m| m.rss_bytes).min().unwrap()),
            upper_value: None,
        };
        let storage = Metric {
            value: metrics.iter().map(|m| m.storage_bytes).max().unwrap(),
            lower_value: Some(metrics.iter().map(|m| m.storage_bytes).min().unwrap()),
            upper_value: None,
        };
        let buffered_input_records = Metric {
            // Mean of the buffered-record samples; bounds are min/max.
            value: metrics.iter().map(|m| m.buffered_input_records).sum::<i64>()
                / metrics.len() as i64,
            lower_value: metrics.iter().map(|m| m.buffered_input_records).min(),
            upper_value: metrics.iter().map(|m| m.buffered_input_records).max(),
        };
        // A minimum of zero buffered records suggests the data source could
        // not saturate the pipeline, making the throughput number suspect.
        if buffered_input_records.lower_value.unwrap_or_default() == 0 {
            eprintln!(
                "Input buffering was 0 for {} samples, we're likely not sending enough data to the pipeline for evaluating its true performance.",
                metrics
                    .iter()
                    .map(|m| m.buffered_input_records)
                    .filter(|b| *b == 0)
                    .count()
            );
        }
        // Amplification is only meaningful once some input has arrived.
        let input_bytes = last.input_bytes;
        let state_amplification = if input_bytes > 0 {
            Some(Metric::simple(storage.value as f64 / input_bytes as f64))
        } else {
            None
        };
        Self {
            throughput: Metric::simple(throughput),
            memory,
            storage,
            state_amplification,
            buffered_input_records,
            uptime: Metric::simple(last.uptime_msecs),
        }
    }
}
/// A named benchmark result; the metrics serialize inline next to the name
/// thanks to `#[serde(flatten)]`.
#[derive(Debug, Serialize)]
struct Benchmark {
    /// The benchmark (pipeline) name.
    name: String,
    /// Aggregated pipeline metrics, flattened into this object when
    /// serialized directly.
    #[serde(flatten)]
    metrics: PipelineMetrics,
}
impl Benchmark {
fn new(name: String, metrics: PipelineMetrics) -> Self {
Self { name, metrics }
}
fn as_map(&self) -> Map<String, Value> {
let mut map = Map::new();
map.insert(
self.name.clone(),
serde_json::to_value(&self.metrics).unwrap(),
);
map
}
fn format_as_text(&self) -> String {
let mut rows = vec![];
rows.push([
"Metric".to_string(),
"Value".to_string(),
"Lower".to_string(),
"Upper".to_string(),
]);
rows.push([
"Throughput (records/s)".to_string(),
format!("{}", self.metrics.throughput.value),
"-".to_string(),
"-".to_string(),
]);
rows.push([
"Memory".to_string(),
human_readable_bytes(self.metrics.memory.value),
human_readable_bytes(self.metrics.memory.lower_value.unwrap()),
"-".to_string(),
]);
rows.push([
"Storage".to_string(),
human_readable_bytes(self.metrics.storage.value),
human_readable_bytes(self.metrics.storage.lower_value.unwrap()),
"-".to_string(),
]);
rows.push([
"Uptime [ms]".to_string(),
self.metrics.uptime.value.to_string(),
"-".to_string(),
"-".to_string(),
]);
if let Some(amp) = &self.metrics.state_amplification {
rows.push([
"State Amplification".to_string(),
format!("{:.2}", amp.value),
"-".to_string(),
"-".to_string(),
]);
}
format!(
"Benchmark Results:\n{}",
Builder::from_iter(rows).build().with(Style::rounded())
)
}
fn format(&self, format: OutputFormat) -> String {
match format {
OutputFormat::Json => serde_json::to_string_pretty(&self.as_map()).unwrap(),
OutputFormat::Text => self.format_as_text(),
OutputFormat::ArrowIpc
| OutputFormat::Parquet
| OutputFormat::Prometheus
| OutputFormat::Hash => {
warn!(
"Format '{}' is not supported for benchmark results, falling back to text format",
format
);
self.format_as_text()
}
}
}
}
/// Polls the pipeline's stats endpoint roughly once per second, collecting
/// every `ControllerStatus` sample until the run ends.
///
/// The run ends when the pipeline reports `pipeline_complete` or when the
/// optional `duration` (seconds) has elapsed. When `needs_commit` is true,
/// reaching either stopping condition first issues a `commit_transaction`
/// request and sampling continues until that request resolves.
/// Exits the process on any stats-collection or commit error.
async fn collect_metrics(
    client: &Client,
    pipeline_name: &str,
    duration: Option<u64>,
    needs_commit: bool,
) -> Vec<feldera_rest_api::types::ControllerStatus> {
    // Two-phase loop state: plain ingestion, or holding an in-flight commit
    // future (stored in the variant so it survives across loop iterations).
    enum PipelineStatus<T> {
        Ingesting,
        Committing(T),
    }
    let mut metrics = Vec::new();
    let start_time = Instant::now();
    let duration = duration.map(Duration::from_secs);
    let mut status = PipelineStatus::Ingesting;
    loop {
        match client
            .get_pipeline_stats()
            .pipeline_name(pipeline_name.to_string())
            .send()
            .await
        {
            Ok(stats) => {
                let stats = stats.into_inner();
                metrics.push(stats.clone());
                info!("Collected metrics at {}s", start_time.elapsed().as_secs());
                match status {
                    PipelineStatus::Ingesting => {
                        // Stop (or begin committing) once the pipeline says
                        // it has processed all of its input.
                        if stats.global_metrics.pipeline_complete {
                            println!("Pipeline completed, stopping benchmark");
                            if needs_commit {
                                status = PipelineStatus::Committing(Box::pin(
                                    client
                                        .commit_transaction()
                                        .pipeline_name(pipeline_name)
                                        .send(),
                                ));
                            } else {
                                break;
                            }
                        }
                        // Same exit path when the time budget is exhausted.
                        // NOTE(review): if both conditions fire in the same
                        // iteration, this overwrites an already-started
                        // commit future with a second request — confirm that
                        // is intended.
                        if let Some(duration) = duration
                            && start_time.elapsed() >= duration
                        {
                            println!("Reached duration limit of {}s", duration.as_secs());
                            if needs_commit {
                                status = PipelineStatus::Committing(Box::pin(
                                    client
                                        .commit_transaction()
                                        .pipeline_name(pipeline_name)
                                        .send(),
                                ));
                            } else {
                                break;
                            }
                        }
                        sleep(Duration::from_secs(1)).await;
                    }
                    PipelineStatus::Committing(ref mut fut) => {
                        // Keep sampling stats once per second while the
                        // commit is pending; leave the loop once it resolves.
                        tokio::select! {
                            res = fut => {
                                if let Err(e) = res {
                                    eprintln!("Failed to commit transaction: {e}");
                                    std::process::exit(1);
                                }
                                break;
                            }
                            _ = sleep(Duration::from_secs(1)) => {
                            }
                        }
                    }
                }
            }
            Err(e) => {
                eprintln!("Failed to collect metrics: {}", e);
                std::process::exit(1);
            }
        }
    }
    metrics
}
async fn transform_to_bmf(
client: &Client,
name: String,
format: OutputFormat,
metrics: Vec<feldera_rest_api::types::ControllerStatus>,
) -> Benchmark {
let raw_metrics: Vec<RawMetrics> = metrics.iter().map(RawMetrics::from).collect();
if let Some(first) = raw_metrics.first() {
for metric in &raw_metrics {
if metric.incarnation_uuid != first.incarnation_uuid {
error!(
"Inconsistent incarnation_uuid detected during benchmark, did the program restart while measuring?"
);
std::process::exit(1);
}
}
}
if raw_metrics.iter().any(|m| m.input_errors) {
let _r = Box::pin(crate::pipeline(
format,
PipelineAction::Logs { name, watch: false },
client.clone(),
))
.await;
eprintln!("Detected errors in input connectors during result processing, abort benchmark.");
std::process::exit(1);
}
Benchmark::new(name, PipelineMetrics::new(raw_metrics))
}
/// Runs a full benchmark of the named pipeline: warns about unsuitable
/// configuration, restarts the pipeline from a clean state, optionally opens
/// a transaction, samples metrics until completion or the configured
/// duration, prints the results, and optionally uploads them.
pub(crate) async fn bench(client: Client, format: OutputFormat, args: BenchmarkArgs) {
    let pipeline_name = args.name.clone();
    println!("Benchmarking {pipeline_name}");
    let recompile = !args.no_recompile;
    let pipeline = match client
        .get_pipeline()
        .pipeline_name(pipeline_name.clone())
        .send()
        .await
    {
        Ok(p) => p,
        Err(_) => {
            // handle_errors_fatal terminates the process; the unreachable!
            // documents that control never returns here.
            let _ =
                handle_errors_fatal(client.baseurl().clone(), "Failed to get pipeline status", 1);
            unreachable!()
        }
    };
    // Warn about configurations that would skew the benchmark numbers.
    let compilation_profile = pipeline
        .as_ref()
        .program_config
        .as_ref()
        .and_then(|cp| cp.profile)
        .unwrap_or(CompilationProfile::Optimized);
    if compilation_profile != CompilationProfile::Optimized
        && compilation_profile != CompilationProfile::OptimizedSymbols
    {
        warn!(
            "Compilation profile was set to `{compilation_profile:?}`. This is most likely not what you want to benchmark with. Set it to `optimized` or `optimized_symbols` for best performance.",
        );
    }
    if let Some(runtime_config) = &pipeline.runtime_config
        && runtime_config.storage.is_none()
    {
        warn!("Storage is not enabled for this pipeline. This may affect benchmark performance.");
    }
    // Stop then start so the measurement begins from a fresh incarnation.
    let _ = Box::pin(crate::pipeline(
        OutputFormat::Text,
        PipelineAction::Stop {
            name: pipeline_name.clone(),
            no_wait: false,
            checkpoint: false,
        },
        client.clone(),
    ))
    .await;
    let _ = Box::pin(crate::pipeline(
        OutputFormat::Text,
        PipelineAction::Start {
            name: pipeline_name.clone(),
            recompile,
            no_wait: false,
            initial: "running".to_string(),
            bootstrap_policy: "allow".to_string(),
            no_dismiss_error: false,
        },
        client.clone(),
    ))
    .await;
    // Open a transaction unless disabled; failure is non-fatal but noted
    // because it can change the measured behavior.
    let need_commit = if !args.no_transaction {
        match client
            .start_transaction()
            .pipeline_name(pipeline_name.clone())
            .send()
            .await
        {
            Ok(_) => {
                info!("Transaction started for pipeline '{}'", pipeline_name);
                true
            }
            Err(e) => {
                warn!(
                    "Failed to start transaction for pipeline '{}' due to {e}. This may affect benchmark results.",
                    pipeline_name
                );
                false
            }
        }
    } else {
        false
    };
    let start_time: chrono::DateTime<chrono::Utc> = chrono::Utc::now();
    println!("Collecting metrics...");
    let metrics = collect_metrics(&client, &pipeline_name, args.duration, need_commit).await;
    let end_time: chrono::DateTime<chrono::Utc> = chrono::Utc::now();
    // Stop the pipeline without waiting; the samples are already taken.
    let _ = Box::pin(crate::pipeline(
        OutputFormat::Text,
        PipelineAction::Stop {
            name: pipeline_name.clone(),
            no_wait: true,
            checkpoint: false,
        },
        client.clone(),
    ))
    .await;
    let bmf_benchmark = transform_to_bmf(&client, pipeline_name, format, metrics).await;
    println!("{}", bmf_benchmark.format(format));
    if args.upload {
        // The instance config (edition/revision) labels the uploaded run.
        let feldera_instance_config = get_config(&client)
            .await
            .map_err(handle_errors_fatal(
                client.baseurl().clone(),
                "Unable to fetch config of feldera instance",
                1,
            ))
            .unwrap();
        // Use the instance's host name as the testbed identifier, falling
        // back to the raw base URL when it cannot be parsed.
        let feldera_testbed = Url::parse(client.baseurl())
            .ok()
            .and_then(|url| url.host_str().map(|s| s.to_string()))
            .unwrap_or_else(|| client.baseurl().to_string());
        upload_result(
            args,
            feldera_testbed,
            feldera_instance_config,
            pipeline.program_config.clone(),
            bmf_benchmark,
            start_time,
            end_time,
        )
        .await
        .unwrap_or_else(|e| {
            error!("Failed to upload benchmark result: {}", e);
            std::process::exit(1);
        });
    }
}
/// Fetches the Feldera instance configuration from the REST API.
async fn get_config(client: &Client) -> Result<Configuration, ProgenitorError<ErrorResponse>> {
    let response = client.get_config().send().await?;
    Ok(response.into_inner())
}
/// Uploads a finished benchmark to a bencher.dev-compatible service.
///
/// Builds an authenticated HTTP client from `args.benchmark_token` (if any),
/// attaches run context (repo name, branch ref, revision hashes) derived
/// from the Feldera instance configuration, serializes the benchmark as the
/// JSON adapter payload, and POSTs it as a new run. A non-success HTTP
/// status is logged as a warning, not an error.
async fn upload_result(
    args: BenchmarkArgs,
    feldera_testbed: String,
    feldera_instance_config: Configuration,
    program_config: Option<ProgramConfig>,
    benchmark_data: Benchmark,
    start_time: DateTime<Utc>,
    end_time: DateTime<Utc>,
) -> Result<(), Box<dyn Error>> {
    // Bearer auth is optional; the upload is attempted either way.
    let mut headers = reqwest::header::HeaderMap::new();
    if let Some(token) = &args.benchmark_token {
        let authorization_header = format!("Bearer {}", token);
        headers.insert(
            reqwest::header::AUTHORIZATION,
            authorization_header.parse().unwrap(),
        );
    } else {
        warn!("No benchmark token provided, trying to upload without one.");
    }
    let client_with_benchmark_auth = reqwest::ClientBuilder::new()
        .connect_timeout(Duration::from_secs(15))
        .timeout(Duration::from_secs(15))
        .default_headers(headers)
        .build()
        .unwrap();
    let client = BenchClient::new_with_client(&args.benchmark_host, client_with_benchmark_auth);
    // bencher.dev context keys identify the repo/branch the run belongs to.
    let mut run_context: HashMap<String, String> = HashMap::new();
    run_context.insert(
        "bencher.dev/v0/repo/name".to_string(),
        format!("feldera {}", feldera_instance_config.edition),
    );
    run_context.insert(
        "bencher.dev/v0/branch/hash".to_string(),
        feldera_instance_config.revision.clone(),
    );
    run_context.insert(
        "bencher.dev/v0/branch/ref/name".to_string(),
        args.branch.clone(),
    );
    // Hard-coded per-edition repo hashes — presumably the initial-commit
    // identifiers bencher.dev uses to distinguish the two repos; confirm
    // against the bencher project setup before changing.
    match feldera_instance_config.edition.as_str() {
        "Open source" => {
            run_context.insert(
                "bencher.dev/v0/repo/hash".to_string(),
                "de8879fbda0c9e9392e3b94064c683a1b4bae216".to_string(),
            );
        }
        "Enterprise" => {
            run_context.insert(
                "bencher.dev/v0/repo/hash".to_string(),
                "751db38ff821d73bcc67c836af421d76d4d42bdd".to_string(),
            );
        }
        _ => {
            warn!(
                "Unknown edition '{}', not setting repo hash",
                feldera_instance_config.edition
            );
        }
    }
    // Prefer the program's pinned runtime version; otherwise fall back to
    // the instance's runtime revision when it is non-empty.
    // NOTE(review): both `.clone()` calls look redundant (the values are
    // owned at this point) — candidates for cleanup.
    let git_hash: Option<GitHash> = match (
        program_config.and_then(|pc| pc.runtime_version.clone()),
        feldera_instance_config.runtime_revision,
    ) {
        (Some(r), _) => Some(GitHash(r)),
        (None, r) if !r.is_empty() => Some(GitHash(r.clone())),
        _ => None,
    };
    let project = ResourceId(args.project);
    let testbed = NameId(feldera_testbed);
    // The benchmark is shipped as a JSON string for the bencher JSON adapter.
    let result = benchmark_data.format(OutputFormat::Json);
    let settings: JsonReportSettings = JsonReportSettings::builder()
        .adapter(Adapter::Json)
        .try_into()?;
    let branch = NameId(args.branch);
    // Optional start-point settings for branch creation/reset on the server.
    let json_start_point = args.start_point.map(|st| {
        JsonUpdateStartPoint::builder()
            .branch(NameId(st))
            .clone_thresholds(args.start_point_clone_thresholds)
            .hash(args.start_point_hash.map(GitHash))
            .max_versions(args.start_point_max_versions)
            .reset(args.start_point_reset)
            .try_into()
            .expect("Unable to build JsonUpdateStartPoint")
    });
    let thresholds = None;
    let run: JsonNewRun = JsonNewRun::builder()
        .branch(branch)
        .project(project)
        .context(Some(run_context.into()))
        .start_time(start_time)
        .end_time(end_time)
        .hash(git_hash)
        .results(vec![result])
        .settings(settings)
        .start_point(json_start_point)
        .testbed(testbed)
        .thresholds(thresholds)
        .try_into()?;
    let r = client.run_post().body(run).send().await?;
    if r.status().is_success() {
        info!("Benchmark result uploaded successfully.");
    } else {
        warn!("Failed to upload benchmark result: {}", r.status());
    }
    Ok(())
}