Skip to main content

actionqueue_daemon/metrics/
attempts.rs

//! Attempt metrics population from authoritative projection.
//!
//! This updater derives the `actionqueue_attempts_total{result=...}` gauge directly
//! from replay projection attempt history, using `.set()` for deterministic
//! absolute values per scrape.
6
7use actionqueue_core::mutation::AttemptResultKind;
8
9/// Recomputes attempt outcome gauges from authoritative projection truth.
10pub fn update(state: &crate::http::RouterStateInner) {
11    let collectors = state.metrics.collectors();
12    let counts = count_attempt_results(state);
13
14    collectors.attempts_total().with_label_values(&["success"]).set(counts.success as f64);
15    collectors.attempts_total().with_label_values(&["failure"]).set(counts.failure as f64);
16    collectors.attempts_total().with_label_values(&["timeout"]).set(counts.timeout as f64);
17}
18
19#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
20struct AttemptResultCounts {
21    success: u64,
22    failure: u64,
23    timeout: u64,
24}
25
26fn count_attempt_results(state: &crate::http::RouterStateInner) -> AttemptResultCounts {
27    let mut counts = AttemptResultCounts::default();
28
29    let projection = match state.shared_projection.read() {
30        Ok(guard) => guard,
31        Err(_) => {
32            tracing::error!("shared projection RwLock poisoned — skipping attempt metrics update");
33            return AttemptResultCounts::default();
34        }
35    };
36    for run in projection.run_instances() {
37        if let Some(attempts) = projection.get_attempt_history(&run.id()) {
38            for attempt in attempts {
39                match attempt.result() {
40                    Some(AttemptResultKind::Success) => counts.success += 1,
41                    Some(AttemptResultKind::Failure) => counts.failure += 1,
42                    Some(AttemptResultKind::Timeout) => counts.timeout += 1,
43                    Some(AttemptResultKind::Suspended) => {}
44                    None => {}
45                }
46            }
47        }
48    }
49
50    counts
51}
52
53#[cfg(test)]
54mod tests {
55    use std::sync::Arc;
56
57    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
58    use actionqueue_core::mutation::AttemptResultKind;
59    use actionqueue_core::run::run_instance::RunInstance;
60    use actionqueue_core::run::state::RunState;
61    use actionqueue_core::task::constraints::TaskConstraints;
62    use actionqueue_core::task::metadata::TaskMetadata;
63    use actionqueue_core::task::run_policy::RunPolicy;
64    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
65    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
66    use actionqueue_storage::recovery::reducer::ReplayReducer;
67    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
68    use actionqueue_storage::wal::WalAppendTelemetry;
69
70    use super::update;
71    use crate::bootstrap::{ReadyStatus, RouterConfig};
72    use crate::metrics::registry::MetricsRegistry;
73    use crate::time::clock::{MockClock, SharedDaemonClock};
74
75    fn build_task_spec(task_id: TaskId) -> TaskSpec {
76        TaskSpec::new(
77            task_id,
78            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
79            RunPolicy::Once,
80            TaskConstraints::default(),
81            TaskMetadata::default(),
82        )
83        .expect("task spec should be valid")
84    }
85
86    fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
87        let event = WalEvent::new(sequence, event);
88        reducer.apply(&event).expect("event should apply");
89    }
90
91    fn run_instance_scheduled(run_id: RunId, task_id: TaskId, at: u64) -> RunInstance {
92        RunInstance::new_scheduled_with_id(run_id, task_id, at, at)
93            .expect("run instance should be valid")
94    }
95
96    fn transition_to_running(reducer: &mut ReplayReducer, run_id: RunId, mut sequence: u64) -> u64 {
97        apply_event(
98            reducer,
99            sequence,
100            WalEventType::RunStateChanged {
101                run_id,
102                previous_state: RunState::Scheduled,
103                new_state: RunState::Ready,
104                timestamp: sequence + 1_000,
105            },
106        );
107        sequence += 1;
108        apply_event(
109            reducer,
110            sequence,
111            WalEventType::RunStateChanged {
112                run_id,
113                previous_state: RunState::Ready,
114                new_state: RunState::Leased,
115                timestamp: sequence + 1_000,
116            },
117        );
118        sequence += 1;
119        apply_event(
120            reducer,
121            sequence,
122            WalEventType::RunStateChanged {
123                run_id,
124                previous_state: RunState::Leased,
125                new_state: RunState::Running,
126                timestamp: sequence + 1_000,
127            },
128        );
129        sequence + 1
130    }
131
132    fn build_state(
133        projection: ReplayReducer,
134        metrics: Arc<MetricsRegistry>,
135    ) -> crate::http::RouterStateInner {
136        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
137        crate::http::RouterStateInner::new(
138            RouterConfig { control_enabled: false, metrics_enabled: true },
139            Arc::new(std::sync::RwLock::new(projection)),
140            crate::http::RouterObservability {
141                metrics,
142                wal_append_telemetry: WalAppendTelemetry::new(),
143                clock,
144                recovery_observations: RecoveryObservations::zero(),
145            },
146            ReadyStatus::ready(),
147        )
148    }
149
150    #[test]
151    fn update_maps_success_failure_timeout_and_ignores_unfinished_attempts() {
152        let task_id = TaskId::new();
153        let mut projection = ReplayReducer::new();
154        apply_event(
155            &mut projection,
156            1,
157            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
158        );
159
160        // Success run
161        let run_success = RunId::new();
162        let attempt_success = AttemptId::new();
163        apply_event(
164            &mut projection,
165            2,
166            WalEventType::RunCreated {
167                run_instance: run_instance_scheduled(run_success, task_id, 10),
168            },
169        );
170        let mut seq = transition_to_running(&mut projection, run_success, 3);
171        apply_event(
172            &mut projection,
173            seq,
174            WalEventType::AttemptStarted {
175                run_id: run_success,
176                attempt_id: attempt_success,
177                timestamp: 2_000,
178            },
179        );
180        seq += 1;
181        apply_event(
182            &mut projection,
183            seq,
184            WalEventType::AttemptFinished {
185                run_id: run_success,
186                attempt_id: attempt_success,
187                result: AttemptResultKind::Success,
188                error: None,
189                output: None,
190                timestamp: 2_100,
191            },
192        );
193
194        // Failure run
195        let run_failure = RunId::new();
196        let attempt_failure = AttemptId::new();
197        seq += 1;
198        apply_event(
199            &mut projection,
200            seq,
201            WalEventType::RunCreated {
202                run_instance: run_instance_scheduled(run_failure, task_id, 11),
203            },
204        );
205        seq = transition_to_running(&mut projection, run_failure, seq + 1);
206        apply_event(
207            &mut projection,
208            seq,
209            WalEventType::AttemptStarted {
210                run_id: run_failure,
211                attempt_id: attempt_failure,
212                timestamp: 3_000,
213            },
214        );
215        seq += 1;
216        apply_event(
217            &mut projection,
218            seq,
219            WalEventType::AttemptFinished {
220                run_id: run_failure,
221                attempt_id: attempt_failure,
222                result: AttemptResultKind::Failure,
223                error: Some("boom".to_string()),
224                output: None,
225                timestamp: 3_100,
226            },
227        );
228
229        // Timeout run
230        let run_timeout = RunId::new();
231        let attempt_timeout = AttemptId::new();
232        seq += 1;
233        apply_event(
234            &mut projection,
235            seq,
236            WalEventType::RunCreated {
237                run_instance: run_instance_scheduled(run_timeout, task_id, 12),
238            },
239        );
240        seq = transition_to_running(&mut projection, run_timeout, seq + 1);
241        apply_event(
242            &mut projection,
243            seq,
244            WalEventType::AttemptStarted {
245                run_id: run_timeout,
246                attempt_id: attempt_timeout,
247                timestamp: 4_000,
248            },
249        );
250        seq += 1;
251        apply_event(
252            &mut projection,
253            seq,
254            WalEventType::AttemptFinished {
255                run_id: run_timeout,
256                attempt_id: attempt_timeout,
257                result: AttemptResultKind::Timeout,
258                error: Some("attempt timed out after 5s".to_string()),
259                output: None,
260                timestamp: 4_100,
261            },
262        );
263
264        // Unfinished attempt should not count
265        let run_unfinished = RunId::new();
266        let attempt_unfinished = AttemptId::new();
267        seq += 1;
268        apply_event(
269            &mut projection,
270            seq,
271            WalEventType::RunCreated {
272                run_instance: run_instance_scheduled(run_unfinished, task_id, 13),
273            },
274        );
275        seq = transition_to_running(&mut projection, run_unfinished, seq + 1);
276        apply_event(
277            &mut projection,
278            seq,
279            WalEventType::AttemptStarted {
280                run_id: run_unfinished,
281                attempt_id: attempt_unfinished,
282                timestamp: 5_000,
283            },
284        );
285
286        let metrics =
287            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
288        let state = build_state(projection, Arc::clone(&metrics));
289        update(&state);
290
291        assert_eq!(
292            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
293            1.0
294        );
295        assert_eq!(
296            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
297            1.0
298        );
299        assert_eq!(
300            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
301            1.0
302        );
303    }
304
305    #[test]
306    fn update_overwrites_attempt_counters_when_projection_counts_decrease() {
307        let task_id = TaskId::new();
308        let metrics =
309            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
310
311        // First snapshot: success=2, failure=1, timeout=1
312        let mut first_projection = ReplayReducer::new();
313        apply_event(
314            &mut first_projection,
315            1,
316            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
317        );
318
319        let run_success_a = RunId::new();
320        let run_success_b = RunId::new();
321        let run_failure = RunId::new();
322        let run_timeout = RunId::new();
323
324        let mut seq = 2;
325        for (run_id, result, error) in [
326            (run_success_a, AttemptResultKind::Success, None),
327            (run_success_b, AttemptResultKind::Success, None),
328            (run_failure, AttemptResultKind::Failure, Some("synthetic failure".to_string())),
329            (
330                run_timeout,
331                AttemptResultKind::Timeout,
332                Some("attempt timed out after 3s".to_string()),
333            ),
334        ] {
335            let attempt_id = AttemptId::new();
336            apply_event(
337                &mut first_projection,
338                seq,
339                WalEventType::RunCreated {
340                    run_instance: run_instance_scheduled(run_id, task_id, 10 + seq),
341                },
342            );
343            seq = transition_to_running(&mut first_projection, run_id, seq + 1);
344            apply_event(
345                &mut first_projection,
346                seq,
347                WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 1_000 + seq },
348            );
349            seq += 1;
350            apply_event(
351                &mut first_projection,
352                seq,
353                WalEventType::AttemptFinished {
354                    run_id,
355                    attempt_id,
356                    result,
357                    error,
358                    output: None,
359                    timestamp: 2_000 + seq,
360                },
361            );
362            seq += 1;
363        }
364
365        let first_state = build_state(first_projection, Arc::clone(&metrics));
366        update(&first_state);
367        assert_eq!(
368            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
369            2.0
370        );
371        assert_eq!(
372            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
373            1.0
374        );
375        assert_eq!(
376            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
377            1.0
378        );
379
380        // Second snapshot in same process: success=1, failure=0, timeout=0
381        let mut second_projection = ReplayReducer::new();
382        apply_event(
383            &mut second_projection,
384            1,
385            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
386        );
387        let run_success_only = RunId::new();
388        let attempt_success_only = AttemptId::new();
389        apply_event(
390            &mut second_projection,
391            2,
392            WalEventType::RunCreated {
393                run_instance: run_instance_scheduled(run_success_only, task_id, 99),
394            },
395        );
396        let next_seq = transition_to_running(&mut second_projection, run_success_only, 3);
397        apply_event(
398            &mut second_projection,
399            next_seq,
400            WalEventType::AttemptStarted {
401                run_id: run_success_only,
402                attempt_id: attempt_success_only,
403                timestamp: 3_000,
404            },
405        );
406        apply_event(
407            &mut second_projection,
408            next_seq + 1,
409            WalEventType::AttemptFinished {
410                run_id: run_success_only,
411                attempt_id: attempt_success_only,
412                result: AttemptResultKind::Success,
413                error: None,
414                output: None,
415                timestamp: 3_100,
416            },
417        );
418
419        let second_state = build_state(second_projection, Arc::clone(&metrics));
420        update(&second_state);
421
422        assert_eq!(
423            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
424            1.0
425        );
426        assert_eq!(
427            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
428            0.0
429        );
430        assert_eq!(
431            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
432            0.0
433        );
434    }
435}