1use actionqueue_core::run::state::RunState;
8
9pub fn update(state: &crate::http::RouterStateInner) {
18 let collectors = state.metrics.collectors();
19 let mut counts = RunStateCounts::default();
20
21 let now_unix_seconds = super::lag_now(state);
22
23 let projection = match state.shared_projection.read() {
24 Ok(guard) => guard,
25 Err(_) => {
26 tracing::error!("shared projection RwLock poisoned — skipping run metrics update");
27 return;
28 }
29 };
30 for run in projection.run_instances() {
31 let run_state = run.state();
32 counts.increment(run_state);
33
34 if is_lag_eligible(run_state) {
35 let lag_seconds = now_unix_seconds.saturating_sub(run.scheduled_at());
36 collectors.scheduling_lag_seconds().observe(lag_seconds as f64);
37 }
38 }
39
40 for run_state in ALL_RUN_STATES {
41 collectors
42 .runs_total()
43 .with_label_values(&[state_label(run_state)])
44 .set(counts.get(run_state) as f64);
45 }
46
47 collectors.runs_ready().set(counts.get(RunState::Ready) as f64);
48 collectors.runs_running().set(counts.get(RunState::Running) as f64);
49}
50
51const ALL_RUN_STATES: [RunState; 8] = [
52 RunState::Scheduled,
53 RunState::Ready,
54 RunState::Leased,
55 RunState::Running,
56 RunState::RetryWait,
57 RunState::Completed,
58 RunState::Failed,
59 RunState::Canceled,
60];
61
/// Number of runs observed in each tracked state during one metrics pass.
///
/// All counters start at zero via `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct RunStateCounts {
    /// Runs currently in `Scheduled`.
    scheduled: u64,
    /// Runs currently in `Ready`.
    ready: u64,
    /// Runs currently in `Leased`.
    leased: u64,
    /// Runs currently in `Running`.
    running: u64,
    /// Runs currently in `RetryWait`.
    retry_wait: u64,
    /// Runs currently in `Completed`.
    completed: u64,
    /// Runs currently in `Failed`.
    failed: u64,
    /// Runs currently in `Canceled`.
    canceled: u64,
}
73
74impl RunStateCounts {
75 fn increment(&mut self, state: RunState) {
76 match state {
77 RunState::Scheduled => self.scheduled += 1,
78 RunState::Ready => self.ready += 1,
79 RunState::Leased => self.leased += 1,
80 RunState::Running => self.running += 1,
81 RunState::RetryWait => self.retry_wait += 1,
82 RunState::Suspended => {}
83 RunState::Completed => self.completed += 1,
84 RunState::Failed => self.failed += 1,
85 RunState::Canceled => self.canceled += 1,
86 }
87 }
88
89 fn get(&self, state: RunState) -> u64 {
90 match state {
91 RunState::Scheduled => self.scheduled,
92 RunState::Ready => self.ready,
93 RunState::Leased => self.leased,
94 RunState::Running => self.running,
95 RunState::RetryWait => self.retry_wait,
96 RunState::Suspended => 0,
97 RunState::Completed => self.completed,
98 RunState::Failed => self.failed,
99 RunState::Canceled => self.canceled,
100 }
101 }
102}
103
104fn state_label(state: RunState) -> &'static str {
105 match state {
106 RunState::Scheduled => "scheduled",
107 RunState::Ready => "ready",
108 RunState::Leased => "leased",
109 RunState::Running => "running",
110 RunState::RetryWait => "retry_wait",
111 RunState::Suspended => "suspended",
112 RunState::Completed => "completed",
113 RunState::Failed => "failed",
114 RunState::Canceled => "canceled",
115 }
116}
117
118fn is_lag_eligible(state: RunState) -> bool {
119 matches!(state, RunState::Ready | RunState::Leased | RunState::Running | RunState::RetryWait)
120}
121
122#[cfg(test)]
123mod tests {
124 use std::sync::Arc;
125
126 use actionqueue_core::ids::{RunId, TaskId};
127 use actionqueue_core::run::run_instance::RunInstance;
128 use actionqueue_core::task::constraints::TaskConstraints;
129 use actionqueue_core::task::metadata::TaskMetadata;
130 use actionqueue_core::task::run_policy::RunPolicy;
131 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
132 use actionqueue_storage::recovery::bootstrap::RecoveryObservations;
133 use actionqueue_storage::recovery::reducer::ReplayReducer;
134 use actionqueue_storage::wal::event::{WalEvent, WalEventType};
135 use actionqueue_storage::wal::WalAppendTelemetry;
136
137 use super::{is_lag_eligible, state_label, update};
138 use crate::bootstrap::{ReadyStatus, RouterConfig};
139 use crate::metrics::registry::MetricsRegistry;
140 use crate::time::clock::{MockClock, SharedDaemonClock};
141
142 fn build_task_spec(task_id: TaskId) -> TaskSpec {
143 TaskSpec::new(
144 task_id,
145 TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
146 RunPolicy::Once,
147 TaskConstraints::default(),
148 TaskMetadata::default(),
149 )
150 .expect("task spec should be valid")
151 }
152
153 fn apply_event(reducer: &mut ReplayReducer, sequence: u64, event: WalEventType) {
154 let event = WalEvent::new(sequence, event);
155 reducer.apply(&event).expect("event should apply");
156 }
157
158 fn run_instance_scheduled(run_id: RunId, task_id: TaskId, scheduled_at: u64) -> RunInstance {
159 RunInstance::new_scheduled_with_id(run_id, task_id, scheduled_at, scheduled_at)
160 .expect("run instance should be valid")
161 }
162
163 #[allow(clippy::too_many_arguments)] fn seed_run_state(
165 reducer: &mut ReplayReducer,
166 sequence: &mut u64,
167 run_id: RunId,
168 task_id: TaskId,
169 target_state: actionqueue_core::run::state::RunState,
170 scheduled_at: u64,
171 ) {
172 fn transition_state(
173 reducer: &mut ReplayReducer,
174 sequence: &mut u64,
175 run_id: RunId,
176 from: actionqueue_core::run::state::RunState,
177 to: actionqueue_core::run::state::RunState,
178 ) {
179 apply_event(
180 reducer,
181 *sequence,
182 WalEventType::RunStateChanged {
183 run_id,
184 previous_state: from,
185 new_state: to,
186 timestamp: *sequence + 1_000,
187 },
188 );
189 *sequence += 1;
190 }
191
192 apply_event(
193 reducer,
194 *sequence,
195 WalEventType::RunCreated {
196 run_instance: run_instance_scheduled(run_id, task_id, scheduled_at),
197 },
198 );
199 *sequence += 1;
200
201 use actionqueue_core::run::state::RunState;
202 match target_state {
203 RunState::Scheduled => {}
204 RunState::Ready => {
205 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready)
206 }
207 RunState::Leased => {
208 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
209 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
210 }
211 RunState::Running => {
212 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
213 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
214 transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
215 }
216 RunState::RetryWait => {
217 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
218 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
219 transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
220 transition_state(reducer, sequence, run_id, RunState::Running, RunState::RetryWait);
221 }
222 RunState::Completed => {
223 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
224 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
225 transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
226 transition_state(reducer, sequence, run_id, RunState::Running, RunState::Completed);
227 }
228 RunState::Failed => {
229 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
230 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
231 transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
232 transition_state(reducer, sequence, run_id, RunState::Running, RunState::Failed);
233 }
234 RunState::Canceled => {
235 apply_event(
236 reducer,
237 *sequence,
238 WalEventType::RunCanceled { run_id, timestamp: *sequence + 1_000 },
239 );
240 *sequence += 1;
241 }
242 RunState::Suspended => {
243 transition_state(reducer, sequence, run_id, RunState::Scheduled, RunState::Ready);
244 transition_state(reducer, sequence, run_id, RunState::Ready, RunState::Leased);
245 transition_state(reducer, sequence, run_id, RunState::Leased, RunState::Running);
246 transition_state(reducer, sequence, run_id, RunState::Running, RunState::Suspended);
247 }
248 }
249 }
250
251 fn build_state(
252 projection: ReplayReducer,
253 metrics: Arc<MetricsRegistry>,
254 now_unix_seconds: u64,
255 ) -> crate::http::RouterStateInner {
256 let clock: SharedDaemonClock = Arc::new(MockClock::new(now_unix_seconds));
257 crate::http::RouterStateInner::new(
258 RouterConfig { control_enabled: false, metrics_enabled: false },
259 Arc::new(std::sync::RwLock::new(projection)),
260 crate::http::RouterObservability {
261 metrics,
262 wal_append_telemetry: WalAppendTelemetry::new(),
263 clock,
264 recovery_observations: RecoveryObservations::zero(),
265 },
266 ReadyStatus::ready(),
267 )
268 }
269
270 fn run_total_value(
271 metrics: &MetricsRegistry,
272 state: actionqueue_core::run::state::RunState,
273 ) -> f64 {
274 metrics.collectors().runs_total().with_label_values(&[state_label(state)]).get()
275 }
276
/// Pins the metric label of every `RunState` variant, including `Suspended`,
/// so a renamed label or a newly mislabeled state is caught here.
#[test]
fn state_to_label_mapping_covers_all_run_states() {
    use actionqueue_core::run::state::RunState;

    assert_eq!(state_label(RunState::Scheduled), "scheduled");
    assert_eq!(state_label(RunState::Ready), "ready");
    assert_eq!(state_label(RunState::Leased), "leased");
    assert_eq!(state_label(RunState::Running), "running");
    assert_eq!(state_label(RunState::RetryWait), "retry_wait");
    // Previously missing: the test name promises coverage of all states.
    assert_eq!(state_label(RunState::Suspended), "suspended");
    assert_eq!(state_label(RunState::Completed), "completed");
    assert_eq!(state_label(RunState::Failed), "failed");
    assert_eq!(state_label(RunState::Canceled), "canceled");
}
290
/// Checks the lag eligibility of every `RunState` variant: only `Ready`,
/// `Leased`, `Running` and `RetryWait` produce scheduling-lag samples.
#[test]
fn lag_eligibility_includes_only_non_terminal_readiness_path_states() {
    use actionqueue_core::run::state::RunState;

    assert!(!is_lag_eligible(RunState::Scheduled));
    assert!(is_lag_eligible(RunState::Ready));
    assert!(is_lag_eligible(RunState::Leased));
    assert!(is_lag_eligible(RunState::Running));
    assert!(is_lag_eligible(RunState::RetryWait));
    // Previously unchecked: suspended runs must not contribute lag samples.
    assert!(!is_lag_eligible(RunState::Suspended));
    assert!(!is_lag_eligible(RunState::Completed));
    assert!(!is_lag_eligible(RunState::Failed));
    assert!(!is_lag_eligible(RunState::Canceled));
}
304
/// Running `update` against a second projection must replace, not add to, the
/// per-state gauges left behind by the first projection.
#[test]
fn update_overwrites_gauges_for_repeated_projection_snapshots() {
    use actionqueue_core::run::state::RunState;

    /// Builds a projection holding one task plus one run per `(state, scheduled_at)` entry.
    fn seeded_projection(
        task_id: TaskId,
        runs: &[(actionqueue_core::run::state::RunState, u64)],
    ) -> ReplayReducer {
        let mut projection = ReplayReducer::new();
        apply_event(
            &mut projection,
            1,
            WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
        );
        let mut sequence = 2;
        for &(target, scheduled_at) in runs {
            seed_run_state(
                &mut projection,
                &mut sequence,
                RunId::new(),
                task_id,
                target,
                scheduled_at,
            );
        }
        projection
    }

    let task_id = TaskId::new();

    // First snapshot: one ready run and one completed run.
    let projection_one =
        seeded_projection(task_id, &[(RunState::Ready, 100), (RunState::Completed, 100)]);
    let metrics =
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
    let state_one = build_state(projection_one, Arc::clone(&metrics), 200);
    update(&state_one);

    assert_eq!(run_total_value(&metrics, RunState::Ready), 1.0);
    assert_eq!(run_total_value(&metrics, RunState::Completed), 1.0);

    // Second snapshot: a single running run, published to the same registry.
    let projection_two = seeded_projection(task_id, &[(RunState::Running, 150)]);
    let state_two = build_state(projection_two, Arc::clone(&metrics), 300);
    update(&state_two);

    assert_eq!(run_total_value(&metrics, RunState::Ready), 0.0);
    assert_eq!(run_total_value(&metrics, RunState::Completed), 0.0);
    assert_eq!(run_total_value(&metrics, RunState::Running), 1.0);
}
365
/// The dedicated `runs_ready` / `runs_running` gauges must agree with the
/// matching `runs_total` label values.
#[test]
fn ready_and_running_aggregates_match_state_counts() {
    use actionqueue_core::run::state::RunState;

    let task_id = TaskId::new();
    let mut projection = ReplayReducer::new();
    apply_event(
        &mut projection,
        1,
        WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 },
    );

    // Two ready, two running and one scheduled run, all scheduled at t=100.
    let targets = [
        RunState::Ready,
        RunState::Ready,
        RunState::Running,
        RunState::Running,
        RunState::Scheduled,
    ];
    let mut sequence = 2;
    for target in targets.iter().copied() {
        seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, target, 100);
    }

    let metrics =
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
    let state = build_state(projection, Arc::clone(&metrics), 300);
    update(&state);

    let collectors = metrics.collectors();
    assert_eq!(collectors.runs_ready().get(), 2.0);
    assert_eq!(collectors.runs_running().get(), 2.0);
    assert_eq!(run_total_value(&metrics, RunState::Ready), 2.0);
    assert_eq!(run_total_value(&metrics, RunState::Running), 2.0);
}
416
/// A run scheduled after the clock's "now" must be observed with zero lag
/// instead of underflowing.
#[test]
fn lag_observation_uses_saturating_subtraction() {
    use actionqueue_core::run::state::RunState;

    let task_id = TaskId::new();
    let mut projection = ReplayReducer::new();
    let task_created =
        WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 };
    apply_event(&mut projection, 1, task_created);

    // The run is scheduled at t=200 while the clock below reads t=100.
    let mut sequence = 2;
    seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 200);

    let metrics =
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));
    let state = build_state(projection, Arc::clone(&metrics), 100);
    update(&state);

    let collectors = metrics.collectors();
    assert_eq!(collectors.scheduling_lag_seconds().get_sample_count(), 1);
    assert_eq!(collectors.scheduling_lag_seconds().get_sample_sum(), 0.0);
}
440
/// The lag sample must be derived from the clock carried by the router state:
/// the same projection observed at t=50 and t=90 yields lags of 10 and 50.
#[test]
fn lag_observation_uses_router_state_clock_value() {
    use actionqueue_core::run::state::RunState;

    let task_id = TaskId::new();
    let mut projection = ReplayReducer::new();
    let task_created =
        WalEventType::TaskCreated { task_spec: build_task_spec(task_id), timestamp: 1 };
    apply_event(&mut projection, 1, task_created);
    let mut sequence = 2;
    seed_run_state(&mut projection, &mut sequence, RunId::new(), task_id, RunState::Ready, 40);

    let metrics =
        Arc::new(MetricsRegistry::new(None).expect("metrics registry should initialize"));

    // Runs `update` and returns the histogram's (sample count, sample sum) afterwards.
    let update_and_sample = |state: &crate::http::RouterStateInner| {
        update(state);
        let collectors = metrics.collectors();
        let count = collectors.scheduling_lag_seconds().get_sample_count();
        let sum = collectors.scheduling_lag_seconds().get_sample_sum();
        (count, sum)
    };

    let state_at_50 = build_state(projection.clone(), Arc::clone(&metrics), 50);
    let (sample_count_after_first, sample_sum_after_first) = update_and_sample(&state_at_50);

    let state_at_90 = build_state(projection, Arc::clone(&metrics), 90);
    let (sample_count_after_second, sample_sum_after_second) = update_and_sample(&state_at_90);

    assert_eq!(sample_count_after_first, 1);
    assert_eq!(sample_sum_after_first, 10.0);
    // The histogram accumulates across updates: 10 + 50.
    assert_eq!(sample_count_after_second, 2);
    assert_eq!(sample_sum_after_second, 60.0);
}
476}