use std::path::Path;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tracing::{debug, info, warn};
use crate::config::ExternalFrameworkConfig;
/// Metrics payload that an external framework prints to stdout as JSON and
/// that `ExternalRunner::run` deserializes.
///
/// The three `Option` fields may be absent from the input (they deserialize to
/// `None`) and are left out of serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExternalMetricsOutput {
/// Framework name as reported by the subprocess itself.
pub framework: String,
/// Cold-start time in microseconds. `ExternalRunner::run` overwrites whatever
/// the subprocess reported with a value it derives from
/// `first_llm_call_epoch_ns` and its own launch timestamp.
pub cold_start_us: u64,
/// Time of the first LLM call, in nanoseconds since the UNIX epoch.
pub first_llm_call_epoch_ns: u64,
/// Summary statistics for the framework's loop overhead.
pub loop_overhead: ExternalDurationStats,
/// Peak resident set size in bytes, when reported (presumably of the
/// framework process; confirm against the reporting side).
#[serde(skip_serializing_if = "Option::is_none")]
pub peak_rss_bytes: Option<u64>,
/// Throughput in agents per second, when reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub throughput_agents_per_sec: Option<f64>,
/// Token accounting breakdown, when reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub token_overhead: Option<ExternalTokenOverhead>,
}
/// Summary statistics over a set of duration samples, as reported by an
/// external framework.
///
/// NOTE(review): the `_us` suffix suggests every duration is in microseconds;
/// nothing in this file validates or converts these values — confirm against
/// the producer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExternalDurationStats {
/// Smallest sample.
pub min_us: u64,
/// Largest sample.
pub max_us: u64,
/// Arithmetic mean of the samples.
pub mean_us: u64,
/// Median sample.
pub median_us: u64,
/// 95th-percentile sample.
pub p95_us: u64,
/// 99th-percentile sample.
pub p99_us: u64,
/// Number of samples the statistics were computed from.
pub count: u64,
}
/// Token accounting reported by an external framework.
///
/// NOTE(review): presumably `overhead_tokens == total_tokens - user_content_tokens`
/// (the test fixtures in this file are consistent with that), but the
/// relationship is not enforced here — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExternalTokenOverhead {
/// Total number of tokens.
pub total_tokens: u64,
/// Tokens attributed to user content.
pub user_content_tokens: u64,
/// Tokens attributed to overhead.
pub overhead_tokens: u64,
}
/// Runs an external framework's benchmark command as a subprocess and parses
/// the metrics it prints.
pub struct ExternalRunner {
/// Maximum time `run` waits for the subprocess before reporting a timeout.
timeout: Duration,
}
impl ExternalRunner {
    /// Creates a runner that gives each subprocess `timeout_secs` seconds to finish.
    pub fn new(timeout_secs: u64) -> Self {
        Self { timeout: Duration::from_secs(timeout_secs) }
    }

    /// Returns the maximum time a subprocess is given before [`run`](Self::run)
    /// reports a timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Runs one external framework benchmark and returns its parsed metrics.
    ///
    /// The subprocess is launched as `config.command` with `config.args`
    /// followed by `workload_path` as the final argument. The launch timestamp
    /// (nanoseconds since the UNIX epoch) is exported to the child as
    /// `BENCH_START_EPOCH_NS`, then the entries of `config.env` are applied,
    /// and `config.working_dir` is used as the working directory when set.
    /// Stdout and stderr are captured; stdout must contain a single JSON
    /// document that deserializes into [`ExternalMetricsOutput`].
    ///
    /// The returned `cold_start_us` is always recomputed here as
    /// `first_llm_call_epoch_ns - launch timestamp` (clamped at zero), so the
    /// value reported by the subprocess for that field is ignored.
    ///
    /// # Errors
    ///
    /// * `BenchError::ExternalRunner` — the subprocess could not be spawned or
    ///   its output could not be collected, it exited with a non-zero status,
    ///   or its stdout was not valid metrics JSON.
    /// * `BenchError::ExternalTimeout` — the subprocess did not finish within
    ///   the configured timeout. The subprocess is killed in that case.
    pub async fn run(
        &self,
        config: &ExternalFrameworkConfig,
        workload_path: &str,
    ) -> crate::Result<ExternalMetricsOutput> {
        info!(
            framework = %config.name,
            command = %config.command,
            workload = %workload_path,
            "starting external framework benchmark"
        );

        // A system clock set before the UNIX epoch yields a zero duration
        // rather than an error. Nanoseconds since the epoch fit in a `u64`
        // until roughly the year 2554, so the narrowing cast is lossless in
        // practice.
        let start_epoch_ns = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;

        let mut cmd = Command::new(&config.command);
        cmd.args(&config.args);
        cmd.arg(workload_path);
        // Set before the user-supplied variables, so an entry with the same
        // key in `config.env` takes precedence over the injected value.
        cmd.env("BENCH_START_EPOCH_NS", start_epoch_ns.to_string());
        for (key, value) in &config.env {
            cmd.env(key, value);
        }
        if let Some(working_dir) = &config.working_dir {
            cmd.current_dir(working_dir);
        }
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());
        // Without this, dropping the `output()` future on timeout would leave
        // the child running in the background: tokio does not kill children
        // on drop by default.
        cmd.kill_on_drop(true);

        debug!(
            framework = %config.name,
            start_epoch_ns = start_epoch_ns,
            timeout_secs = self.timeout.as_secs(),
            "spawning external framework subprocess"
        );

        let output = match tokio::time::timeout(self.timeout, cmd.output()).await {
            Ok(Ok(output)) => output,
            Ok(Err(io_err)) => {
                return Err(crate::BenchError::ExternalRunner {
                    framework: config.name.clone(),
                    reason: format!("failed to spawn subprocess: {io_err}"),
                });
            }
            Err(_elapsed) => {
                warn!(
                    framework = %config.name,
                    timeout_secs = self.timeout.as_secs(),
                    "external framework timed out"
                );
                return Err(crate::BenchError::ExternalTimeout {
                    framework: config.name.clone(),
                    timeout_secs: self.timeout.as_secs(),
                });
            }
        };

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            // `code()` is `None` when the process has no exit code (on Unix,
            // when it was terminated by a signal); report that as -1.
            let exit_code = output.status.code().unwrap_or(-1);
            warn!(
                framework = %config.name,
                exit_code = exit_code,
                stderr = %stderr,
                "external framework exited with non-zero status"
            );
            return Err(crate::BenchError::ExternalRunner {
                framework: config.name.clone(),
                reason: format!("subprocess exited with code {exit_code}: {}", stderr.trim()),
            });
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let stdout_trimmed = stdout.trim();
        debug!(
            framework = %config.name,
            stdout_len = stdout_trimmed.len(),
            "parsing external framework EBP output"
        );

        let mut metrics: ExternalMetricsOutput =
            serde_json::from_str(stdout_trimmed).map_err(|e| {
                crate::BenchError::ExternalRunner {
                    framework: config.name.clone(),
                    reason: format!("failed to parse EBP JSON output: {e}"),
                }
            })?;

        // A first-call timestamp earlier than the launch timestamp cannot be
        // a real measurement. The value is still clamped to zero below, but
        // surface it so a bogus 0 µs cold start is not mistaken for a result.
        if metrics.first_llm_call_epoch_ns < start_epoch_ns {
            warn!(
                framework = %config.name,
                first_llm_call_epoch_ns = metrics.first_llm_call_epoch_ns,
                start_epoch_ns = start_epoch_ns,
                "reported first LLM call precedes subprocess launch; cold start clamped to 0"
            );
        }

        // Cold start is measured by the harness, not trusted from the child.
        let computed_cold_start_ns =
            metrics.first_llm_call_epoch_ns.saturating_sub(start_epoch_ns);
        let computed_cold_start_us = computed_cold_start_ns / 1000;
        metrics.cold_start_us = computed_cold_start_us;

        info!(
            framework = %metrics.framework,
            cold_start_us = computed_cold_start_us,
            loop_overhead_mean_us = metrics.loop_overhead.mean_us,
            "external framework benchmark completed"
        );
        Ok(metrics)
    }
}
/// Top-level shape of the external frameworks configuration file read by
/// `load_external_configs`.
///
/// `rename_all = "camelCase"` applies to this struct's own fields only; its
/// single field, `frameworks`, is spelled the same way in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExternalConfigFile {
/// One entry per external framework to benchmark.
pub frameworks: Vec<ExternalFrameworkConfig>,
}
/// Reads the JSON file at `path` and returns the framework configurations it lists.
///
/// # Errors
///
/// Propagates the I/O error (converted into the crate error type by `?`) when
/// the file cannot be read, and returns `BenchError::Serialization` naming the
/// file when its contents are not a valid [`ExternalConfigFile`].
pub fn load_external_configs(path: &Path) -> crate::Result<Vec<ExternalFrameworkConfig>> {
    let raw = std::fs::read_to_string(path)?;
    match serde_json::from_str::<ExternalConfigFile>(&raw) {
        Ok(parsed) => Ok(parsed.frameworks),
        Err(e) => Err(crate::BenchError::Serialization(format!(
            "failed to parse external config file '{}': {e}",
            path.display()
        ))),
    }
}
// Unit tests for the metrics types, the config loader and `ExternalRunner`.
// The runner tests spawn real subprocesses (`sh`, `echo`), so they need a
// Unix-like environment with those commands on PATH.
#[cfg(test)]
mod tests {
use super::*;
// A full payload, including every optional section, deserializes field for field.
#[test]
fn test_external_metrics_output_deserialize() {
let json = r#"{
"framework": "langgraph",
"cold_start_us": 45000,
"first_llm_call_epoch_ns": 1705312800000045000,
"loop_overhead": {
"min_us": 120,
"max_us": 890,
"mean_us": 340,
"median_us": 310,
"p95_us": 780,
"p99_us": 870,
"count": 10
},
"throughput_agents_per_sec": 12.5,
"peak_rss_bytes": 52428800,
"token_overhead": {
"total_tokens": 1200,
"user_content_tokens": 950,
"overhead_tokens": 250
}
}"#;
let metrics: ExternalMetricsOutput = serde_json::from_str(json).unwrap();
assert_eq!(metrics.framework, "langgraph");
assert_eq!(metrics.cold_start_us, 45000);
assert_eq!(metrics.first_llm_call_epoch_ns, 1705312800000045000);
assert_eq!(metrics.loop_overhead.min_us, 120);
assert_eq!(metrics.loop_overhead.max_us, 890);
assert_eq!(metrics.loop_overhead.mean_us, 340);
assert_eq!(metrics.loop_overhead.median_us, 310);
assert_eq!(metrics.loop_overhead.p95_us, 780);
assert_eq!(metrics.loop_overhead.p99_us, 870);
assert_eq!(metrics.loop_overhead.count, 10);
assert_eq!(metrics.throughput_agents_per_sec, Some(12.5));
assert_eq!(metrics.peak_rss_bytes, Some(52428800));
let token_overhead = metrics.token_overhead.unwrap();
assert_eq!(token_overhead.total_tokens, 1200);
assert_eq!(token_overhead.user_content_tokens, 950);
assert_eq!(token_overhead.overhead_tokens, 250);
}
// The optional sections may be left out of the payload; they come back as `None`.
#[test]
fn test_external_metrics_output_deserialize_minimal() {
let json = r#"{
"framework": "crewai",
"cold_start_us": 120000,
"first_llm_call_epoch_ns": 1705312800000120000,
"loop_overhead": {
"min_us": 500,
"max_us": 2000,
"mean_us": 1000,
"median_us": 900,
"p95_us": 1800,
"p99_us": 1950,
"count": 5
}
}"#;
let metrics: ExternalMetricsOutput = serde_json::from_str(json).unwrap();
assert_eq!(metrics.framework, "crewai");
assert_eq!(metrics.cold_start_us, 120000);
assert_eq!(metrics.peak_rss_bytes, None);
assert_eq!(metrics.throughput_agents_per_sec, None);
assert_eq!(metrics.token_overhead, None);
}
// Serializing and then deserializing a fully populated value yields an equal value.
#[test]
fn test_external_metrics_output_serialize_roundtrip() {
let metrics = ExternalMetricsOutput {
framework: "test-framework".to_string(),
cold_start_us: 5000,
first_llm_call_epoch_ns: 1000000005000000,
loop_overhead: ExternalDurationStats {
min_us: 100,
max_us: 500,
mean_us: 250,
median_us: 230,
p95_us: 450,
p99_us: 490,
count: 20,
},
peak_rss_bytes: Some(1024 * 1024 * 50),
throughput_agents_per_sec: Some(8.5),
token_overhead: Some(ExternalTokenOverhead {
total_tokens: 1000,
user_content_tokens: 800,
overhead_tokens: 200,
}),
};
let json = serde_json::to_string(&metrics).unwrap();
let deserialized: ExternalMetricsOutput = serde_json::from_str(&json).unwrap();
assert_eq!(metrics, deserialized);
}
// The constructor argument is interpreted as whole seconds.
#[test]
fn test_external_runner_new() {
let runner = ExternalRunner::new(120);
assert_eq!(runner.timeout(), Duration::from_secs(120));
}
// Same check as above with a 300-second value.
// NOTE(review): the test name suggests 300 s is a default used elsewhere;
// no default is defined in this file — confirm.
#[test]
fn test_external_runner_default_timeout() {
let runner = ExternalRunner::new(300);
assert_eq!(runner.timeout(), Duration::from_secs(300));
}
// A config file with two frameworks: the first sets `workingDir` and one
// environment pair, the second omits `workingDir`, which deserializes to `None`.
#[test]
fn test_external_config_file_deserialize() {
let json = r#"{
"frameworks": [
{
"name": "adk-python",
"command": "python",
"args": ["-m", "adk_bench", "--workload"],
"workingDir": "../adk-python",
"env": [["GOOGLE_API_KEY", "test-key"]]
},
{
"name": "langgraph",
"command": "python",
"args": ["bench_runner.py"],
"env": []
}
]
}"#;
let config_file: ExternalConfigFile = serde_json::from_str(json).unwrap();
assert_eq!(config_file.frameworks.len(), 2);
assert_eq!(config_file.frameworks[0].name, "adk-python");
assert_eq!(config_file.frameworks[0].command, "python");
assert_eq!(config_file.frameworks[0].args, vec!["-m", "adk_bench", "--workload"]);
assert_eq!(
config_file.frameworks[0].working_dir,
Some(std::path::PathBuf::from("../adk-python"))
);
assert_eq!(config_file.frameworks[1].name, "langgraph");
assert_eq!(config_file.frameworks[1].working_dir, None);
}
// Loading from a path that does not exist is reported as an error.
#[test]
fn test_load_external_configs_file_not_found() {
let result = load_external_configs(Path::new("/nonexistent/path/config.json"));
assert!(result.is_err());
}
// A command that cannot be executed surfaces as an `ExternalRunner` error
// whose reason mentions the spawn failure.
#[tokio::test]
async fn test_external_runner_spawn_failure() {
let runner = ExternalRunner::new(10);
let config = ExternalFrameworkConfig {
name: "nonexistent".to_string(),
command: "/this/command/does/not/exist/anywhere".to_string(),
args: vec![],
working_dir: None,
env: vec![],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
crate::BenchError::ExternalRunner { framework, reason } => {
assert_eq!(framework, "nonexistent");
assert!(reason.contains("failed to spawn subprocess"));
}
_ => panic!("expected ExternalRunner error, got: {err:?}"),
}
}
// A subprocess that exits with status 1 surfaces as an `ExternalRunner` error.
#[tokio::test]
async fn test_external_runner_non_zero_exit() {
let runner = ExternalRunner::new(10);
let config = ExternalFrameworkConfig {
name: "failing-script".to_string(),
command: "sh".to_string(),
args: vec!["-c".to_string(), "exit 1".to_string()],
working_dir: None,
env: vec![],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
crate::BenchError::ExternalRunner { framework, .. } => {
assert_eq!(framework, "failing-script");
}
_ => panic!("expected ExternalRunner error, got: {err:?}"),
}
}
// A successful exit whose stdout is not JSON surfaces as a parse error.
// `run` appends the workload path as a further argument, so `echo` prints
// it as well; the output is still not JSON.
#[tokio::test]
async fn test_external_runner_invalid_json() {
let runner = ExternalRunner::new(10);
let config = ExternalFrameworkConfig {
name: "bad-json".to_string(),
command: "echo".to_string(),
args: vec!["not valid json".to_string()],
working_dir: None,
env: vec![],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
crate::BenchError::ExternalRunner { framework, reason } => {
assert_eq!(framework, "bad-json");
assert!(reason.contains("failed to parse EBP JSON output"));
}
_ => panic!("expected ExternalRunner error, got: {err:?}"),
}
}
// A 10-second sleep under a 1-second timeout surfaces as `ExternalTimeout`
// carrying the configured timeout. This test takes about one second to run.
// NOTE(review): the trailing `; #` in the scripts below presumably guards
// against the workload path that `run` appends; `sh -c` passes extra
// arguments as positional parameters, so it may be unnecessary — confirm.
#[tokio::test]
async fn test_external_runner_timeout() {
let runner = ExternalRunner::new(1); let config = ExternalFrameworkConfig {
name: "slow-script".to_string(),
command: "sh".to_string(),
args: vec!["-c".to_string(), "sleep 10; #".to_string()],
working_dir: None,
env: vec![],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
crate::BenchError::ExternalTimeout { framework, timeout_secs } => {
assert_eq!(framework, "slow-script");
assert_eq!(timeout_secs, 1);
}
_ => panic!("expected ExternalTimeout error, got: {err:?}"),
}
}
// A subprocess that prints a valid minimal payload yields parsed metrics.
// `cold_start_us` is not asserted here because `run` overwrites it with a
// value derived from its own launch timestamp. The framework name comes
// from the payload ("test"), not from the config ("test-framework").
#[tokio::test]
async fn test_external_runner_valid_output() {
let ebp_json = r#"{"framework":"test","cold_start_us":1000,"first_llm_call_epoch_ns":99999999999999999,"loop_overhead":{"min_us":10,"max_us":100,"mean_us":50,"median_us":45,"p95_us":90,"p99_us":95,"count":5}}"#;
let runner = ExternalRunner::new(10);
let config = ExternalFrameworkConfig {
name: "test-framework".to_string(),
command: "sh".to_string(),
args: vec!["-c".to_string(), format!("echo '{}'; #", ebp_json)],
working_dir: None,
env: vec![],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_ok());
let metrics = result.unwrap();
assert_eq!(metrics.framework, "test");
assert_eq!(metrics.loop_overhead.min_us, 10);
assert_eq!(metrics.loop_overhead.count, 5);
assert_eq!(metrics.peak_rss_bytes, None);
assert_eq!(metrics.throughput_agents_per_sec, None);
assert_eq!(metrics.token_overhead, None);
}
// The script reads the injected `BENCH_START_EPOCH_NS`, adds 5,000,000 ns and
// reports the sum as the first-call timestamp, so the cold start computed by
// `run` must be exactly 5000 microseconds (the reported `cold_start_us` of 0
// is ignored).
#[tokio::test]
async fn test_external_runner_env_injection() {
let runner = ExternalRunner::new(10);
let config = ExternalFrameworkConfig {
name: "env-test".to_string(),
command: "sh".to_string(),
args: vec![
"-c".to_string(),
r#"FIRST_CALL=$(expr $BENCH_START_EPOCH_NS + 5000000); echo "{\"framework\":\"env-test\",\"cold_start_us\":0,\"first_llm_call_epoch_ns\":$FIRST_CALL,\"loop_overhead\":{\"min_us\":1,\"max_us\":2,\"mean_us\":1,\"median_us\":1,\"p95_us\":2,\"p99_us\":2,\"count\":1}}"; #"#.to_string(),
],
working_dir: None,
env: vec![("CUSTOM_VAR".to_string(), "hello".to_string())],
};
let result = runner.run(&config, "/tmp/workload.json").await;
assert!(result.is_ok(), "run failed: {:?}", result.unwrap_err());
let metrics = result.unwrap();
assert_eq!(metrics.framework, "env-test");
assert_eq!(metrics.cold_start_us, 5000);
}
}