1use std::time::{Duration, Instant};
11use std::{
12 sync::atomic::{AtomicU64, Ordering},
13 sync::Arc,
14};
15
16use actionqueue_core::budget::BudgetConsumption;
17use actionqueue_core::ids::{AttemptId, RunId};
18
19use crate::handler::{AttemptMetadata, ExecutorHandler, HandlerInput, HandlerOutput};
20use crate::retry::{decide_retry_transition, RetryDecision, RetryDecisionError};
21use crate::timeout::{TimeoutClassification, TimeoutClock, TimeoutFailure, TimeoutGuard};
22use crate::types::{ExecutorRequest, ExecutorResponse};
23
/// Cancellation-observation latency ceiling used by
/// [`TimeoutCadencePolicy::default`].
const DEFAULT_MAX_CANCELLATION_POLL_LATENCY: Duration = Duration::from_millis(250);

/// Threshold policy for how quickly a timed-out handler must observe
/// cancellation.
///
/// `classify_timeout_cooperation` treats an observation latency strictly
/// greater than this threshold as a cadence breach.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutCadencePolicy {
    /// Largest observation latency that is still within policy.
    max_cancellation_poll_latency: Duration,
}

impl TimeoutCadencePolicy {
    /// Creates a policy with the given latency ceiling.
    pub fn new(max_cancellation_poll_latency: Duration) -> Self {
        TimeoutCadencePolicy { max_cancellation_poll_latency }
    }

    /// Returns the latency ceiling this policy was built with.
    pub fn max_cancellation_poll_latency(&self) -> Duration {
        let Self { max_cancellation_poll_latency: ceiling } = *self;
        ceiling
    }
}

impl Default for TimeoutCadencePolicy {
    /// Uses [`DEFAULT_MAX_CANCELLATION_POLL_LATENCY`] as the ceiling.
    fn default() -> Self {
        Self { max_cancellation_poll_latency: DEFAULT_MAX_CANCELLATION_POLL_LATENCY }
    }
}
49
50#[derive(Debug, Clone, Copy)]
51struct AttemptTimeoutClock<'a, T> {
52 timer: &'a T,
53}
54
55impl<'a, T> TimeoutClock for AttemptTimeoutClock<'a, T>
56where
57 T: AttemptTimer,
58{
59 type Mark = T::Mark;
60
61 fn mark_now(&self) -> Self::Mark {
62 self.timer.start()
63 }
64
65 fn elapsed_since(&self, mark: Self::Mark) -> Duration {
66 self.timer.elapsed_since(mark)
67 }
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum AttemptOutcomeKind {
73 Success,
75 RetryableFailure,
77 TerminalFailure,
79 Timeout,
81 Suspended,
83}
84
85impl AttemptOutcomeKind {
86 pub fn from_response(response: &ExecutorResponse) -> Self {
87 match response {
88 ExecutorResponse::Success { .. } => Self::Success,
89 ExecutorResponse::RetryableFailure { .. } => Self::RetryableFailure,
90 ExecutorResponse::TerminalFailure { .. } => Self::TerminalFailure,
91 ExecutorResponse::Timeout { .. } => Self::Timeout,
92 ExecutorResponse::Suspended { .. } => Self::Suspended,
93 }
94 }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct RetryDecisionInput {
100 pub run_id: RunId,
102 pub attempt_id: AttemptId,
104 pub attempt_number: u32,
106 pub max_attempts: u32,
108 pub outcome_kind: AttemptOutcomeKind,
110}
111
/// How a handler behaved with respect to timeout cancellation, as decided by
/// `classify_timeout_cooperation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutCooperation {
    /// The attempt did not time out.
    NotApplicable,
    /// Timed out; cancellation was observed within the cadence threshold.
    Cooperative,
    /// Timed out; cancellation was observed, but later than the cadence
    /// threshold allows.
    CooperativeThresholdBreach,
    /// Timed out; cancellation was not observed, or no observation latency
    /// was recorded.
    NonCooperative,
}

/// Plain-value copy of the counters held by [`TimeoutCooperationMetrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[must_use = "metrics snapshot should be inspected or reported"]
pub struct TimeoutCooperationMetricsSnapshot {
    /// Count of [`TimeoutCooperation::Cooperative`] recordings.
    pub cooperative: u64,
    /// Count of [`TimeoutCooperation::CooperativeThresholdBreach`] recordings.
    pub cooperative_threshold_breach: u64,
    /// Count of [`TimeoutCooperation::NonCooperative`] recordings.
    pub non_cooperative: u64,
    /// Count of [`TimeoutCooperation::NotApplicable`] recordings.
    pub not_applicable: u64,
}

/// One counter per [`TimeoutCooperation`] variant.
///
/// Each counter sits behind an `Arc`, so a clone of this value increments and
/// reads the same counters as the value it was cloned from.
#[derive(Debug, Clone, Default)]
pub struct TimeoutCooperationMetrics {
    cooperative: Arc<AtomicU64>,
    cooperative_threshold_breach: Arc<AtomicU64>,
    non_cooperative: Arc<AtomicU64>,
    not_applicable: Arc<AtomicU64>,
}

impl TimeoutCooperationMetrics {
    /// Adds one to the counter that matches `cooperation`.
    pub fn record(&self, cooperation: TimeoutCooperation) {
        // Pick the counter first so the increment is written only once.
        let counter: &AtomicU64 = match cooperation {
            TimeoutCooperation::Cooperative => &self.cooperative,
            TimeoutCooperation::CooperativeThresholdBreach => &self.cooperative_threshold_breach,
            TimeoutCooperation::NonCooperative => &self.non_cooperative,
            TimeoutCooperation::NotApplicable => &self.not_applicable,
        };
        counter.fetch_add(1, Ordering::SeqCst);
    }

    /// Reads all four counters into a snapshot.
    ///
    /// The counters are loaded one at a time, so the snapshot is not a single
    /// atomic view across them.
    pub fn snapshot(&self) -> TimeoutCooperationMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::SeqCst);
        TimeoutCooperationMetricsSnapshot {
            cooperative: read(&self.cooperative),
            cooperative_threshold_breach: read(&self.cooperative_threshold_breach),
            non_cooperative: read(&self.non_cooperative),
            not_applicable: read(&self.not_applicable),
        }
    }
}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct TimeoutEnforcementReport {
181 pub cancellation_requested: bool,
183 pub cancellation_observed: bool,
185 pub cancellation_observation_latency: Option<Duration>,
187 pub cadence_threshold: Duration,
189 pub watchdog_joined: bool,
191 pub cooperation: TimeoutCooperation,
193}
194
195#[derive(Debug, Clone, PartialEq, Eq)]
197#[must_use = "attempt outcome should be inspected for state transition decisions"]
198pub struct AttemptOutcomeRecord {
199 pub run_id: RunId,
201 pub attempt_id: AttemptId,
203 pub response: ExecutorResponse,
205 pub elapsed: Duration,
207 pub timeout_classification: TimeoutClassification,
209 pub timeout_enforcement: TimeoutEnforcementReport,
211 pub retry_decision_input: RetryDecisionInput,
213 pub retry_decision: Result<RetryDecision, RetryDecisionError>,
218 pub consumption: Vec<BudgetConsumption>,
220}
221
/// Source of elapsed-time measurements for an attempt.
///
/// The trait exists so the clock can be swapped out; the tests in this file
/// use a timer that always reports a fixed duration.
pub trait AttemptTimer {
    /// Opaque starting point returned by [`AttemptTimer::start`].
    type Mark: Copy;

    /// Captures a starting point to measure from.
    fn start(&self) -> Self::Mark;

    /// Returns how much time has passed since `mark` was captured.
    fn elapsed_since(&self, mark: Self::Mark) -> Duration;
}

/// [`AttemptTimer`] backed by [`Instant`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemAttemptTimer;

impl AttemptTimer for SystemAttemptTimer {
    type Mark = Instant;

    /// Captures the current instant.
    fn start(&self) -> Instant {
        Instant::now()
    }

    /// Measures from `mark` to the current instant.
    fn elapsed_since(&self, mark: Instant) -> Duration {
        Instant::now().duration_since(mark)
    }
}
249
250#[derive(Debug, Clone)]
252pub struct AttemptRunner<H, T = SystemAttemptTimer> {
253 handler: H,
254 timer: T,
255 timeout_metrics: TimeoutCooperationMetrics,
256 timeout_cadence_policy: TimeoutCadencePolicy,
257}
258
259impl<H> AttemptRunner<H, SystemAttemptTimer>
260where
261 H: ExecutorHandler,
262{
263 pub fn new(handler: H) -> Self {
265 Self {
266 handler,
267 timer: SystemAttemptTimer,
268 timeout_metrics: TimeoutCooperationMetrics::default(),
269 timeout_cadence_policy: TimeoutCadencePolicy::default(),
270 }
271 }
272}
273
274impl<H, T> AttemptRunner<H, T>
275where
276 H: ExecutorHandler,
277 T: AttemptTimer,
278{
279 pub fn with_timer(handler: H, timer: T) -> Self {
281 Self {
282 handler,
283 timer,
284 timeout_metrics: TimeoutCooperationMetrics::default(),
285 timeout_cadence_policy: TimeoutCadencePolicy::default(),
286 }
287 }
288
289 pub fn with_timer_and_metrics(
291 handler: H,
292 timer: T,
293 timeout_metrics: TimeoutCooperationMetrics,
294 ) -> Self {
295 Self {
296 handler,
297 timer,
298 timeout_metrics,
299 timeout_cadence_policy: TimeoutCadencePolicy::default(),
300 }
301 }
302
303 pub fn with_timer_metrics_and_cadence_policy(
305 handler: H,
306 timer: T,
307 timeout_metrics: TimeoutCooperationMetrics,
308 timeout_cadence_policy: TimeoutCadencePolicy,
309 ) -> Self {
310 Self { handler, timer, timeout_metrics, timeout_cadence_policy }
311 }
312
313 pub fn timeout_metrics(&self) -> &TimeoutCooperationMetrics {
315 &self.timeout_metrics
316 }
317
318 pub fn timeout_cadence_policy(&self) -> TimeoutCadencePolicy {
320 self.timeout_cadence_policy
321 }
322
323 pub fn run_attempt(&self, mut request: ExecutorRequest) -> AttemptOutcomeRecord {
325 let run_id = request.run_id;
326 let attempt_id = request.attempt_id;
327 let attempt_number = request.attempt_number;
328 let max_attempts = request.constraints.max_attempts();
329 let timeout_secs = request.constraints.timeout_secs();
330
331 let clock = AttemptTimeoutClock { timer: &self.timer };
332 let payload = request.payload;
333 let safety_level = request.constraints.safety_level();
334 let metadata = AttemptMetadata { max_attempts, attempt_number, timeout_secs, safety_level };
335 let submission = request.submission.take();
336 let children = request.children.take();
337 let external_ctx = request.cancellation_context.take();
338 let guard = TimeoutGuard::with_clock(clock);
339 let make_handler_call = |cancellation_context: &crate::handler::CancellationContext| {
340 self.handler.execute(crate::handler::ExecutorContext {
341 input: HandlerInput {
342 run_id,
343 attempt_id,
344 payload,
345 metadata,
346 cancellation_context: cancellation_context.clone(),
347 },
348 submission,
349 children,
350 })
351 };
352 let guarded = if let Some(ctx) = external_ctx {
353 guard.execute_with_external_cancellation(timeout_secs, ctx, make_handler_call)
354 } else {
355 guard.execute_with_cancellation(timeout_secs, make_handler_call)
356 };
357 let crate::timeout::CancellableExecution {
358 value: handler_output,
359 elapsed,
360 timeout: timeout_classification,
361 cancel_requested,
362 cancellation_observed,
363 cancellation_observation_latency,
364 watchdog_joined,
365 watchdog_spawn_failed: _,
366 } = guarded;
367
368 let cooperation = classify_timeout_cooperation(
369 &timeout_classification,
370 cancellation_observed,
371 cancellation_observation_latency,
372 self.timeout_cadence_policy,
373 );
374 let timeout_enforcement = TimeoutEnforcementReport {
375 cancellation_requested: cancel_requested,
376 cancellation_observed,
377 cancellation_observation_latency,
378 cadence_threshold: self.timeout_cadence_policy.max_cancellation_poll_latency(),
379 watchdog_joined,
380 cooperation,
381 };
382 self.timeout_metrics.record(timeout_enforcement.cooperation);
383
384 let (response, consumption) = classify_response(handler_output, &timeout_classification);
385 let retry_decision_input = RetryDecisionInput {
386 run_id,
387 attempt_id,
388 attempt_number,
389 max_attempts,
390 outcome_kind: AttemptOutcomeKind::from_response(&response),
391 };
392 let retry_decision = decide_retry_transition(&retry_decision_input);
393
394 AttemptOutcomeRecord {
395 run_id,
396 attempt_id,
397 response,
398 elapsed,
399 timeout_classification,
400 timeout_enforcement,
401 retry_decision_input,
402 retry_decision,
403 consumption,
404 }
405 }
406}
407
408fn classify_response(
409 output: HandlerOutput,
410 timeout: &TimeoutClassification,
411) -> (ExecutorResponse, Vec<BudgetConsumption>) {
412 let consumption = output.consumption().to_vec();
413
414 if let TimeoutClassification::TimedOut(TimeoutFailure { timeout_secs, .. }) = timeout {
417 if !matches!(output, HandlerOutput::Suspended { .. }) {
418 return (ExecutorResponse::Timeout { timeout_secs: *timeout_secs }, consumption);
419 }
420 }
421
422 let response = match output {
423 HandlerOutput::Success { output, .. } => ExecutorResponse::Success { output },
424 HandlerOutput::RetryableFailure { error, .. } => {
425 ExecutorResponse::RetryableFailure { error }
426 }
427 HandlerOutput::TerminalFailure { error, .. } => ExecutorResponse::TerminalFailure { error },
428 HandlerOutput::Suspended { output, .. } => ExecutorResponse::Suspended { output },
429 };
430 (response, consumption)
431}
432
433fn classify_timeout_cooperation(
434 timeout_classification: &TimeoutClassification,
435 cancellation_observed: bool,
436 cancellation_observation_latency: Option<Duration>,
437 timeout_cadence_policy: TimeoutCadencePolicy,
438) -> TimeoutCooperation {
439 if !timeout_classification.is_timed_out() {
440 return TimeoutCooperation::NotApplicable;
441 }
442
443 if !cancellation_observed {
444 return TimeoutCooperation::NonCooperative;
445 }
446
447 match cancellation_observation_latency {
448 Some(latency) if latency > timeout_cadence_policy.max_cancellation_poll_latency() => {
449 TimeoutCooperation::CooperativeThresholdBreach
450 }
451 Some(_) => TimeoutCooperation::Cooperative,
452 None => TimeoutCooperation::NonCooperative,
453 }
454}
455
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;

    use actionqueue_core::ids::{AttemptId, RunId};
    use actionqueue_core::task::constraints::TaskConstraints;

    use super::{
        AttemptOutcomeKind, AttemptRunner, AttemptTimer, TimeoutCadencePolicy, TimeoutCooperation,
        TimeoutCooperationMetrics, TimeoutCooperationMetricsSnapshot,
    };
    use crate::handler::{ExecutorContext, ExecutorHandler, HandlerInput, HandlerOutput};
    use crate::retry::RetryDecision;
    use crate::timeout::{TimeoutClassification, TimeoutFailure, TimeoutReasonCode};
    use crate::types::{ExecutorRequest, ExecutorResponse};

    /// Timer that reports the same elapsed duration no matter when it is
    /// asked.
    #[derive(Debug, Clone, Copy)]
    struct FixedTimer {
        elapsed: Duration,
    }

    impl AttemptTimer for FixedTimer {
        type Mark = ();

        fn start(&self) -> Self::Mark {}

        fn elapsed_since(&self, _mark: Self::Mark) -> Duration {
            self.elapsed
        }
    }

    /// Handler that stores the input it was called with and returns a canned
    /// output.
    struct RecordingHandler {
        output: HandlerOutput,
        input: Mutex<Option<HandlerInput>>,
    }

    impl RecordingHandler {
        fn new(output: HandlerOutput) -> Self {
            Self { output, input: Mutex::new(None) }
        }
    }

    impl ExecutorHandler for RecordingHandler {
        fn execute(&self, ctx: ExecutorContext) -> HandlerOutput {
            *self.input.lock().unwrap() = Some(ctx.input);
            self.output.clone()
        }
    }

    /// Builds a request with an empty payload and no submission, children or
    /// external cancellation context.
    fn bare_request(
        run_id: RunId,
        attempt_id: AttemptId,
        constraints: TaskConstraints,
        attempt_number: u32,
    ) -> ExecutorRequest {
        ExecutorRequest {
            run_id,
            attempt_id,
            payload: vec![],
            constraints,
            attempt_number,
            submission: None,
            children: None,
            cancellation_context: None,
        }
    }

    /// Classification for a one-second deadline that was exceeded after two
    /// seconds.
    fn one_second_deadline_exceeded() -> TimeoutClassification {
        TimeoutClassification::TimedOut(TimeoutFailure {
            timeout_secs: 1,
            elapsed: Duration::from_secs(2),
            reason_code: TimeoutReasonCode::DeadlineExceeded,
        })
    }

    #[test]
    fn run_attempt_propagates_ids_and_maps_success() {
        let run_id = RunId::new();
        let attempt_id = AttemptId::new();
        let runner = AttemptRunner::with_timer(
            RecordingHandler::new(HandlerOutput::Success {
                output: Some(vec![1, 2, 3]),
                consumption: vec![],
            }),
            FixedTimer { elapsed: Duration::from_millis(5) },
        );

        let constraints =
            TaskConstraints::new(3, Some(60), None).expect("test constraints must be valid");
        let mut request = bare_request(run_id, attempt_id, constraints, 2);
        request.payload = vec![9, 8, 7];

        let record = runner.run_attempt(request);

        assert_eq!(record.run_id, run_id);
        assert_eq!(record.attempt_id, attempt_id);
        assert_eq!(record.response, ExecutorResponse::Success { output: Some(vec![1, 2, 3]) });
        assert_eq!(record.retry_decision, Ok(RetryDecision::Complete));
        assert_eq!(record.retry_decision_input.outcome_kind, AttemptOutcomeKind::Success);

        let enforcement = &record.timeout_enforcement;
        assert_eq!(enforcement.cooperation, TimeoutCooperation::NotApplicable);
        assert_eq!(enforcement.cancellation_observation_latency, None);
        assert_eq!(
            enforcement.cadence_threshold,
            TimeoutCadencePolicy::default().max_cancellation_poll_latency()
        );
        assert_eq!(
            record.timeout_classification,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 60,
                elapsed: Duration::from_millis(5),
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );

        let captured =
            runner.handler.input.lock().unwrap().clone().expect("handler should capture input");

        assert_eq!(captured.run_id, run_id);
        assert_eq!(captured.attempt_id, attempt_id);
        assert_eq!(captured.metadata.max_attempts, 3);
        assert_eq!(captured.metadata.attempt_number, 2);
        assert_eq!(captured.metadata.timeout_secs, Some(60));
    }

    #[test]
    fn run_attempt_marks_timeout_when_elapsed_exceeds_limit() {
        let runner = AttemptRunner::with_timer(
            RecordingHandler::new(HandlerOutput::Success {
                output: Some(vec![42]),
                consumption: vec![],
            }),
            FixedTimer { elapsed: Duration::from_secs(2) },
        );

        let constraints =
            TaskConstraints::new(2, Some(1), None).expect("test constraints must be valid");
        let request = bare_request(RunId::new(), AttemptId::new(), constraints, 1);

        let record = runner.run_attempt(request);

        assert_eq!(record.response, ExecutorResponse::Timeout { timeout_secs: 1 });
        assert_eq!(record.retry_decision, Ok(RetryDecision::Retry));
        assert_eq!(record.retry_decision_input.outcome_kind, AttemptOutcomeKind::Timeout);

        let enforcement = &record.timeout_enforcement;
        assert_eq!(enforcement.cooperation, TimeoutCooperation::NonCooperative);
        assert_eq!(enforcement.cancellation_observation_latency, None);
        assert_eq!(
            enforcement.cadence_threshold,
            TimeoutCadencePolicy::default().max_cancellation_poll_latency()
        );
        assert_eq!(record.timeout_classification, one_second_deadline_exceeded());
    }

    #[test]
    fn run_attempt_preserves_retryable_failure_without_timeout() {
        let runner = AttemptRunner::with_timer(
            RecordingHandler::new(HandlerOutput::RetryableFailure {
                error: "transient error".to_string(),
                consumption: vec![],
            }),
            FixedTimer { elapsed: Duration::from_millis(1) },
        );

        let constraints =
            TaskConstraints::new(5, Some(30), None).expect("test constraints must be valid");
        let request = bare_request(RunId::new(), AttemptId::new(), constraints, 3);

        let record = runner.run_attempt(request);

        assert_eq!(
            record.response,
            ExecutorResponse::RetryableFailure { error: "transient error".to_string() }
        );
        assert_eq!(record.retry_decision, Ok(RetryDecision::Retry));

        let decision_input = &record.retry_decision_input;
        assert_eq!(decision_input.outcome_kind, AttemptOutcomeKind::RetryableFailure);
        assert_eq!(decision_input.attempt_number, 3);
        assert_eq!(decision_input.max_attempts, 5);

        assert_eq!(record.timeout_enforcement.cooperation, TimeoutCooperation::NotApplicable);
        assert_eq!(record.timeout_enforcement.cancellation_observation_latency, None);
    }

    #[test]
    fn run_attempt_emits_timeout_cooperation_metric_once() {
        let metrics = TimeoutCooperationMetrics::default();
        let runner = AttemptRunner::with_timer_and_metrics(
            RecordingHandler::new(HandlerOutput::Success {
                output: Some(vec![7]),
                consumption: vec![],
            }),
            FixedTimer { elapsed: Duration::from_secs(3) },
            metrics.clone(),
        );

        let constraints =
            TaskConstraints::new(2, Some(1), None).expect("test constraints must be valid");
        let request = bare_request(RunId::new(), AttemptId::new(), constraints, 1);

        let _record = runner.run_attempt(request);

        // Exactly one counter moved, and it moved by exactly one.
        assert_eq!(
            metrics.snapshot(),
            TimeoutCooperationMetricsSnapshot { non_cooperative: 1, ..Default::default() }
        );
    }

    #[test]
    fn timeout_cooperation_marks_threshold_breach_when_observation_latency_exceeds_policy() {
        let cooperation = super::classify_timeout_cooperation(
            &one_second_deadline_exceeded(),
            true,
            Some(Duration::from_millis(15)),
            TimeoutCadencePolicy::new(Duration::from_millis(5)),
        );

        assert_eq!(cooperation, TimeoutCooperation::CooperativeThresholdBreach);
    }

    #[test]
    fn timeout_cooperation_marks_cooperative_when_observation_latency_is_within_policy() {
        let cooperation = super::classify_timeout_cooperation(
            &one_second_deadline_exceeded(),
            true,
            Some(Duration::from_millis(5)),
            TimeoutCadencePolicy::new(Duration::from_millis(10)),
        );

        assert_eq!(cooperation, TimeoutCooperation::Cooperative);
    }

    #[test]
    fn classify_response_suspended_takes_priority_over_timeout() {
        let timeout = TimeoutClassification::TimedOut(TimeoutFailure {
            timeout_secs: 10,
            elapsed: Duration::from_secs(15),
            reason_code: TimeoutReasonCode::DeadlineExceeded,
        });
        let suspended =
            HandlerOutput::Suspended { output: Some(vec![1, 2, 3]), consumption: vec![] };

        let (response, _) = super::classify_response(suspended, &timeout);

        assert!(
            matches!(response, ExecutorResponse::Suspended { .. }),
            "Suspended should take priority over Timeout, got: {response:?}"
        );
    }
}