1use actionqueue_core::mutation::AttemptResultKind;
8
9pub fn update(state: &crate::http::RouterStateInner) {
11 let collectors = state.metrics.collectors();
12 let counts = count_attempt_results(state);
13
14 collectors.attempts_total().with_label_values(&["success"]).set(counts.success as f64);
15 collectors.attempts_total().with_label_values(&["failure"]).set(counts.failure as f64);
16 collectors.attempts_total().with_label_values(&["timeout"]).set(counts.timeout as f64);
17}
18
19#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
20struct AttemptResultCounts {
21 success: u64,
22 failure: u64,
23 timeout: u64,
24}
25
26fn count_attempt_results(state: &crate::http::RouterStateInner) -> AttemptResultCounts {
27 let mut counts = AttemptResultCounts::default();
28
29 let projection = match state.shared_projection.read() {
30 Ok(guard) => guard,
31 Err(_) => {
32 tracing::error!("shared projection RwLock poisoned — skipping attempt metrics update");
33 return AttemptResultCounts::default();
34 }
35 };
36 for run in projection.run_instances() {
37 if let Some(attempts) = projection.get_attempt_history(&run.id()) {
38 for attempt in attempts {
39 match attempt.result() {
40 Some(AttemptResultKind::Success) => counts.success += 1,
41 Some(AttemptResultKind::Failure) => counts.failure += 1,
42 Some(AttemptResultKind::Timeout) => counts.timeout += 1,
43 Some(AttemptResultKind::Suspended) => {}
44 None => {}
45 }
46 }
47 }
48 }
49
50 counts
51}
52
#[cfg(test)]
mod tests {
    //! Unit tests for the attempt-outcome metric refresh in [`super::update`].

    use std::sync::Arc;

    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
    use actionqueue_core::mutation::AttemptResultKind;
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
    use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
    use actionqueue_storage::recovery::reducer::ReplayReducer;
    use actionqueue_storage::wal::event::{WalEvent, WalEventType};
    use actionqueue_storage::wal::WalAppendTelemetry;

    use super::update;
    use crate::bootstrap::{ReadyStatus, RouterConfig};
    use crate::metrics::registry::MetricsRegistry;
    use crate::time::clock::{MockClock, SharedDaemonClock};

    /// Builds a `RunPolicy::Once` task spec with a fixed opaque payload and
    /// default constraints/metadata; panics if the spec is rejected.
    fn build_task_spec(task_id: TaskId) -> TaskSpec {
        TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("task spec should be valid")
    }

    /// Wraps `event` in a WAL envelope at `sequence` and applies it to the
    /// reducer, panicking if the reducer rejects it.
    fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
        let event = WalEvent::new(sequence, event);
        reducer.apply(&event).expect("event should apply");
    }

    /// Creates a scheduled run instance with a caller-chosen id.
    ///
    /// `at` is passed as both time arguments of `new_scheduled_with_id`
    /// (presumably created-at and scheduled-at — confirm against `RunInstance`).
    fn run_instance_scheduled(run_id: RunId, task_id: TaskId, at: u64) -> RunInstance {
        RunInstance::new_scheduled_with_id(run_id, task_id, at, at)
            .expect("run instance should be valid")
    }

    /// Drives `run_id` through Scheduled → Ready → Leased → Running.
    ///
    /// Uses three consecutive WAL sequence numbers starting at `sequence`
    /// (each transition's timestamp is its sequence + 1_000) and returns the
    /// next unused sequence number.
    fn transition_to_running(reducer: &mut ReplayReducer, run_id: RunId, mut sequence: u64) -> u64 {
        apply_event(
            reducer,
            sequence,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Scheduled,
                new_state: RunState::Ready,
                timestamp: sequence + 1_000,
            },
        );
        sequence += 1;
        apply_event(
            reducer,
            sequence,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Ready,
                new_state: RunState::Leased,
                timestamp: sequence + 1_000,
            },
        );
        sequence += 1;
        apply_event(
            reducer,
            sequence,
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Leased,
                new_state: RunState::Running,
                timestamp: sequence + 1_000,
            },
        );
        sequence + 1
    }

    /// Wraps `projection` and `metrics` in a router state.
    ///
    /// The state has control endpoints disabled, metrics enabled, a fixed mock
    /// clock, fresh WAL telemetry, zero recovery observations, and a ready
    /// status.
    fn build_state(
        projection: ReplayReducer,
        metrics: Arc<MetricsRegistry>,
    ) -> crate::http::RouterStateInner {
        let clock: SharedDaemonClock = Arc::new(MockClock::new(1_700_000_000));
        crate::http::RouterStateInner::new(
            RouterConfig { control_enabled: false, metrics_enabled: true },
            Arc::new(std::sync::RwLock::new(projection)),
            crate::http::RouterObservability {
                metrics,
                wal_append_telemetry: WalAppendTelemetry::new(),
                clock,
                recovery_observations: RecoveryObservations::zero(),
            },
            ReadyStatus::ready(),
        )
    }

    /// Checks that one finished attempt per outcome yields a value of 1 for each
    /// label, and that an attempt that was started but never finished is not
    /// counted anywhere.
    #[test]
    fn update_maps_success_failure_timeout_and_ignores_unfinished_attempts() {
        let task_id = TaskId::new();
        let mut projection = ReplayReducer::new();
        apply_event(
            &mut projection,
            1,
            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
        );

        // Run 1: attempt finishes with Success.
        let run_success = RunId::new();
        let attempt_success = AttemptId::new();
        apply_event(
            &mut projection,
            2,
            WalEventType::RunCreated {
                run_instance: run_instance_scheduled(run_success, task_id, 10),
            },
        );
        let mut seq = transition_to_running(&mut projection, run_success, 3);
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptStarted {
                run_id: run_success,
                attempt_id: attempt_success,
                timestamp: 2_000,
            },
        );
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptFinished {
                run_id: run_success,
                attempt_id: attempt_success,
                result: AttemptResultKind::Success,
                error: None,
                output: None,
                timestamp: 2_100,
            },
        );

        // Run 2: attempt finishes with Failure.
        let run_failure = RunId::new();
        let attempt_failure = AttemptId::new();
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::RunCreated {
                run_instance: run_instance_scheduled(run_failure, task_id, 11),
            },
        );
        seq = transition_to_running(&mut projection, run_failure, seq + 1);
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptStarted {
                run_id: run_failure,
                attempt_id: attempt_failure,
                timestamp: 3_000,
            },
        );
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptFinished {
                run_id: run_failure,
                attempt_id: attempt_failure,
                result: AttemptResultKind::Failure,
                error: Some("boom".to_string()),
                output: None,
                timestamp: 3_100,
            },
        );

        // Run 3: attempt finishes with Timeout.
        let run_timeout = RunId::new();
        let attempt_timeout = AttemptId::new();
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::RunCreated {
                run_instance: run_instance_scheduled(run_timeout, task_id, 12),
            },
        );
        seq = transition_to_running(&mut projection, run_timeout, seq + 1);
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptStarted {
                run_id: run_timeout,
                attempt_id: attempt_timeout,
                timestamp: 4_000,
            },
        );
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptFinished {
                run_id: run_timeout,
                attempt_id: attempt_timeout,
                result: AttemptResultKind::Timeout,
                error: Some("attempt timed out after 5s".to_string()),
                output: None,
                timestamp: 4_100,
            },
        );

        // Run 4: attempt is started but never finished, so it has no result.
        let run_unfinished = RunId::new();
        let attempt_unfinished = AttemptId::new();
        seq += 1;
        apply_event(
            &mut projection,
            seq,
            WalEventType::RunCreated {
                run_instance: run_instance_scheduled(run_unfinished, task_id, 13),
            },
        );
        seq = transition_to_running(&mut projection, run_unfinished, seq + 1);
        apply_event(
            &mut projection,
            seq,
            WalEventType::AttemptStarted {
                run_id: run_unfinished,
                attempt_id: attempt_unfinished,
                timestamp: 5_000,
            },
        );

        let metrics =
            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
        let state = build_state(projection, Arc::clone(&metrics));
        update(&state);

        // Each outcome appears exactly once; the unfinished attempt adds nothing.
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
            1.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
            1.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
            1.0
        );
    }

    /// Checks that `update` sets rather than accumulates the label values.
    ///
    /// A second refresh against a smaller projection, using the same metrics
    /// registry, must lower the published numbers.
    #[test]
    fn update_overwrites_attempt_counters_when_projection_counts_decrease() {
        let task_id = TaskId::new();
        let metrics =
            Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));

        // First projection: two successes, one failure, one timeout.
        let mut first_projection = ReplayReducer::new();
        apply_event(
            &mut first_projection,
            1,
            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
        );

        let run_success_a = RunId::new();
        let run_success_b = RunId::new();
        let run_failure = RunId::new();
        let run_timeout = RunId::new();

        // Each iteration uses six consecutive sequence numbers: RunCreated,
        // three state transitions, AttemptStarted, AttemptFinished.
        let mut seq = 2;
        for (run_id, result, error) in [
            (run_success_a, AttemptResultKind::Success, None),
            (run_success_b, AttemptResultKind::Success, None),
            (run_failure, AttemptResultKind::Failure, Some("synthetic failure".to_string())),
            (
                run_timeout,
                AttemptResultKind::Timeout,
                Some("attempt timed out after 3s".to_string()),
            ),
        ] {
            let attempt_id = AttemptId::new();
            apply_event(
                &mut first_projection,
                seq,
                WalEventType::RunCreated {
                    run_instance: run_instance_scheduled(run_id, task_id, 10 + seq),
                },
            );
            seq = transition_to_running(&mut first_projection, run_id, seq + 1);
            apply_event(
                &mut first_projection,
                seq,
                WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 1_000 + seq },
            );
            seq += 1;
            apply_event(
                &mut first_projection,
                seq,
                WalEventType::AttemptFinished {
                    run_id,
                    attempt_id,
                    result,
                    error,
                    output: None,
                    timestamp: 2_000 + seq,
                },
            );
            seq += 1;
        }

        let first_state = build_state(first_projection, Arc::clone(&metrics));
        update(&first_state);
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
            2.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
            1.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
            1.0
        );

        // Second projection (fresh reducer, same metrics registry): one success only.
        let mut second_projection = ReplayReducer::new();
        apply_event(
            &mut second_projection,
            1,
            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
        );
        let run_success_only = RunId::new();
        let attempt_success_only = AttemptId::new();
        apply_event(
            &mut second_projection,
            2,
            WalEventType::RunCreated {
                run_instance: run_instance_scheduled(run_success_only, task_id, 99),
            },
        );
        let next_seq = transition_to_running(&mut second_projection, run_success_only, 3);
        apply_event(
            &mut second_projection,
            next_seq,
            WalEventType::AttemptStarted {
                run_id: run_success_only,
                attempt_id: attempt_success_only,
                timestamp: 3_000,
            },
        );
        apply_event(
            &mut second_projection,
            next_seq + 1,
            WalEventType::AttemptFinished {
                run_id: run_success_only,
                attempt_id: attempt_success_only,
                result: AttemptResultKind::Success,
                error: None,
                output: None,
                timestamp: 3_100,
            },
        );

        let second_state = build_state(second_projection, Arc::clone(&metrics));
        update(&second_state);

        // Values drop to the second projection's counts instead of accumulating.
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["success"]).get(),
            1.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["failure"]).get(),
            0.0
        );
        assert_eq!(
            metrics.collectors().attempts_total().with_label_values(&["timeout"]).get(),
            0.0
        );
    }
}