crows_cli/
output.rs

1use crows_utils::services::{CoordinatorClient, IterationInfo, RequestInfo, RunId, RunInfo};
2use crows_utils::{process_info_handle, InfoHandle};
3
4use std::collections::HashMap;
5use std::io::Stdout;
6use std::io::{stdout, Write};
7use std::time::Duration;
8
9use anyhow::anyhow;
10use crossterm::{
11    cursor::{self, MoveTo, MoveToNextLine, MoveUp},
12    execute,
13    style::Print,
14    terminal::{self, Clear, ClearType, ScrollUp},
15};
16
/// Aggregated latency and outcome statistics for a collection of samples.
///
/// `Default` yields an all-zero summary, which is also a sensible value for
/// "no samples were recorded".
#[derive(Debug, Clone, PartialEq, Default)]
pub struct SummaryStats {
    /// Arithmetic mean of all latencies.
    pub avg: Duration,
    /// Smallest observed latency.
    pub min: Duration,
    /// Median latency.
    pub med: Duration,
    /// Largest observed latency.
    pub max: Duration,
    /// 90th percentile latency.
    pub p90: Duration,
    /// 95th percentile latency.
    pub p95: Duration,
    /// Failed samples divided by all samples: a fraction in `0.0..=1.0`,
    /// not a percentage.
    pub fail_rate: f64,
    /// Number of samples reported as successful.
    pub success_count: usize,
    /// Number of samples reported as failed.
    pub fail_count: usize,
    /// Total number of samples.
    pub total: usize,
}
30
/// Running totals tracked for a single worker while a run is in progress.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WorkerState {
    /// Sum of the `active_instances_delta` values reported so far.
    pub active_instances: isize,
    /// Sum of the `capacity_delta` values reported so far.
    pub capacity: isize,
    /// Set once the worker has reported that its run is finished.
    pub done: bool,
}
37
/// Everything needed to render one worker's progress bar.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BarData {
    /// Label printed in front of the bar.
    pub worker_name: String,
    /// Numerator of the `(active/all)` suffix printed after the bar.
    pub active_vus: usize,
    /// Denominator of the `(active/all)` suffix printed after the bar.
    pub all_vus: usize,
    /// Time elapsed so far.
    pub duration: Duration,
    /// Time still remaining; progress is `duration / (duration + left)`.
    pub left: Duration,
    /// When set, the bar is replaced by a `Done` marker.
    pub done: bool,
}
47
/// A measured sample that exposes a latency and a success flag, so that
/// different sample types can be summarized by the same generic code.
pub trait LatencyInfo {
    /// Latency of the sample, in seconds.
    fn latency(&self) -> f64;
    /// Whether the sample counts as a success.
    fn successful(&self) -> bool;
}
52
/// Exposes a request sample's recorded latency and outcome.
impl LatencyInfo for RequestInfo {
    /// Returns the request latency converted to fractional seconds.
    fn latency(&self) -> f64 {
        self.latency.as_secs_f64()
    }

    /// Returns the request's own `successful` flag.
    fn successful(&self) -> bool {
        self.successful
    }
}
62
/// Exposes an iteration sample's recorded latency.
impl LatencyInfo for IterationInfo {
    /// Returns the iteration latency converted to fractional seconds.
    fn latency(&self) -> f64 {
        self.latency.as_secs_f64()
    }

    /// Always `true`: no failure state is read from the iteration here, so
    /// every iteration is counted as successful.
    fn successful(&self) -> bool {
        true
    }
}
72
73pub fn print(
74    stdout: &mut Stdout,
75    progress_lines: u16,
76    lines: Vec<String>,
77    bars: &HashMap<String, BarData>,
78    last: bool,
79) -> anyhow::Result<()> {
80    let (_, height) = terminal::size()?;
81
82    execute!(
83        stdout,
84        MoveTo(0, height - progress_lines as u16),
85        Clear(ClearType::FromCursorDown),
86        MoveUp(1),
87    )?;
88
89    for line in lines {
90        let (_, y) = cursor::position()?;
91        execute!(stdout, Print(line))?;
92        let (_, new_y) = cursor::position()?;
93        let n = new_y - y;
94        execute!(stdout, ScrollUp(n), MoveUp(n))?;
95    }
96
97    execute!(
98        stdout,
99        MoveTo(0, height - progress_lines as u16 + 1),
100        Clear(ClearType::FromCursorDown),
101        MoveUp(1),
102    )?;
103
104    for (_, bar) in bars {
105        if bar.done {
106            execute!(
107                stdout,
108                Print(format!("{}: Done", bar.worker_name,)),
109                MoveToNextLine(1),
110            )?;
111        } else {
112            let progress_percentage = bar.duration.as_secs_f64()
113                / (bar.duration.as_secs_f64() + bar.left.as_secs_f64())
114                * 100 as f64;
115            execute!(
116                stdout,
117                Print(format!(
118                    "{}: [{: <25}] {:.2}% ({}/{})",
119                    bar.worker_name,
120                    "*".repeat((progress_percentage as usize) / 4),
121                    progress_percentage,
122                    bar.active_vus,
123                    bar.all_vus,
124                )),
125                MoveToNextLine(1),
126            )?;
127        }
128    }
129    if last {
130        execute!(stdout, Print("\n"),)?;
131    }
132
133    stdout.flush()?;
134
135    Ok(())
136}
137
/// Renders a duration with the largest unit in which it is at least one:
/// seconds, milliseconds or microseconds with two decimals, otherwise whole
/// nanoseconds (e.g. `1.50s`, `2.50ms`, `3.25µs`, `42ns`).
pub fn format_duration(duration: Duration) -> String {
    let sub_nanos = duration.subsec_nanos();

    let whole_secs = duration.as_secs();
    if whole_secs > 0 {
        let value = whole_secs as f64 + sub_nanos as f64 / 1_000_000_000.0;
        return format!("{:.2}s", value);
    }

    let whole_millis = duration.as_millis();
    if whole_millis > 0 {
        // Nanoseconds below one millisecond become the fractional part.
        let value = whole_millis as f64 + (sub_nanos % 1_000_000) as f64 / 1_000_000.0;
        return format!("{:.2}ms", value);
    }

    let whole_micros = duration.as_micros();
    if whole_micros > 0 {
        // Nanoseconds below one microsecond become the fractional part.
        let value = whole_micros as f64 + (sub_nanos % 1_000) as f64 / 1_000.0;
        return format!("{:.2}µs", value);
    }

    format!("{}ns", sub_nanos)
}
160
/// Returns the arithmetic mean of `latencies`, or `0.0` when the slice is
/// empty (instead of the NaN that `0.0 / 0.0` would produce).
fn calculate_avg(latencies: &[f64]) -> f64 {
    if latencies.is_empty() {
        return 0.0;
    }
    latencies.iter().sum::<f64>() / latencies.len() as f64
}
164
/// Returns the smallest value in `latencies`, or `0.0` when the slice is empty.
///
/// Values are compared with `f64::total_cmp`, so a NaN in the input cannot
/// cause a panic.
fn calculate_min(latencies: &[f64]) -> f64 {
    latencies
        .iter()
        .copied()
        .min_by(f64::total_cmp)
        .unwrap_or(0.0)
}
171
/// Returns the largest value in `latencies`, or `0.0` when the slice is empty.
///
/// Values are compared with `f64::total_cmp`, so a NaN in the input cannot
/// cause a panic.
fn calculate_max(latencies: &[f64]) -> f64 {
    latencies
        .iter()
        .copied()
        .max_by(f64::total_cmp)
        .unwrap_or(0.0)
}
178
/// Returns the `percentile`-th percentile of `latencies` using the
/// nearest-rank method (`ceil(p / 100 * n)`-th smallest value).
///
/// `latencies` must already be sorted in ascending order. An empty slice
/// yields `0.0`; percentiles at or below 0 select the first element and
/// percentiles above 100 select the last one.
fn calculate_percentile(latencies: &[f64], percentile: f64) -> f64 {
    let Some(last_idx) = latencies.len().checked_sub(1) else {
        return 0.0;
    };
    let rank = (percentile / 100.0 * latencies.len() as f64).ceil();
    // Float-to-int `as` saturates (negative and NaN become 0), so only the
    // `- 1` and the upper bound need explicit guarding.
    let idx = (rank as usize).saturating_sub(1).min(last_idx);
    latencies[idx]
}
183
/// Returns the median of `latencies`, or `0.0` when the slice is empty.
///
/// `latencies` must already be sorted in ascending order. For an even number
/// of samples the two middle values are averaged.
fn calculate_median(latencies: &[f64]) -> f64 {
    if latencies.is_empty() {
        return 0.0;
    }
    let mid = latencies.len() / 2;
    if latencies.len() % 2 == 0 {
        // Non-empty and even, so `mid >= 1` and `mid - 1` cannot underflow.
        (latencies[mid - 1] + latencies[mid]) / 2.0
    } else {
        latencies[mid]
    }
}
192
193pub fn calculate_summary<T>(latencies: &Vec<T>) -> SummaryStats
194where
195    T: LatencyInfo,
196{
197    let mut latencies_sorted: Vec<f64> = latencies.iter().map(|l| l.latency()).collect();
198    latencies_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
199
200    let fail_count = latencies.iter().filter(|l| !l.successful()).count();
201    let success_count = latencies.iter().filter(|l| l.successful()).count();
202    let fail_rate = fail_count as f64 / latencies.len() as f64;
203
204    SummaryStats {
205        avg: Duration::from_secs_f64(calculate_avg(&latencies_sorted)),
206        min: Duration::from_secs_f64(calculate_min(&latencies_sorted)),
207        max: Duration::from_secs_f64(calculate_max(&latencies_sorted)),
208        med: Duration::from_secs_f64(calculate_median(&latencies_sorted)),
209        p90: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 90.0)),
210        p95: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 95.0)),
211        total: latencies.len(),
212        fail_rate,
213        success_count,
214        fail_count,
215    }
216}
217
/// A source of per-worker status reports for a run.
pub trait ProgressFetcher {
    /// Fetches the latest status for the run identified by `id`.
    ///
    /// On success the map associates a `String` key with that entry's
    /// [`RunInfo`]; the implementations in this file key it by worker name.
    /// NOTE(review): the meaning of `Ok(None)` is not defined here —
    /// presumably "no status available for this run"; confirm with the
    /// implementors.
    // The `async_fn_in_trait` lint warns that callers cannot name auto-trait
    // bounds (such as `Send`) on the returned future.
    // NOTE(review): presumably allowed because the trait is only used through
    // generics within this project — confirm.
    #[allow(async_fn_in_trait)]
    async fn get_run_status(
        &mut self,
        id: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>>;
}
225
/// Lets a [`CoordinatorClient`] be used wherever a [`ProgressFetcher`] is
/// expected.
impl ProgressFetcher for CoordinatorClient {
    /// Forwards the request to `CoordinatorClient::get_run_status`.
    async fn get_run_status(
        &mut self,
        id: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
        // NOTE(review): this path-style call is presumably meant to reach the
        // client's own inherent `get_run_status` rather than this trait
        // method (which would recurse) — confirm the inherent method exists.
        CoordinatorClient::get_run_status(self, id).await
    }
}
234
/// A [`ProgressFetcher`] that reads status from a local [`InfoHandle`] and
/// reports it under a single worker name.
pub struct LocalProgressFetcher {
    /// Handle from which run information is read.
    info_handle: InfoHandle,
    /// Key under which the run information is reported.
    worker_name: String,
}
239
impl LocalProgressFetcher {
    /// Creates a fetcher that reads from `info_handle` and reports its status
    /// under `worker_name`.
    pub fn new(info_handle: InfoHandle, worker_name: String) -> Self {
        Self {
            info_handle,
            worker_name,
        }
    }
}
248
249impl ProgressFetcher for LocalProgressFetcher {
250    async fn get_run_status(
251        &mut self,
252        _: RunId,
253    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
254        let run_info = process_info_handle(&mut self.info_handle).await;
255        Ok(Some(vec![(self.worker_name.clone(), run_info)].into_iter().collect()))
256    }
257}
258
259pub async fn drive_progress<T>(
260    client: &mut T,
261    run_id: &RunId,
262    worker_names: Vec<String>,
263) -> anyhow::Result<()>
264where
265    T: ProgressFetcher,
266{
267    let mut stdout = stdout();
268
269    let progress_lines = worker_names.len() as u16;
270
271    let mut all_request_stats: Vec<RequestInfo> = Vec::new();
272    let mut all_iteration_stats: Vec<IterationInfo> = Vec::new();
273    let mut worker_states: HashMap<String, WorkerState> = HashMap::new();
274    let mut bars = HashMap::new();
275
276    for name in worker_names {
277        worker_states.insert(name.clone(), Default::default());
278
279        bars.insert(
280            name.clone(),
281            BarData {
282                worker_name: name.clone(),
283                left: Duration::from_secs(1),
284                ..Default::default()
285            },
286        );
287    }
288
289    loop {
290        let mut lines = Vec::new();
291        let result = client.get_run_status(run_id.clone()).await.unwrap();
292
293        if worker_states.values().all(|s| s.done) {
294            break;
295        }
296
297        for (worker_name, run_info) in result.unwrap().iter() {
298            let state = worker_states
299                .get_mut(worker_name)
300                .ok_or(anyhow!("Couldn't findt the worker"))?;
301            state.active_instances += run_info.active_instances_delta;
302            state.capacity += run_info.capacity_delta;
303
304            all_request_stats.extend(run_info.request_stats.clone());
305            all_iteration_stats.extend(run_info.iteration_stats.clone());
306
307            for log_line in &run_info.stdout {
308                lines.push(format!(
309                    "[INFO][{worker_name}] {}",
310                    String::from_utf8_lossy(log_line)
311                ));
312            }
313            for log_line in &run_info.stderr {
314                lines.push(format!(
315                    "[ERROR][{worker_name}] {}",
316                    String::from_utf8_lossy(log_line)
317                ));
318            }
319
320            if run_info.done {
321                state.done = true;
322            }
323
324            let bar = bars
325                .get_mut(worker_name)
326                .ok_or(anyhow!("Couldn't find bar data for worker {worker_name}"))?;
327            bar.active_vus = state.active_instances as usize;
328            bar.all_vus = state.capacity as usize;
329            if let Some(duration) = run_info.elapsed {
330                bar.duration = duration;
331            }
332            if let Some(left) = run_info.left {
333                bar.left = left;
334            }
335            bar.done = state.done;
336        }
337
338        print(&mut stdout, progress_lines, lines, &bars, false).unwrap();
339        tokio::time::sleep(Duration::from_millis(250)).await;
340    }
341
342    let request_summary = calculate_summary(&all_request_stats);
343    let iteration_summary = calculate_summary(&all_iteration_stats);
344
345    let mut lines = Vec::new();
346    lines.push(format!("\n\nSummary:\n"));
347    lines.push(format!(
348        "http_req_duration..........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n",
349        format_duration(request_summary.avg),
350        format_duration(request_summary.min),
351        format_duration(request_summary.med),
352        format_duration(request_summary.max),
353        format_duration(request_summary.p90),
354        format_duration(request_summary.p95)
355    ));
356    lines.push(format!(
357        "http_req_failed............: {:.2}%\t✓ {}\t✗ {}\n",
358        request_summary.fail_rate, request_summary.success_count, request_summary.fail_count
359    ));
360    lines.push(format!(
361        "http_reqs..................: {}\n",
362        request_summary.total
363    ));
364    lines.push(format!(
365        "iteration_duration.........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n",
366        format_duration(iteration_summary.avg),
367        format_duration(iteration_summary.min),
368        format_duration(iteration_summary.med),
369        format_duration(iteration_summary.max),
370        format_duration(iteration_summary.p90),
371        format_duration(iteration_summary.p95)
372    ));
373    lines.push(format!(
374        "iterations.................: {}\n",
375        iteration_summary.total
376    ));
377    lines.push(format!("\n\n"));
378
379    print(&mut stdout, progress_lines, lines, &bars, true)?;
380
381    Ok(())
382}