1use crate::cli::BenchArgs;
8use crate::commands::run::parse_duration;
9use crate::errors::CliError;
10use crate::output::terminal;
11use crate::output::{CommandResult, HumanRenderable, OutputContext};
12use serde::Serialize;
13use std::path::PathBuf;
14use std::time::Instant;
15
/// Complete result of one benchmark run, as produced by `execute`.
///
/// Serialized to JSON when the report is saved to disk and rendered for
/// terminals through its `HumanRenderable` implementation.
#[derive(Debug, Serialize)]
pub struct BenchReport {
    /// Name of the flow that was benchmarked.
    pub flow_name: String,
    /// Length of the warm-up phase, in seconds.
    pub warmup_secs: f64,
    /// Wall-clock length of the measurement phase, in seconds.
    pub measurement_secs: f64,
    /// Throughput figures for the flow.
    pub throughput: ThroughputReport,
    /// Latency distribution for the flow, in microseconds.
    pub latency: LatencyReport,
    /// Per-component latency rows; the human rendering skips this section
    /// when the list is empty.
    pub per_component: Vec<ComponentBenchRow>,
    /// Buffer, copy and memory counters.
    pub resources: ResourceReport,
    /// Scheduler and queue counters.
    pub scheduling: SchedulingReport,
    /// Path of the JSON file the report was written to, or `None` when it
    /// was not saved.
    pub saved_to: Option<PathBuf>,
}
38
/// Throughput section of a benchmark report.
#[derive(Debug, Serialize)]
pub struct ThroughputReport {
    /// Elements processed per second (shown as "Elements/s").
    pub elements_per_sec: f64,
    /// Bytes processed per second (shown as "Bytes/s").
    pub bytes_per_sec: f64,
}
47
/// Latency section of a benchmark report.
///
/// All values are in microseconds (the human rendering titles this section
/// "Latency (µs)").
#[derive(Debug, Serialize)]
pub struct LatencyReport {
    /// 50th-percentile latency.
    pub p50_us: f64,
    /// 90th-percentile latency.
    pub p90_us: f64,
    /// 95th-percentile latency.
    pub p95_us: f64,
    /// 99th-percentile latency.
    pub p99_us: f64,
    /// Presumably the 99.9th-percentile latency; confirm against the
    /// metrics source.
    pub p999_us: f64,
    /// Largest latency observed.
    pub max_us: f64,
}
64
/// One row of the per-component latency table.
#[derive(Debug, Serialize)]
pub struct ComponentBenchRow {
    /// Label identifying the component; used as the row key in the human
    /// rendering.
    pub component: String,
    /// 50th-percentile latency in microseconds.
    pub p50_us: f64,
    /// 99th-percentile latency in microseconds. Serialized, but not shown in
    /// the human rendering, which prints only the p50 column.
    pub p99_us: f64,
}
75
/// Resource-usage section of a benchmark report.
#[derive(Debug, Serialize)]
pub struct ResourceReport {
    /// Number of buffer allocations (shown as "Buffer allocs").
    pub buffer_allocs: u64,
    /// Pool reuse rate as a percentage; rendered with a `%` suffix.
    pub pool_reuse_pct: f64,
    /// Number of copies (shown as "Total copies").
    pub total_copies: u64,
    /// Peak memory in bytes (shown as "Peak memory").
    pub peak_memory_bytes: u64,
}
88
/// Scheduling section of a benchmark report.
#[derive(Debug, Serialize)]
pub struct SchedulingReport {
    /// Number of wakeups (shown as "Total wakeups").
    pub total_wakeups: u64,
    /// Number of backpressure events.
    pub backpressure_events: u64,
    /// Peak queue occupancy; rendered as `queue_peak / queue_capacity`.
    pub queue_peak: u64,
    /// Queue capacity that `queue_peak` is displayed against.
    pub queue_capacity: u64,
}
101
102impl HumanRenderable for BenchReport {
103 fn render_human(&self, ctx: &OutputContext) {
104 terminal::print_header(ctx, "Throughput");
105 terminal::print_kv(
106 ctx,
107 "Elements/s",
108 &format!("{:.0}", self.throughput.elements_per_sec),
109 );
110 terminal::print_kv(
111 ctx,
112 "Bytes/s",
113 &terminal::format_bytes(self.throughput.bytes_per_sec as u64),
114 );
115
116 terminal::print_header(ctx, "Latency (µs)");
117 terminal::print_kv(ctx, "p50", &format!("{:.1}", self.latency.p50_us));
118 terminal::print_kv(ctx, "p90", &format!("{:.1}", self.latency.p90_us));
119 terminal::print_kv(ctx, "p95", &format!("{:.1}", self.latency.p95_us));
120 terminal::print_kv(ctx, "p99", &format!("{:.1}", self.latency.p99_us));
121 terminal::print_kv(ctx, "p999", &format!("{:.1}", self.latency.p999_us));
122 terminal::print_kv(ctx, "max", &format!("{:.1}", self.latency.max_us));
123
124 if !self.per_component.is_empty() {
125 terminal::print_header(ctx, "Per-Component Latency (µs, p50)");
126 for row in &self.per_component {
127 terminal::print_kv(ctx, &row.component, &format!("{:.1}", row.p50_us));
128 }
129 }
130
131 terminal::print_header(ctx, "Resources");
132 terminal::print_kv(
133 ctx,
134 "Buffer allocs",
135 &format!("{}", self.resources.buffer_allocs),
136 );
137 terminal::print_kv(
138 ctx,
139 "Pool reuse rate",
140 &format!("{:.1}%", self.resources.pool_reuse_pct),
141 );
142 terminal::print_kv(
143 ctx,
144 "Total copies",
145 &format!("{}", self.resources.total_copies),
146 );
147 terminal::print_kv(
148 ctx,
149 "Peak memory",
150 &terminal::format_bytes(self.resources.peak_memory_bytes),
151 );
152
153 terminal::print_header(ctx, "Scheduling");
154 terminal::print_kv(
155 ctx,
156 "Total wakeups",
157 &format!("{}", self.scheduling.total_wakeups),
158 );
159 terminal::print_kv(
160 ctx,
161 "Backpressure events",
162 &format!("{}", self.scheduling.backpressure_events),
163 );
164 terminal::print_kv(
165 ctx,
166 "Queue peak",
167 &format!(
168 "{} / {}",
169 self.scheduling.queue_peak, self.scheduling.queue_capacity
170 ),
171 );
172
173 if let Some(path) = &self.saved_to {
174 eprintln!();
175 eprintln!(" Result saved to: {}", path.display());
176 }
177 }
178}
179
180pub async fn execute(
191 args: &BenchArgs,
192 ctx: &OutputContext,
193) -> Result<CommandResult<BenchReport>, CliError> {
194 let manifest_path = &args.manifest;
195
196 if !manifest_path.exists() {
197 return Err(CliError::Config {
198 detail: format!("Manifest not found: {}", manifest_path.display()),
199 file: Some(manifest_path.display().to_string()),
200 suggestion: "Run this command from a Torvyn project directory.".into(),
201 });
202 }
203
204 let warmup_dur = parse_duration(&args.warmup).map_err(|e| CliError::Config {
205 detail: format!("Invalid warmup duration: {e}"),
206 file: None,
207 suggestion: "Use a duration like '2s' or '5s'.".into(),
208 })?;
209
210 let bench_dur = parse_duration(&args.duration).map_err(|e| CliError::Config {
211 detail: format!("Invalid benchmark duration: {e}"),
212 file: None,
213 suggestion: "Use a duration like '10s' or '30s'.".into(),
214 })?;
215
216 let manifest_content = std::fs::read_to_string(manifest_path).map_err(|e| CliError::Io {
217 detail: e.to_string(),
218 path: Some(manifest_path.display().to_string()),
219 })?;
220
221 let manifest = torvyn_config::ComponentManifest::from_toml_str(
222 &manifest_content,
223 manifest_path.to_str().unwrap_or("Torvyn.toml"),
224 )
225 .map_err(|errors| CliError::Config {
226 detail: format!("Manifest has {} error(s)", errors.len()),
227 file: Some(manifest_path.display().to_string()),
228 suggestion: "Run `torvyn check` first.".into(),
229 })?;
230
231 let flow_name = args
232 .flow
233 .clone()
234 .or_else(|| manifest.flow.keys().next().cloned())
235 .ok_or_else(|| CliError::Config {
236 detail: "No flow defined in manifest".into(),
237 file: Some(manifest_path.display().to_string()),
238 suggestion: "Add a [flow.*] section.".into(),
239 })?;
240
241 if !ctx.quiet && ctx.format == crate::cli::OutputFormat::Human {
242 eprintln!(
243 "▶ Benchmarking flow \"{}\" (warmup: {:.0}s, duration: {:.0}s)",
244 flow_name,
245 warmup_dur.as_secs_f64(),
246 bench_dur.as_secs_f64(),
247 );
248 }
249
250 let obs_config = torvyn_config::ObservabilityConfig {
252 metrics_enabled: true,
253 ..Default::default()
254 };
255
256 let mut host = torvyn_host::HostBuilder::new()
257 .with_config_file(manifest_path)
258 .with_observability_config(obs_config)
259 .build()
260 .await
261 .map_err(|e| CliError::Runtime {
262 detail: format!("Failed to initialize host: {e}"),
263 context: None,
264 })?;
265
266 let _flow_id = host
268 .start_flow(&flow_name)
269 .await
270 .map_err(|e| CliError::Runtime {
271 detail: format!("Failed to start flow: {e}"),
272 context: Some(flow_name.clone()),
273 })?;
274
275 tokio::time::sleep(warmup_dur).await;
277
278 let bench_start = Instant::now();
280 tokio::time::sleep(bench_dur).await;
281 let bench_elapsed = bench_start.elapsed();
282
283 host.shutdown().await.ok();
285
286 let elapsed_secs = bench_elapsed.as_secs_f64();
287
288 let report = BenchReport {
290 flow_name: flow_name.clone(),
291 warmup_secs: warmup_dur.as_secs_f64(),
292 measurement_secs: elapsed_secs,
293 throughput: ThroughputReport {
294 elements_per_sec: 0.0,
295 bytes_per_sec: 0.0,
296 },
297 latency: LatencyReport {
298 p50_us: 0.0,
299 p90_us: 0.0,
300 p95_us: 0.0,
301 p99_us: 0.0,
302 p999_us: 0.0,
303 max_us: 0.0,
304 },
305 per_component: vec![],
306 resources: ResourceReport {
307 buffer_allocs: 0,
308 pool_reuse_pct: 0.0,
309 total_copies: 0,
310 peak_memory_bytes: 0,
311 },
312 scheduling: SchedulingReport {
313 total_wakeups: 0,
314 backpressure_events: 0,
315 queue_peak: 0,
316 queue_capacity: 64,
317 },
318 saved_to: None,
319 };
320
321 let project_dir = manifest_path.parent().unwrap_or(std::path::Path::new("."));
323 let bench_dir = project_dir.join(".torvyn").join("bench");
324 std::fs::create_dir_all(&bench_dir).ok();
325
326 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H-%M-%S").to_string();
327 let report_path = bench_dir.join(format!("{timestamp}.json"));
328
329 let saved_to = if let Ok(json) = serde_json::to_string_pretty(&report) {
330 if std::fs::write(&report_path, &json).is_ok() {
331 Some(report_path)
332 } else {
333 None
334 }
335 } else {
336 None
337 };
338
339 let mut final_report = report;
340 final_report.saved_to = saved_to;
341
342 Ok(CommandResult {
343 success: true,
344 command: "bench".into(),
345 data: final_report,
346 warnings: vec![],
347 })
348}