Skip to main content

actionqueue_cli/cmd/
stats.rs

1//! Stats command execution path.
2
3use serde::Serialize;
4use serde_json::json;
5
6use crate::args::{StatsArgs, StatsOutputFormat};
7use crate::cmd::{resolve_data_dir, CliError, CommandOutput};
8
9/// Executes stats command flow.
10pub fn run(args: StatsArgs) -> Result<CommandOutput, CliError> {
11    let data_dir = resolve_data_dir(args.data_dir.as_deref());
12    let recovery = actionqueue_storage::recovery::bootstrap::load_projection_from_storage(
13        &data_dir,
14    )
15    .map_err(|error| {
16        CliError::runtime(
17            "storage_bootstrap_failed",
18            format!("unable to load storage projection: {error}"),
19        )
20    })?;
21
22    let projection = recovery.projection;
23    let summary = StatsSummary::from_projection(&projection);
24
25    match args.format {
26        StatsOutputFormat::Json => Ok(CommandOutput::Json(json!({
27            "command": "stats",
28            "data_dir": data_dir.display().to_string(),
29            "summary": summary,
30        }))),
31        StatsOutputFormat::Text => Ok(CommandOutput::Text(format!(
32            "command=stats\ndata_dir={}\ntotal_tasks={}\ntotal_runs={}\nlatest_sequence={}\\
33             nruns_scheduled={}\nruns_ready={}\nruns_leased={}\nruns_running={}\\
34             nruns_retry_wait={}\nruns_completed={}\nruns_failed={}\nruns_canceled={}\\
35             nattempts_total={}",
36            data_dir.display(),
37            summary.total_tasks,
38            summary.total_runs,
39            summary.latest_sequence,
40            summary.runs_by_state.scheduled,
41            summary.runs_by_state.ready,
42            summary.runs_by_state.leased,
43            summary.runs_by_state.running,
44            summary.runs_by_state.retry_wait,
45            summary.runs_by_state.completed,
46            summary.runs_by_state.failed,
47            summary.runs_by_state.canceled,
48            summary.attempts_total,
49        ))),
50    }
51}
52
53/// Deterministic run-state counts.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
55pub struct RunsByState {
56    /// Count of `Scheduled` runs.
57    pub scheduled: usize,
58    /// Count of `Ready` runs.
59    pub ready: usize,
60    /// Count of `Leased` runs.
61    pub leased: usize,
62    /// Count of `Running` runs.
63    pub running: usize,
64    /// Count of `RetryWait` runs.
65    pub retry_wait: usize,
66    /// Count of `Completed` runs.
67    pub completed: usize,
68    /// Count of `Failed` runs.
69    pub failed: usize,
70    /// Count of `Canceled` runs.
71    pub canceled: usize,
72}
73
74impl RunsByState {
75    const fn zero() -> Self {
76        Self {
77            scheduled: 0,
78            ready: 0,
79            leased: 0,
80            running: 0,
81            retry_wait: 0,
82            completed: 0,
83            failed: 0,
84            canceled: 0,
85        }
86    }
87}
88
89/// Deterministic aggregate stats summary.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
91pub struct StatsSummary {
92    /// Number of tracked tasks.
93    pub total_tasks: usize,
94    /// Number of tracked runs.
95    pub total_runs: usize,
96    /// Latest projection sequence.
97    pub latest_sequence: u64,
98    /// Run counts by lifecycle state.
99    pub runs_by_state: RunsByState,
100    /// Total attempts across tracked runs.
101    pub attempts_total: u64,
102}
103
104impl StatsSummary {
105    /// Builds deterministic stats from authoritative projection state.
106    pub fn from_projection(
107        projection: &actionqueue_storage::recovery::reducer::ReplayReducer,
108    ) -> Self {
109        let mut runs_by_state = RunsByState::zero();
110        let mut attempts_total = 0u64;
111
112        for run in projection.run_instances() {
113            match run.state() {
114                actionqueue_core::run::state::RunState::Scheduled => runs_by_state.scheduled += 1,
115                actionqueue_core::run::state::RunState::Ready => runs_by_state.ready += 1,
116                actionqueue_core::run::state::RunState::Leased => runs_by_state.leased += 1,
117                actionqueue_core::run::state::RunState::Running => runs_by_state.running += 1,
118                actionqueue_core::run::state::RunState::RetryWait => runs_by_state.retry_wait += 1,
119                actionqueue_core::run::state::RunState::Suspended => {}
120                actionqueue_core::run::state::RunState::Completed => runs_by_state.completed += 1,
121                actionqueue_core::run::state::RunState::Failed => runs_by_state.failed += 1,
122                actionqueue_core::run::state::RunState::Canceled => runs_by_state.canceled += 1,
123            }
124            attempts_total = attempts_total.saturating_add(u64::from(run.attempt_count()));
125        }
126
127        Self {
128            total_tasks: projection.task_count(),
129            total_runs: projection.run_count(),
130            latest_sequence: projection.latest_sequence(),
131            runs_by_state,
132            attempts_total,
133        }
134    }
135}