Skip to main content

actionqueue_daemon/metrics/
runs.rs

1//! Run metrics population from authoritative projection and daemon clock.
2//!
3//! This module updates run-focused Prometheus metric families from the current
4//! authoritative in-memory projection and the router state's injected daemon
5//! clock. It does not introduce inferred counters or ad hoc wall-clock reads.
6
7use actionqueue_core::run::state::RunState;
8
9/// Recomputes run-focused metric families from the authoritative projection.
10///
11/// Update behavior is deterministic for each scrape:
12/// - Recounts all run states from `ReplayReducer::run_instances()`.
13/// - Overwrites `actionqueue_runs_total{state=...}` gauge samples.
14/// - Overwrites aggregate gauges `actionqueue_runs_ready` and
15///   `actionqueue_runs_running`.
16/// - Observes scheduling lag histogram samples for lag-eligible states only.
17pub fn update(state: &crate::http::RouterStateInner) {
18    let collectors = state.metrics.collectors();
19    let mut counts = RunStateCounts::default();
20
21    let now_unix_seconds = super::lag_now(state);
22
23    let projection = match state.shared_projection.read() {
24        Ok(guard) => guard,
25        Err(_) => {
26            tracing::error!("shared projection RwLock poisoned — skipping run metrics update");
27            return;
28        }
29    };
30    for run in projection.run_instances() {
31        let run_state = run.state();
32        counts.increment(run_state);
33
34        if is_lag_eligible(run_state) {
35            let lag_seconds = now_unix_seconds.saturating_sub(run.scheduled_at());
36            collectors.scheduling_lag_seconds().observe(lag_seconds as f64);
37        }
38    }
39
40    for run_state in ALL_RUN_STATES {
41        collectors
42            .runs_total()
43            .with_label_values(&[state_label(run_state)])
44            .set(counts.get(run_state) as f64);
45    }
46
47    collectors.runs_ready().set(counts.get(RunState::Ready) as f64);
48    collectors.runs_running().set(counts.get(RunState::Running) as f64);
49}
50
/// Run states written as `state` label values on `actionqueue_runs_total`.
///
/// `update` iterates this table and overwrites one gauge sample per entry.
/// `RunState::Suspended` is not listed, so no sample is written for it.
const ALL_RUN_STATES: [RunState; 8] = [
    RunState::Scheduled,
    RunState::Ready,
    RunState::Leased,
    RunState::Running,
    RunState::RetryWait,
    RunState::Completed,
    RunState::Failed,
    RunState::Canceled,
];
61
62#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
63struct RunStateCounts {
64    scheduled: u64,
65    ready: u64,
66    leased: u64,
67    running: u64,
68    retry_wait: u64,
69    completed: u64,
70    failed: u64,
71    canceled: u64,
72}
73
74impl RunStateCounts {
75    fn increment(&mut self, state: RunState) {
76        match state {
77            RunState::Scheduled => self.scheduled += 1,
78            RunState::Ready => self.ready += 1,
79            RunState::Leased => self.leased += 1,
80            RunState::Running => self.running += 1,
81            RunState::RetryWait => self.retry_wait += 1,
82            RunState::Suspended => {}
83            RunState::Completed => self.completed += 1,
84            RunState::Failed => self.failed += 1,
85            RunState::Canceled => self.canceled += 1,
86        }
87    }
88
89    fn get(&self, state: RunState) -> u64 {
90        match state {
91            RunState::Scheduled => self.scheduled,
92            RunState::Ready => self.ready,
93            RunState::Leased => self.leased,
94            RunState::Running => self.running,
95            RunState::RetryWait => self.retry_wait,
96            RunState::Suspended => 0,
97            RunState::Completed => self.completed,
98            RunState::Failed => self.failed,
99            RunState::Canceled => self.canceled,
100        }
101    }
102}
103
104fn state_label(state: RunState) -> &'static str {
105    match state {
106        RunState::Scheduled => "scheduled",
107        RunState::Ready => "ready",
108        RunState::Leased => "leased",
109        RunState::Running => "running",
110        RunState::RetryWait => "retry_wait",
111        RunState::Suspended => "suspended",
112        RunState::Completed => "completed",
113        RunState::Failed => "failed",
114        RunState::Canceled => "canceled",
115    }
116}
117
118fn is_lag_eligible(state: RunState) -> bool {
119    matches!(state, RunState::Ready | RunState::Leased | RunState::Running | RunState::RetryWait)
120}
121
122#[cfg(test)]
123mod tests {
124    use std::sync::Arc;
125
126    use actionqueue_core::ids::{RunId, TaskId};
127    use actionqueue_core::run::run_instance::RunInstance;
128    use actionqueue_core::task::constraints::TaskConstraints;
129    use actionqueue_core::task::metadata::TaskMetadata;
130    use actionqueue_core::task::run_policy::RunPolicy;
131    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
132    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
133    use actionqueue_storage::recovery::reducer::ReplayReducer;
134    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
135    use actionqueue_storage::wal::WalAppendTelemetry;
136
137    use super::{is_lag_eligible, state_label, update};
138    use crate::bootstrap::{ReadyStatus, RouterConfig};
139    use crate::metrics::registry::MetricsRegistry;
140    use crate::time::clock::{MockClock, SharedDaemonClock};
141
142    fn build_task_spec(task_id: TaskId) -> TaskSpec {
143        TaskSpec::new(
144            task_id,
145            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
146            RunPolicy::Once,
147            TaskConstraints::default(),
148            TaskMetadata::default(),
149        )
150        .expect("task spec should be valid")
151    }
152
153    fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
154        let event = WalEvent::new(sequence, event);
155        reducer.apply(&event).expect("event should apply");
156    }
157
158    fn run_instance_scheduled(run_id: RunId, task_id: TaskId, scheduled_at: u64) -> RunInstance {
159        RunInstance::new_scheduled_with_id(run_id, task_id, scheduled_at, scheduled_at)
160            .expect("run instance should be valid")
161    }
162
163    #[allow(clippy::too_many_arguments)] // Test helper with naturally many parameters
164    fn seed_run_state(
165        reducer: &mut ReplayReducer,
166        sequence: &mut u64,
167        run_id: RunId,
168        task_id: TaskId,
169        target_state: actionqueue_core::run::state::RunState,
170        scheduled_at: u64,
171    ) {
172        fn transition_state(
173            reducer: &mut ReplayReducer,
174            sequence: &mut u64,
175            run_id: RunId,
176            from: actionqueue_core::run::state::RunState,
177            to: actionqueue_core::run::state::RunState,
178        ) {
179            apply_event(
180                reducer,
181                *sequence,
182                WalEventType::RunStateChanged {
183                    run_id,
184                    previous_state: from,
185                    new_state: to,
186                    timestamp: *sequence + 1_000,
187                },
188            );
189            *sequence += 1;
190        }
191
192        apply_event(
193            reducer,
194            *sequence,
195            WalEventType::RunCreated {
196                run_instance: run_instance_scheduled(run_id, task_id, scheduled_at),
197            },
198        );
199        *sequence += 1;
200
201        use actionqueue_core::run::state::RunState;
202        match target_state {
203            RunState::Scheduled => {}
204            RunState::Ready => {
205                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready)
206            }
207            RunState::Leased => {
208                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
209                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
210            }
211            RunState::Running => {
212                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
213                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
214                transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
215            }
216            RunState::RetryWait => {
217                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
218                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
219                transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
220                transition_state(reducer, sequence, run_id, RunState::Running, RunState::RetryWait);
221            }
222            RunState::Completed => {
223                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
224                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
225                transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
226                transition_state(reducer, sequence, run_id, RunState::Running, RunState::Completed);
227            }
228            RunState::Failed => {
229                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
230                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
231                transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
232                transition_state(reducer, sequence, run_id, RunState::Running, RunState::Failed);
233            }
234            RunState::Canceled => {
235                apply_event(
236                    reducer,
237                    *sequence,
238                    WalEventType::RunCanceled { run_id, timestamp: *sequence + 1_000 },
239                );
240                *sequence += 1;
241            }
242            RunState::Suspended => {
243                transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
244                transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
245                transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
246                transition_state(reducer, sequence, run_id, RunState::Running, RunState::Suspended);
247            }
248        }
249    }
250
251    fn build_state(
252        projection: ReplayReducer,
253        metrics: Arc<MetricsRegistry>,
254        now_unix_seconds: u64,
255    ) -> crate::http::RouterStateInner {
256        let clock: SharedDaemonClock = Arc::new(MockClock::new(now_unix_seconds));
257        crate::http::RouterStateInner::new(
258            RouterConfig { control_enabled: false, metrics_enabled: false },
259            Arc::new(std::sync::RwLock::new(projection)),
260            crate::http::RouterObservability {
261                metrics,
262                wal_append_telemetry: WalAppendTelemetry::new(),
263                clock,
264                recovery_observations: RecoveryObservations::zero(),
265            },
266            ReadyStatus::ready(),
267        )
268    }
269
270    fn run_total_value(
271        metrics: &MetricsRegistry,
272        state: actionqueue_core::run::state::RunState,
273    ) -> f64 {
274        metrics.collectors().runs_total().with_label_values(&[state_label(state)]).get()
275    }
276
277    #[test]
278    fn state_to_label_mapping_covers_all_run_states() {
279        use actionqueue_core::run::state::RunState;
280
281        assert_eq!(state_label(RunState::Scheduled), "scheduled");
282        assert_eq!(state_label(RunState::Ready), "ready");
283        assert_eq!(state_label(RunState::Leased), "leased");
284        assert_eq!(state_label(RunState::Running), "running");
285        assert_eq!(state_label(RunState::RetryWait), "retry_wait");
286        assert_eq!(state_label(RunState::Completed), "completed");
287        assert_eq!(state_label(RunState::Failed), "failed");
288        assert_eq!(state_label(RunState::Canceled), "canceled");
289    }
290
291    #[test]
292    fn lag_eligibility_includes_only_non_terminal_readiness_path_states() {
293        use actionqueue_core::run::state::RunState;
294
295        assert!(!is_lag_eligible(RunState::Scheduled));
296        assert!(is_lag_eligible(RunState::Ready));
297        assert!(is_lag_eligible(RunState::Leased));
298        assert!(is_lag_eligible(RunState::Running));
299        assert!(is_lag_eligible(RunState::RetryWait));
300        assert!(!is_lag_eligible(RunState::Completed));
301        assert!(!is_lag_eligible(RunState::Failed));
302        assert!(!is_lag_eligible(RunState::Canceled));
303    }
304
305    #[test]
306    fn update_overwrites_gauges_for_repeated_projection_snapshots() {
307        use actionqueue_core::run::state::RunState;
308
309        let task_id = TaskId::new();
310        let mut projection_one = ReplayReducer::new();
311        apply_event(
312            &mut projection_one,
313            1,
314            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
315        );
316        let mut sequence = 2;
317        seed_run_state(
318            &mut projection_one,
319            &mut sequence,
320            RunId::new(),
321            task_id,
322            RunState::Ready,
323            100,
324        );
325        seed_run_state(
326            &mut projection_one,
327            &mut sequence,
328            RunId::new(),
329            task_id,
330            RunState::Completed,
331            100,
332        );
333
334        let metrics =
335            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
336        let state_one = build_state(projection_one, Arc::clone(&metrics), 200);
337        update(&state_one);
338
339        assert_eq!(run_total_value(&metrics, RunState::Ready), 1.0);
340        assert_eq!(run_total_value(&metrics, RunState::Completed), 1.0);
341
342        let mut projection_two = ReplayReducer::new();
343        apply_event(
344            &mut projection_two,
345            1,
346            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
347        );
348        let mut sequence = 2;
349        seed_run_state(
350            &mut projection_two,
351            &mut sequence,
352            RunId::new(),
353            task_id,
354            RunState::Running,
355            150,
356        );
357
358        let state_two = build_state(projection_two, Arc::clone(&metrics), 300);
359        update(&state_two);
360
361        assert_eq!(run_total_value(&metrics, RunState::Ready), 0.0);
362        assert_eq!(run_total_value(&metrics, RunState::Completed), 0.0);
363        assert_eq!(run_total_value(&metrics, RunState::Running), 1.0);
364    }
365
366    #[test]
367    fn ready_and_running_aggregates_match_state_counts() {
368        use actionqueue_core::run::state::RunState;
369
370        let task_id = TaskId::new();
371        let mut projection = ReplayReducer::new();
372        apply_event(
373            &mut projection,
374            1,
375            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
376        );
377
378        let mut sequence = 2;
379        seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 100);
380        seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 100);
381        seed_run_state(
382            &mut projection,
383            &mut sequence,
384            RunId::new(),
385            task_id,
386            RunState::Running,
387            100,
388        );
389        seed_run_state(
390            &mut projection,
391            &mut sequence,
392            RunId::new(),
393            task_id,
394            RunState::Running,
395            100,
396        );
397        seed_run_state(
398            &mut projection,
399            &mut sequence,
400            RunId::new(),
401            task_id,
402            RunState::Scheduled,
403            100,
404        );
405
406        let metrics =
407            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
408        let state = build_state(projection, Arc::clone(&metrics), 300);
409        update(&state);
410
411        assert_eq!(metrics.collectors().runs_ready().get(), 2.0);
412        assert_eq!(metrics.collectors().runs_running().get(), 2.0);
413        assert_eq!(run_total_value(&metrics, RunState::Ready), 2.0);
414        assert_eq!(run_total_value(&metrics, RunState::Running), 2.0);
415    }
416
417    #[test]
418    fn lag_observation_uses_saturating_subtraction() {
419        use actionqueue_core::run::state::RunState;
420
421        let task_id = TaskId::new();
422        let mut projection = ReplayReducer::new();
423        apply_event(
424            &mut projection,
425            1,
426            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
427        );
428
429        let mut sequence = 2;
430        seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 200);
431
432        let metrics =
433            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
434        let state = build_state(projection, Arc::clone(&metrics), 100);
435        update(&state);
436
437        assert_eq!(metrics.collectors().scheduling_lag_seconds().get_sample_count(), 1);
438        assert_eq!(metrics.collectors().scheduling_lag_seconds().get_sample_sum(), 0.0);
439    }
440
441    #[test]
442    fn lag_observation_uses_router_state_clock_value() {
443        use actionqueue_core::run::state::RunState;
444
445        let task_id = TaskId::new();
446        let mut projection = ReplayReducer::new();
447        apply_event(
448            &mut projection,
449            1,
450            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
451        );
452        let mut sequence = 2;
453        seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 40);
454
455        let metrics =
456            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
457
458        let state_at_50 = build_state(projection.clone(), Arc::clone(&metrics), 50);
459        update(&state_at_50);
460        let sample_count_after_first =
461            metrics.collectors().scheduling_lag_seconds().get_sample_count();
462        let sample_sum_after_first = metrics.collectors().scheduling_lag_seconds().get_sample_sum();
463
464        let state_at_90 = build_state(projection, Arc::clone(&metrics), 90);
465        update(&state_at_90);
466        let sample_count_after_second =
467            metrics.collectors().scheduling_lag_seconds().get_sample_count();
468        let sample_sum_after_second =
469            metrics.collectors().scheduling_lag_seconds().get_sample_sum();
470
471        assert_eq!(sample_count_after_first, 1);
472        assert_eq!(sample_sum_after_first, 10.0);
473        assert_eq!(sample_count_after_second, 2);
474        assert_eq!(sample_sum_after_second, 60.0);
475    }
476}