Skip to main content

actionqueue_executor_local/
attempt_runner.rs

1//! Attempt runner for local executor orchestration.
2//!
3//! The runner composes three concerns for a single attempt execution:
4//! - handler invocation with explicit run/attempt identity,
5//! - timeout classification and enforcement with explicit cancellation-poll cadence evidence,
6//! - retry-decision input generation from the terminal attempt outcome.
7//!
8//! This module intentionally does not mutate run derivation/accounting state.
9
10use std::time::{Duration, Instant};
11use std::{
12    sync::atomic::{AtomicU64, Ordering},
13    sync::Arc,
14};
15
16use actionqueue_core::budget::BudgetConsumption;
17use actionqueue_core::ids::{AttemptId, RunId};
18
19use crate::handler::{AttemptMetadata, ExecutorHandler, HandlerInput, HandlerOutput};
20use crate::retry::{decide_retry_transition, RetryDecision, RetryDecisionError};
21use crate::timeout::{TimeoutClassification, TimeoutClock, TimeoutFailure, TimeoutGuard};
22use crate::types::{ExecutorRequest, ExecutorResponse};
23
/// Cancellation-poll latency ceiling applied by [`TimeoutCadencePolicy::default`].
const DEFAULT_MAX_CANCELLATION_POLL_LATENCY: Duration = Duration::from_millis(250);

/// Cadence policy describing how quickly a handler must notice a timeout cancellation.
///
/// The policy carries a single threshold: the longest tolerated delay between
/// the cancellation request and the first poll that observes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutCadencePolicy {
    max_cancellation_poll_latency: Duration,
}

impl Default for TimeoutCadencePolicy {
    /// Builds the policy from [`DEFAULT_MAX_CANCELLATION_POLL_LATENCY`].
    fn default() -> Self {
        TimeoutCadencePolicy {
            max_cancellation_poll_latency: DEFAULT_MAX_CANCELLATION_POLL_LATENCY,
        }
    }
}

impl TimeoutCadencePolicy {
    /// Builds a policy whose threshold is exactly `max_cancellation_poll_latency`.
    pub fn new(max_cancellation_poll_latency: Duration) -> Self {
        TimeoutCadencePolicy { max_cancellation_poll_latency }
    }

    /// Returns the longest tolerated delay between a cancellation request and
    /// the first poll that observes it.
    pub fn max_cancellation_poll_latency(&self) -> Duration {
        let Self { max_cancellation_poll_latency: threshold } = *self;
        threshold
    }
}
49
/// Adapter that exposes a borrowed [`AttemptTimer`] through the [`TimeoutClock`] interface.
///
/// `Clone` and `Copy` are implemented by hand rather than derived: a derive
/// would add `T: Clone` / `T: Copy` bounds, yet only a shared reference to the
/// timer is stored, so the adapter is copyable for every `T`.
#[derive(Debug)]
struct AttemptTimeoutClock<'a, T> {
    /// Timer that supplies start marks and elapsed durations.
    timer: &'a T,
}

impl<T> Clone for AttemptTimeoutClock<'_, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for AttemptTimeoutClock<'_, T> {}
54
55impl<'a, T> TimeoutClock for AttemptTimeoutClock<'a, T>
56where
57    T: AttemptTimer,
58{
59    type Mark = T::Mark;
60
61    fn mark_now(&self) -> Self::Mark {
62        self.timer.start()
63    }
64
65    fn elapsed_since(&self, mark: Self::Mark) -> Duration {
66        self.timer.elapsed_since(mark)
67    }
68}
69
70/// Classification of a terminal attempt outcome for retry-decision inputs.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum AttemptOutcomeKind {
73    /// The attempt completed successfully.
74    Success,
75    /// The attempt failed and may be retried.
76    RetryableFailure,
77    /// The attempt failed permanently.
78    TerminalFailure,
79    /// The attempt exceeded timeout constraints.
80    Timeout,
81    /// The attempt was voluntarily suspended (budget exhaustion / preemption).
82    Suspended,
83}
84
85impl AttemptOutcomeKind {
86    pub fn from_response(response: &ExecutorResponse) -> Self {
87        match response {
88            ExecutorResponse::Success { .. } => Self::Success,
89            ExecutorResponse::RetryableFailure { .. } => Self::RetryableFailure,
90            ExecutorResponse::TerminalFailure { .. } => Self::TerminalFailure,
91            ExecutorResponse::Timeout { .. } => Self::Timeout,
92            ExecutorResponse::Suspended { .. } => Self::Suspended,
93        }
94    }
95}
96
/// Retry input payload derived from one completed attempt.
///
/// `AttemptRunner::run_attempt` fills this from the request identity and
/// attempt counters plus the classified response, then passes it to
/// `decide_retry_transition`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryDecisionInput {
    /// Run identifier for the attempt.
    pub run_id: RunId,
    /// Attempt identifier for the attempt.
    pub attempt_id: AttemptId,
    /// Attempt number for this execution (1-indexed).
    pub attempt_number: u32,
    /// Hard cap for attempts from task constraints snapshot.
    pub max_attempts: u32,
    /// Terminal outcome classification for retry policy evaluation.
    pub outcome_kind: AttemptOutcomeKind,
}
111
/// How a handler cooperated with timeout-driven cancellation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutCooperation {
    /// Timeout cancellation did not apply (timeout disabled or no timeout classification).
    NotApplicable,
    /// Timeout occurred and the handler observed cancellation within cadence policy.
    Cooperative,
    /// Timeout occurred and cancellation was observed, but beyond cadence threshold.
    CooperativeThresholdBreach,
    /// Timeout occurred but the handler never observed cancellation.
    NonCooperative,
}

/// Point-in-time copy of the timeout-cooperation counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[must_use = "metrics snapshot should be inspected or reported"]
pub struct TimeoutCooperationMetricsSnapshot {
    /// Count of timeout outcomes where cancellation was observed cooperatively.
    pub cooperative: u64,
    /// Count of timeout outcomes where cancellation observation breached the cadence threshold.
    pub cooperative_threshold_breach: u64,
    /// Count of timeout outcomes where cancellation was not observed.
    pub non_cooperative: u64,
    /// Count of outcomes where timeout cooperation did not apply.
    pub not_applicable: u64,
}

/// Metric sink that tallies one counter per [`TimeoutCooperation`] variant.
///
/// Every counter lives behind an `Arc`, so clones of the sink update and read
/// the same underlying values.
#[derive(Debug, Clone, Default)]
pub struct TimeoutCooperationMetrics {
    cooperative: Arc<AtomicU64>,
    cooperative_threshold_breach: Arc<AtomicU64>,
    non_cooperative: Arc<AtomicU64>,
    not_applicable: Arc<AtomicU64>,
}

impl TimeoutCooperationMetrics {
    /// Increments the counter that corresponds to `cooperation` by one.
    pub fn record(&self, cooperation: TimeoutCooperation) {
        // Pick the counter first so the increment itself is written once.
        let counter = match cooperation {
            TimeoutCooperation::Cooperative => &self.cooperative,
            TimeoutCooperation::CooperativeThresholdBreach => &self.cooperative_threshold_breach,
            TimeoutCooperation::NonCooperative => &self.non_cooperative,
            TimeoutCooperation::NotApplicable => &self.not_applicable,
        };
        counter.fetch_add(1, Ordering::SeqCst);
    }

    /// Reads all four counters into a snapshot.
    ///
    /// Each counter is loaded separately, so the snapshot is not a single
    /// atomic view across counters.
    pub fn snapshot(&self) -> TimeoutCooperationMetricsSnapshot {
        fn read(counter: &AtomicU64) -> u64 {
            counter.load(Ordering::SeqCst)
        }

        let cooperative = read(&self.cooperative);
        let cooperative_threshold_breach = read(&self.cooperative_threshold_breach);
        let non_cooperative = read(&self.non_cooperative);
        let not_applicable = read(&self.not_applicable);
        TimeoutCooperationMetricsSnapshot {
            cooperative,
            cooperative_threshold_breach,
            non_cooperative,
            not_applicable,
        }
    }
}
177
/// Inspectable timeout-enforcement report for one attempt.
///
/// `AttemptRunner::run_attempt` assembles this from the guarded execution
/// result and the runner's cadence policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeoutEnforcementReport {
    /// True if the timeout watchdog requested cancellation while work was active.
    pub cancellation_requested: bool,
    /// True if a handler poll observed cancellation.
    pub cancellation_observed: bool,
    /// Latency from cancellation request to first observed cancellation poll.
    pub cancellation_observation_latency: Option<Duration>,
    /// Cadence threshold used to classify timeout cooperation.
    pub cadence_threshold: Duration,
    /// True if a watchdog worker was started and deterministically joined.
    pub watchdog_joined: bool,
    /// Cooperative-cancellation interpretation for metrics and gate assertions.
    ///
    /// This is the same value the runner records into its metric sink.
    pub cooperation: TimeoutCooperation,
}
194
/// Terminal record for one completed attempt execution.
///
/// Returned by `AttemptRunner::run_attempt`; it bundles the classified
/// response with the timeout evidence and the retry decision derived from it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "attempt outcome should be inspected for state transition decisions"]
pub struct AttemptOutcomeRecord {
    /// Run identifier propagated through execution boundary.
    pub run_id: RunId,
    /// Attempt identifier propagated through execution boundary.
    pub attempt_id: AttemptId,
    /// Deterministic attempt response classification.
    pub response: ExecutorResponse,
    /// Measured attempt execution duration.
    pub elapsed: Duration,
    /// Explicit timeout classification with stable reason-code semantics.
    pub timeout_classification: TimeoutClassification,
    /// Timeout enforcement report including cadence-aware cooperation classification.
    pub timeout_enforcement: TimeoutEnforcementReport,
    /// Retry input derived from this exact attempt outcome.
    pub retry_decision_input: RetryDecisionInput,
    /// Retry transition decision derived via [`crate::retry::decide_retry_transition`].
    ///
    /// An error indicates invalid attempt-counter inputs (for example `N + 1`
    /// attempts beyond the configured hard cap).
    pub retry_decision: Result<RetryDecision, RetryDecisionError>,
    /// Resource consumption reported by the handler for this attempt.
    ///
    /// Copied from the handler output before timeout classification, so it is
    /// kept even when the response is overridden to a timeout.
    pub consumption: Vec<BudgetConsumption>,
}
221
/// Timer abstraction that timeout enforcement measures attempts with.
///
/// Implementations hand out an opaque mark and later report how much time has
/// passed since that mark.
pub trait AttemptTimer {
    /// Opaque mark captured before handler execution.
    type Mark: Copy;

    /// Captures a start mark.
    fn start(&self) -> Self::Mark;

    /// Returns elapsed duration since `mark`.
    fn elapsed_since(&self, mark: Self::Mark) -> Duration;
}

/// [`AttemptTimer`] backed by the system monotonic clock ([`Instant`]).
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemAttemptTimer;

impl AttemptTimer for SystemAttemptTimer {
    type Mark = Instant;

    /// Uses the current instant as the mark.
    fn start(&self) -> Instant {
        Instant::now()
    }

    /// Measures from `mark` to the current instant.
    fn elapsed_since(&self, mark: Instant) -> Duration {
        let now = Instant::now();
        now.duration_since(mark)
    }
}
249
/// Runs single-attempt executions by composing handler, timeout, and retry input derivation.
///
/// `H` is the handler invoked for each attempt; `T` is the timer used to
/// measure it and defaults to [`SystemAttemptTimer`].
#[derive(Debug, Clone)]
pub struct AttemptRunner<H, T = SystemAttemptTimer> {
    /// Handler invoked once per `run_attempt` call.
    handler: H,
    /// Timer lent to the timeout guard for marks and elapsed measurements.
    timer: T,
    /// Sink that receives one cooperation classification per attempt.
    timeout_metrics: TimeoutCooperationMetrics,
    /// Threshold policy used to classify cancellation-observation latency.
    timeout_cadence_policy: TimeoutCadencePolicy,
}
258
259impl<H> AttemptRunner<H, SystemAttemptTimer>
260where
261    H: ExecutorHandler,
262{
263    /// Creates an attempt runner using the default system timer.
264    pub fn new(handler: H) -> Self {
265        Self {
266            handler,
267            timer: SystemAttemptTimer,
268            timeout_metrics: TimeoutCooperationMetrics::default(),
269            timeout_cadence_policy: TimeoutCadencePolicy::default(),
270        }
271    }
272}
273
274impl<H, T> AttemptRunner<H, T>
275where
276    H: ExecutorHandler,
277    T: AttemptTimer,
278{
279    /// Creates an attempt runner with an explicit timer implementation.
280    pub fn with_timer(handler: H, timer: T) -> Self {
281        Self {
282            handler,
283            timer,
284            timeout_metrics: TimeoutCooperationMetrics::default(),
285            timeout_cadence_policy: TimeoutCadencePolicy::default(),
286        }
287    }
288
289    /// Creates an attempt runner with explicit timer and timeout metric sink.
290    pub fn with_timer_and_metrics(
291        handler: H,
292        timer: T,
293        timeout_metrics: TimeoutCooperationMetrics,
294    ) -> Self {
295        Self {
296            handler,
297            timer,
298            timeout_metrics,
299            timeout_cadence_policy: TimeoutCadencePolicy::default(),
300        }
301    }
302
303    /// Creates an attempt runner with explicit timer, metric sink, and cadence policy.
304    pub fn with_timer_metrics_and_cadence_policy(
305        handler: H,
306        timer: T,
307        timeout_metrics: TimeoutCooperationMetrics,
308        timeout_cadence_policy: TimeoutCadencePolicy,
309    ) -> Self {
310        Self { handler, timer, timeout_metrics, timeout_cadence_policy }
311    }
312
313    /// Returns the timeout cooperation metric sink used by this runner.
314    pub fn timeout_metrics(&self) -> &TimeoutCooperationMetrics {
315        &self.timeout_metrics
316    }
317
318    /// Returns the timeout cadence policy used by this runner.
319    pub fn timeout_cadence_policy(&self) -> TimeoutCadencePolicy {
320        self.timeout_cadence_policy
321    }
322
323    /// Executes exactly one attempt and returns one terminal attempt outcome record.
324    pub fn run_attempt(&self, mut request: ExecutorRequest) -> AttemptOutcomeRecord {
325        let run_id = request.run_id;
326        let attempt_id = request.attempt_id;
327        let attempt_number = request.attempt_number;
328        let max_attempts = request.constraints.max_attempts();
329        let timeout_secs = request.constraints.timeout_secs();
330
331        let clock = AttemptTimeoutClock { timer: &self.timer };
332        let payload = request.payload;
333        let safety_level = request.constraints.safety_level();
334        let metadata = AttemptMetadata { max_attempts, attempt_number, timeout_secs, safety_level };
335        let submission = request.submission.take();
336        let children = request.children.take();
337        let external_ctx = request.cancellation_context.take();
338        let guard = TimeoutGuard::with_clock(clock);
339        let make_handler_call = |cancellation_context: &crate::handler::CancellationContext| {
340            self.handler.execute(crate::handler::ExecutorContext {
341                input: HandlerInput {
342                    run_id,
343                    attempt_id,
344                    payload,
345                    metadata,
346                    cancellation_context: cancellation_context.clone(),
347                },
348                submission,
349                children,
350            })
351        };
352        let guarded = if let Some(ctx) = external_ctx {
353            guard.execute_with_external_cancellation(timeout_secs, ctx, make_handler_call)
354        } else {
355            guard.execute_with_cancellation(timeout_secs, make_handler_call)
356        };
357        let crate::timeout::CancellableExecution {
358            value: handler_output,
359            elapsed,
360            timeout: timeout_classification,
361            cancel_requested,
362            cancellation_observed,
363            cancellation_observation_latency,
364            watchdog_joined,
365            watchdog_spawn_failed: _,
366        } = guarded;
367
368        let cooperation = classify_timeout_cooperation(
369            &timeout_classification,
370            cancellation_observed,
371            cancellation_observation_latency,
372            self.timeout_cadence_policy,
373        );
374        let timeout_enforcement = TimeoutEnforcementReport {
375            cancellation_requested: cancel_requested,
376            cancellation_observed,
377            cancellation_observation_latency,
378            cadence_threshold: self.timeout_cadence_policy.max_cancellation_poll_latency(),
379            watchdog_joined,
380            cooperation,
381        };
382        self.timeout_metrics.record(timeout_enforcement.cooperation);
383
384        let (response, consumption) = classify_response(handler_output, &timeout_classification);
385        let retry_decision_input = RetryDecisionInput {
386            run_id,
387            attempt_id,
388            attempt_number,
389            max_attempts,
390            outcome_kind: AttemptOutcomeKind::from_response(&response),
391        };
392        let retry_decision = decide_retry_transition(&retry_decision_input);
393
394        AttemptOutcomeRecord {
395            run_id,
396            attempt_id,
397            response,
398            elapsed,
399            timeout_classification,
400            timeout_enforcement,
401            retry_decision_input,
402            retry_decision,
403            consumption,
404        }
405    }
406}
407
408fn classify_response(
409    output: HandlerOutput,
410    timeout: &TimeoutClassification,
411) -> (ExecutorResponse, Vec<BudgetConsumption>) {
412    let consumption = output.consumption().to_vec();
413
414    // Timeout overrides all non-Suspended handler responses. A handler that
415    // explicitly suspends takes priority — budget-based suspension is voluntary.
416    if let TimeoutClassification::TimedOut(TimeoutFailure { timeout_secs, .. }) = timeout {
417        if !matches!(output, HandlerOutput::Suspended { .. }) {
418            return (ExecutorResponse::Timeout { timeout_secs: *timeout_secs }, consumption);
419        }
420    }
421
422    let response = match output {
423        HandlerOutput::Success { output, .. } => ExecutorResponse::Success { output },
424        HandlerOutput::RetryableFailure { error, .. } => {
425            ExecutorResponse::RetryableFailure { error }
426        }
427        HandlerOutput::TerminalFailure { error, .. } => ExecutorResponse::TerminalFailure { error },
428        HandlerOutput::Suspended { output, .. } => ExecutorResponse::Suspended { output },
429    };
430    (response, consumption)
431}
432
433fn classify_timeout_cooperation(
434    timeout_classification: &TimeoutClassification,
435    cancellation_observed: bool,
436    cancellation_observation_latency: Option<Duration>,
437    timeout_cadence_policy: TimeoutCadencePolicy,
438) -> TimeoutCooperation {
439    if !timeout_classification.is_timed_out() {
440        return TimeoutCooperation::NotApplicable;
441    }
442
443    if !cancellation_observed {
444        return TimeoutCooperation::NonCooperative;
445    }
446
447    match cancellation_observation_latency {
448        Some(latency) if latency > timeout_cadence_policy.max_cancellation_poll_latency() => {
449            TimeoutCooperation::CooperativeThresholdBreach
450        }
451        Some(_) => TimeoutCooperation::Cooperative,
452        None => TimeoutCooperation::NonCooperative,
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use std::sync::Mutex;
459    use std::time::Duration;
460
461    use actionqueue_core::ids::{AttemptId, RunId};
462    use actionqueue_core::task::constraints::TaskConstraints;
463
464    use super::{
465        AttemptOutcomeKind, AttemptRunner, AttemptTimer, TimeoutCadencePolicy, TimeoutCooperation,
466        TimeoutCooperationMetrics, TimeoutCooperationMetricsSnapshot,
467    };
468    use crate::handler::{ExecutorContext, ExecutorHandler, HandlerInput, HandlerOutput};
469    use crate::retry::RetryDecision;
470    use crate::timeout::{TimeoutClassification, TimeoutFailure, TimeoutReasonCode};
471    use crate::types::ExecutorRequest;
472
    /// Timer stub that reports the same elapsed duration no matter when it is asked.
    #[derive(Debug, Clone, Copy)]
    struct FixedTimer {
        /// Duration returned by every `elapsed_since` call.
        elapsed: Duration,
    }

    impl AttemptTimer for FixedTimer {
        // No state is needed to answer `elapsed_since`, so the mark is the unit type.
        type Mark = ();

        fn start(&self) -> Self::Mark {}

        fn elapsed_since(&self, _mark: Self::Mark) -> Duration {
            self.elapsed
        }
    }
487
    /// Handler stub that returns a canned output and remembers the input it was called with.
    struct RecordingHandler {
        /// Output cloned and returned from every `execute` call.
        output: HandlerOutput,
        /// Input from the most recent `execute` call, or `None` if never called.
        input: Mutex<Option<HandlerInput>>,
    }

    impl RecordingHandler {
        /// Creates a handler that will answer with `output` and has captured nothing yet.
        fn new(output: HandlerOutput) -> Self {
            Self { output, input: Mutex::new(None) }
        }
    }

    impl ExecutorHandler for RecordingHandler {
        fn execute(&self, ctx: ExecutorContext) -> HandlerOutput {
            let input = ctx.input;
            // Overwrites any previously captured input.
            *self.input.lock().unwrap() = Some(input);
            self.output.clone()
        }
    }
506
    /// A successful handler within its timeout: the ids reach both the handler
    /// and the record, the response is `Success`, the retry decision is
    /// `Complete`, and timeout cooperation is `NotApplicable`.
    #[test]
    fn run_attempt_propagates_ids_and_maps_success() {
        let run_id = RunId::new();
        let attempt_id = AttemptId::new();
        let handler = RecordingHandler::new(HandlerOutput::Success {
            output: Some(vec![1, 2, 3]),
            consumption: vec![],
        });
        // 5ms fixed elapsed against a 60s timeout: completes in time.
        let runner =
            AttemptRunner::with_timer(handler, FixedTimer { elapsed: Duration::from_millis(5) });

        let request = ExecutorRequest {
            run_id,
            attempt_id,
            payload: vec![9, 8, 7],
            constraints: TaskConstraints::new(3, Some(60), None)
                .expect("test constraints must be valid"),
            attempt_number: 2,
            submission: None,
            children: None,
            cancellation_context: None,
        };

        let record = runner.run_attempt(request);

        assert_eq!(record.run_id, run_id);
        assert_eq!(record.attempt_id, attempt_id);
        assert_eq!(
            record.response,
            crate::types::ExecutorResponse::Success { output: Some(vec![1, 2, 3]) }
        );
        assert_eq!(record.retry_decision, Ok(RetryDecision::Complete));
        assert_eq!(record.retry_decision_input.outcome_kind, AttemptOutcomeKind::Success);
        assert_eq!(record.timeout_enforcement.cooperation, TimeoutCooperation::NotApplicable);
        assert_eq!(record.timeout_enforcement.cancellation_observation_latency, None);
        assert_eq!(
            record.timeout_enforcement.cadence_threshold,
            TimeoutCadencePolicy::default().max_cancellation_poll_latency()
        );
        assert_eq!(
            record.timeout_classification,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 60,
                elapsed: Duration::from_millis(5),
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );

        // The handler must have been given the same identity and attempt metadata.
        let captured =
            runner.handler.input.lock().unwrap().clone().expect("handler should capture input");

        assert_eq!(captured.run_id, run_id);
        assert_eq!(captured.attempt_id, attempt_id);
        assert_eq!(captured.metadata.max_attempts, 3);
        assert_eq!(captured.metadata.attempt_number, 2);
        assert_eq!(captured.metadata.timeout_secs, Some(60));
    }
564
    /// The handler reports `Success`, but the timer reports 2s against a 1s
    /// timeout: the response becomes `Timeout`, a retry is decided, and
    /// cooperation is `NonCooperative` with no observation latency.
    #[test]
    fn run_attempt_marks_timeout_when_elapsed_exceeds_limit() {
        let handler = RecordingHandler::new(HandlerOutput::Success {
            output: Some(vec![42]),
            consumption: vec![],
        });
        let runner =
            AttemptRunner::with_timer(handler, FixedTimer { elapsed: Duration::from_secs(2) });

        let request = ExecutorRequest {
            run_id: RunId::new(),
            attempt_id: AttemptId::new(),
            payload: vec![],
            constraints: TaskConstraints::new(2, Some(1), None)
                .expect("test constraints must be valid"),
            attempt_number: 1,
            submission: None,
            children: None,
            cancellation_context: None,
        };

        let record = runner.run_attempt(request);

        assert_eq!(record.response, crate::types::ExecutorResponse::Timeout { timeout_secs: 1 });
        assert_eq!(record.retry_decision, Ok(RetryDecision::Retry));
        assert_eq!(record.retry_decision_input.outcome_kind, AttemptOutcomeKind::Timeout);
        assert_eq!(record.timeout_enforcement.cooperation, TimeoutCooperation::NonCooperative);
        assert_eq!(record.timeout_enforcement.cancellation_observation_latency, None);
        assert_eq!(
            record.timeout_enforcement.cadence_threshold,
            TimeoutCadencePolicy::default().max_cancellation_poll_latency()
        );
        assert_eq!(
            record.timeout_classification,
            TimeoutClassification::TimedOut(crate::timeout::TimeoutFailure {
                timeout_secs: 1,
                elapsed: Duration::from_secs(2),
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
    }
606
    /// A retryable failure that finishes within its timeout is passed through
    /// unchanged, and the retry input carries the request's attempt counters.
    #[test]
    fn run_attempt_preserves_retryable_failure_without_timeout() {
        let handler = RecordingHandler::new(HandlerOutput::RetryableFailure {
            error: "transient error".to_string(),
            consumption: vec![],
        });
        let runner =
            AttemptRunner::with_timer(handler, FixedTimer { elapsed: Duration::from_millis(1) });

        let request = ExecutorRequest {
            run_id: RunId::new(),
            attempt_id: AttemptId::new(),
            payload: vec![],
            constraints: TaskConstraints::new(5, Some(30), None)
                .expect("test constraints must be valid"),
            attempt_number: 3,
            submission: None,
            children: None,
            cancellation_context: None,
        };

        let record = runner.run_attempt(request);

        assert_eq!(
            record.response,
            crate::types::ExecutorResponse::RetryableFailure {
                error: "transient error".to_string(),
            }
        );
        assert_eq!(record.retry_decision, Ok(RetryDecision::Retry));
        assert_eq!(record.retry_decision_input.outcome_kind, AttemptOutcomeKind::RetryableFailure);
        assert_eq!(record.retry_decision_input.attempt_number, 3);
        assert_eq!(record.retry_decision_input.max_attempts, 5);
        assert_eq!(record.timeout_enforcement.cooperation, TimeoutCooperation::NotApplicable);
        assert_eq!(record.timeout_enforcement.cancellation_observation_latency, None);
    }
643
    /// One timed-out attempt adds exactly one `non_cooperative` sample, and the
    /// sample is visible through the caller's handle to the metric sink.
    #[test]
    fn run_attempt_emits_timeout_cooperation_metric_once() {
        let metrics = TimeoutCooperationMetrics::default();
        // The runner gets a clone; the original handle is read afterwards.
        let runner = AttemptRunner::with_timer_and_metrics(
            RecordingHandler::new(HandlerOutput::Success {
                output: Some(vec![7]),
                consumption: vec![],
            }),
            FixedTimer { elapsed: Duration::from_secs(3) },
            metrics.clone(),
        );

        let request = ExecutorRequest {
            run_id: RunId::new(),
            attempt_id: AttemptId::new(),
            payload: vec![],
            constraints: TaskConstraints::new(2, Some(1), None)
                .expect("test constraints must be valid"),
            attempt_number: 1,
            submission: None,
            children: None,
            cancellation_context: None,
        };

        let _record = runner.run_attempt(request);

        assert_eq!(
            metrics.snapshot(),
            TimeoutCooperationMetricsSnapshot {
                cooperative: 0,
                cooperative_threshold_breach: 0,
                non_cooperative: 1,
                not_applicable: 0,
            }
        );
    }
680
    /// An observed cancellation whose latency (15ms) exceeds the policy
    /// threshold (5ms) is classified as a threshold breach.
    #[test]
    fn timeout_cooperation_marks_threshold_breach_when_observation_latency_exceeds_policy() {
        let timeout = TimeoutClassification::TimedOut(TimeoutFailure {
            timeout_secs: 1,
            elapsed: Duration::from_secs(2),
            reason_code: TimeoutReasonCode::DeadlineExceeded,
        });

        let cooperation = super::classify_timeout_cooperation(
            &timeout,
            true,
            Some(Duration::from_millis(15)),
            TimeoutCadencePolicy::new(Duration::from_millis(5)),
        );

        assert_eq!(cooperation, TimeoutCooperation::CooperativeThresholdBreach);
    }
698
    /// An observed cancellation whose latency (5ms) is below the policy
    /// threshold (10ms) is classified as cooperative.
    #[test]
    fn timeout_cooperation_marks_cooperative_when_observation_latency_is_within_policy() {
        let timeout = TimeoutClassification::TimedOut(TimeoutFailure {
            timeout_secs: 1,
            elapsed: Duration::from_secs(2),
            reason_code: TimeoutReasonCode::DeadlineExceeded,
        });

        let cooperation = super::classify_timeout_cooperation(
            &timeout,
            true,
            Some(Duration::from_millis(5)),
            TimeoutCadencePolicy::new(Duration::from_millis(10)),
        );

        assert_eq!(cooperation, TimeoutCooperation::Cooperative);
    }
716
    /// A `Suspended` handler output is not overridden by a timed-out classification.
    #[test]
    fn classify_response_suspended_takes_priority_over_timeout() {
        let output = HandlerOutput::Suspended { output: Some(vec![1, 2, 3]), consumption: vec![] };
        let timeout = TimeoutClassification::TimedOut(TimeoutFailure {
            timeout_secs: 10,
            elapsed: Duration::from_secs(15),
            reason_code: TimeoutReasonCode::DeadlineExceeded,
        });
        // The returned consumption is not under test here.
        let (response, _) = super::classify_response(output, &timeout);
        assert!(
            matches!(response, crate::types::ExecutorResponse::Suspended { .. }),
            "Suspended should take priority over Timeout, got: {response:?}"
        );
    }
731}