actionqueue_cli/cmd/
stats.rs1use serde::Serialize;
4use serde_json::json;
5
6use crate::args::{StatsArgs, StatsOutputFormat};
7use crate::cmd::{resolve_data_dir, CliError, CommandOutput};
8
9pub fn run(args: StatsArgs) -> Result<CommandOutput, CliError> {
11 let data_dir = resolve_data_dir(args.data_dir.as_deref());
12 let recovery = actionqueue_storage::recovery::bootstrap::load_projection_from_storage(
13 &data_dir,
14 )
15 .map_err(|error| {
16 CliError::runtime(
17 "storage_bootstrap_failed",
18 format!("unable to load storage projection: {error}"),
19 )
20 })?;
21
22 let projection = recovery.projection;
23 let summary = StatsSummary::from_projection(&projection);
24
25 match args.format {
26 StatsOutputFormat::Json => Ok(CommandOutput::Json(json!({
27 "command": "stats",
28 "data_dir": data_dir.display().to_string(),
29 "summary": summary,
30 }))),
31 StatsOutputFormat::Text => Ok(CommandOutput::Text(format!(
32 "command=stats\ndata_dir={}\ntotal_tasks={}\ntotal_runs={}\nlatest_sequence={}\\
33 nruns_scheduled={}\nruns_ready={}\nruns_leased={}\nruns_running={}\\
34 nruns_retry_wait={}\nruns_completed={}\nruns_failed={}\nruns_canceled={}\\
35 nattempts_total={}",
36 data_dir.display(),
37 summary.total_tasks,
38 summary.total_runs,
39 summary.latest_sequence,
40 summary.runs_by_state.scheduled,
41 summary.runs_by_state.ready,
42 summary.runs_by_state.leased,
43 summary.runs_by_state.running,
44 summary.runs_by_state.retry_wait,
45 summary.runs_by_state.completed,
46 summary.runs_by_state.failed,
47 summary.runs_by_state.canceled,
48 summary.attempts_total,
49 ))),
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
55pub struct RunsByState {
56 pub scheduled: usize,
58 pub ready: usize,
60 pub leased: usize,
62 pub running: usize,
64 pub retry_wait: usize,
66 pub completed: usize,
68 pub failed: usize,
70 pub canceled: usize,
72}
73
74impl RunsByState {
75 const fn zero() -> Self {
76 Self {
77 scheduled: 0,
78 ready: 0,
79 leased: 0,
80 running: 0,
81 retry_wait: 0,
82 completed: 0,
83 failed: 0,
84 canceled: 0,
85 }
86 }
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
91pub struct StatsSummary {
92 pub total_tasks: usize,
94 pub total_runs: usize,
96 pub latest_sequence: u64,
98 pub runs_by_state: RunsByState,
100 pub attempts_total: u64,
102}
103
104impl StatsSummary {
105 pub fn from_projection(
107 projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
108 ) -> Self {
109 let mut runs_by_state = RunsByState::zero();
110 let mut attempts_total = 0u64;
111
112 for run in projection.run_instances() {
113 match run.state() {
114 actionqueue_core::run::state::RunState::Scheduled => runs_by_state.scheduled += 1,
115 actionqueue_core::run::state::RunState::Ready => runs_by_state.ready += 1,
116 actionqueue_core::run::state::RunState::Leased => runs_by_state.leased += 1,
117 actionqueue_core::run::state::RunState::Running => runs_by_state.running += 1,
118 actionqueue_core::run::state::RunState::RetryWait => runs_by_state.retry_wait += 1,
119 actionqueue_core::run::state::RunState::Suspended => {}
120 actionqueue_core::run::state::RunState::Completed => runs_by_state.completed += 1,
121 actionqueue_core::run::state::RunState::Failed => runs_by_state.failed += 1,
122 actionqueue_core::run::state::RunState::Canceled => runs_by_state.canceled += 1,
123 }
124 attempts_total = attempts_total.saturating_add(u64::from(run.attempt_count()));
125 }
126
127 Self {
128 total_tasks: projection.task_count(),
129 total_runs: projection.run_count(),
130 latest_sequence: projection.latest_sequence(),
131 runs_by_state,
132 attempts_total,
133 }
134 }
135}