Skip to main content

aura_core/time/
timeout.rs

1//! Local timeout and backoff policy for owner-controlled deadlines.
2//!
3//! Aura treats wall clock as a local choice. This module uses physical time for
4//! local budgeting and retry policy, while keeping semantic ordering concerns in
5//! logical, order, or provenanced time domains.
6
7use super::{PhysicalTime, TimeDomain};
8use crate::{
9    effects::{BackoffStrategy, JitterMode, PhysicalTimeEffects, RetryPolicy, TimeError},
10    AuraError, ProtocolErrorCode,
11};
12use futures::{future::Either, pin_mut};
13use serde::{Deserialize, Serialize};
14use std::fmt;
15use std::future::Future;
16use std::time::Duration;
17
/// Typed result for local timeout-budget policy.
///
/// Shorthand for a `Result` whose error side is always [`TimeoutBudgetError`].
pub type TimeoutBudgetResult<T> = Result<T, TimeoutBudgetError>;
20
21/// Explicit mapping between timeout policy and Aura time semantics.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum TimeoutTimeSemantics {
24    /// Physical time drives local timeout budgets and retry delays.
25    LocalPhysicalBudget,
26    /// Logical time remains for causal/semantic ordering, not wall-clock timeouts.
27    LogicalSemanticOrdering,
28    /// Order time remains for privacy-preserving semantic ordering.
29    OrderSemanticOrdering,
30    /// Provenanced time remains for attested/consensus-backed semantic claims.
31    ProvenancedSemanticOrdering,
32}
33
34impl TimeoutTimeSemantics {
35    pub fn as_str(&self) -> &'static str {
36        match self {
37            Self::LocalPhysicalBudget => "local_physical_budget",
38            Self::LogicalSemanticOrdering => "logical_semantic_ordering",
39            Self::OrderSemanticOrdering => "order_semantic_ordering",
40            Self::ProvenancedSemanticOrdering => "provenanced_semantic_ordering",
41        }
42    }
43
44    pub fn local_time_domain(&self) -> Option<TimeDomain> {
45        match self {
46            Self::LocalPhysicalBudget => Some(TimeDomain::PhysicalClock),
47            Self::LogicalSemanticOrdering => Some(TimeDomain::LogicalClock),
48            Self::OrderSemanticOrdering => Some(TimeDomain::OrderClock),
49            Self::ProvenancedSemanticOrdering => None,
50        }
51    }
52
53    pub fn is_local_budget_domain(&self) -> bool {
54        matches!(self, Self::LocalPhysicalBudget)
55    }
56}
57
58/// Shared execution classes for timeout-policy scaling.
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60pub enum TimeoutExecutionClass {
61    Production,
62    SimulationTest,
63    Harness,
64}
65
66impl TimeoutExecutionClass {
67    pub fn as_str(&self) -> &'static str {
68        match self {
69            Self::Production => "production",
70            Self::SimulationTest => "simulation_test",
71            Self::Harness => "harness",
72        }
73    }
74}
75
76/// Shared profile for scaling timeout and backoff policy by execution lane.
77///
78/// The semantic model stays the same across environments; only scale and
79/// deterministic jitter policy vary.
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81pub struct TimeoutExecutionProfile {
82    class: TimeoutExecutionClass,
83    scale_percent: u32,
84    jitter: JitterMode,
85}
86
87impl TimeoutExecutionProfile {
88    pub fn new(
89        class: TimeoutExecutionClass,
90        scale_percent: u32,
91        jitter: JitterMode,
92    ) -> TimeoutBudgetResult<Self> {
93        if scale_percent == 0 {
94            return Err(TimeoutBudgetError::invalid_policy(
95                "timeout scale_percent must be greater than zero",
96            ));
97        }
98        Ok(Self {
99            class,
100            scale_percent,
101            jitter,
102        })
103    }
104
105    pub fn production() -> Self {
106        Self {
107            class: TimeoutExecutionClass::Production,
108            scale_percent: 100,
109            jitter: JitterMode::Deterministic,
110        }
111    }
112
113    pub fn simulation_test() -> Self {
114        Self {
115            class: TimeoutExecutionClass::SimulationTest,
116            scale_percent: 10,
117            jitter: JitterMode::None,
118        }
119    }
120
121    pub fn harness() -> Self {
122        Self {
123            class: TimeoutExecutionClass::Harness,
124            scale_percent: 25,
125            jitter: JitterMode::None,
126        }
127    }
128
129    pub fn class(&self) -> TimeoutExecutionClass {
130        self.class
131    }
132
133    pub fn scale_percent(&self) -> u32 {
134        self.scale_percent
135    }
136
137    pub fn jitter(&self) -> JitterMode {
138        self.jitter
139    }
140
141    pub fn scale_duration(&self, duration: Duration) -> TimeoutBudgetResult<Duration> {
142        let millis = duration_to_ms(duration)?;
143        let scaled = millis
144            .checked_mul(u64::from(self.scale_percent))
145            .ok_or_else(|| TimeoutBudgetError::invalid_policy("scaled timeout overflow"))?
146            / 100;
147        Ok(Duration::from_millis(scaled.max(1)))
148    }
149
150    pub fn apply_backoff(
151        &self,
152        backoff: &ExponentialBackoffPolicy,
153    ) -> TimeoutBudgetResult<ExponentialBackoffPolicy> {
154        ExponentialBackoffPolicy::new(
155            self.scale_duration(backoff.initial_delay())?,
156            self.scale_duration(backoff.max_delay())?,
157            self.jitter,
158        )
159    }
160
161    pub fn apply_retry_policy(
162        &self,
163        policy: &RetryBudgetPolicy,
164    ) -> TimeoutBudgetResult<RetryBudgetPolicy> {
165        let mut scaled =
166            RetryBudgetPolicy::new(policy.max_attempts(), self.apply_backoff(policy.backoff())?);
167        if let Some(timeout) = policy.per_attempt_timeout() {
168            scaled = scaled.with_per_attempt_timeout(self.scale_duration(timeout)?);
169        }
170        Ok(scaled)
171    }
172}
173
174/// Typed timeout/backoff failures for local owner policy.
175#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
176pub enum TimeoutBudgetError {
177    #[error("invalid timeout policy: {detail}")]
178    InvalidPolicy { detail: String },
179    #[error("time source unavailable: {detail}")]
180    TimeSourceUnavailable { detail: String },
181    #[error("local timeout budget exhausted at {observed_at_ms}ms (deadline {deadline_at_ms}ms)")]
182    DeadlineExceeded {
183        deadline_at_ms: u64,
184        observed_at_ms: u64,
185    },
186    #[error("retry attempt budget exhausted after {attempts_used} attempts (max {max_attempts})")]
187    AttemptBudgetExhausted {
188        max_attempts: u32,
189        attempts_used: u32,
190    },
191}
192
193impl TimeoutBudgetError {
194    pub fn invalid_policy(detail: impl Into<String>) -> Self {
195        Self::InvalidPolicy {
196            detail: detail.into(),
197        }
198    }
199
200    pub fn time_source_unavailable(detail: impl Into<String>) -> Self {
201        Self::TimeSourceUnavailable {
202            detail: detail.into(),
203        }
204    }
205
206    pub fn deadline_exceeded(deadline_at_ms: u64, observed_at_ms: u64) -> Self {
207        Self::DeadlineExceeded {
208            deadline_at_ms,
209            observed_at_ms,
210        }
211    }
212
213    pub fn attempt_budget_exhausted(max_attempts: u32, attempts_used: u32) -> Self {
214        Self::AttemptBudgetExhausted {
215            max_attempts,
216            attempts_used,
217        }
218    }
219}
220
221impl ProtocolErrorCode for TimeoutBudgetError {
222    fn code(&self) -> &'static str {
223        match self {
224            Self::InvalidPolicy { .. } => "invalid_timeout_policy",
225            Self::TimeSourceUnavailable { .. } => "time_source_unavailable",
226            Self::DeadlineExceeded { .. } => "deadline_exceeded",
227            Self::AttemptBudgetExhausted { .. } => "attempt_budget_exhausted",
228        }
229    }
230}
231
232impl From<TimeoutBudgetError> for AuraError {
233    fn from(value: TimeoutBudgetError) -> Self {
234        match value {
235            TimeoutBudgetError::InvalidPolicy { detail } => {
236                AuraError::invalid(format!("invalid_timeout_policy: {detail}"))
237            }
238            TimeoutBudgetError::TimeSourceUnavailable { detail } => {
239                AuraError::internal(format!("time_source_unavailable: {detail}"))
240            }
241            TimeoutBudgetError::DeadlineExceeded {
242                deadline_at_ms,
243                observed_at_ms,
244            } => AuraError::terminal(format!(
245                "deadline_exceeded: observed_at_ms={observed_at_ms} deadline_at_ms={deadline_at_ms}"
246            )),
247            TimeoutBudgetError::AttemptBudgetExhausted {
248                max_attempts,
249                attempts_used,
250            } => AuraError::terminal(format!(
251                "attempt_budget_exhausted: attempts_used={attempts_used} max_attempts={max_attempts}"
252            )),
253        }
254    }
255}
256
257/// Local operation deadline budget.
258///
259/// This uses physical time as a local owner choice for budgeting and timeout
260/// policy. It does not represent distributed semantic ordering.
261#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
262pub struct TimeoutBudget {
263    started_at_ms: u64,
264    deadline_at_ms: u64,
265}
266
267impl TimeoutBudget {
268    pub fn from_start_and_timeout(
269        started_at: &PhysicalTime,
270        timeout: Duration,
271    ) -> TimeoutBudgetResult<Self> {
272        let timeout_ms = duration_to_ms(timeout)?;
273        let deadline_at_ms = started_at
274            .ts_ms
275            .checked_add(timeout_ms)
276            .ok_or_else(|| TimeoutBudgetError::invalid_policy("timeout deadline overflow"))?;
277        Ok(Self {
278            started_at_ms: started_at.ts_ms,
279            deadline_at_ms,
280        })
281    }
282
283    pub fn started_at_ms(&self) -> u64 {
284        self.started_at_ms
285    }
286
287    pub fn deadline_at_ms(&self) -> u64 {
288        self.deadline_at_ms
289    }
290
291    pub fn timeout_ms(&self) -> u64 {
292        self.deadline_at_ms.saturating_sub(self.started_at_ms)
293    }
294
295    pub fn time_semantics(&self) -> TimeoutTimeSemantics {
296        TimeoutTimeSemantics::LocalPhysicalBudget
297    }
298
299    pub fn remaining_at(&self, now: &PhysicalTime) -> TimeoutBudgetResult<Duration> {
300        if now.ts_ms >= self.deadline_at_ms {
301            return Err(TimeoutBudgetError::deadline_exceeded(
302                self.deadline_at_ms,
303                now.ts_ms,
304            ));
305        }
306        Ok(Duration::from_millis(self.deadline_at_ms - now.ts_ms))
307    }
308
309    pub fn remaining_or_zero_at(&self, now: &PhysicalTime) -> Duration {
310        Duration::from_millis(self.deadline_at_ms.saturating_sub(now.ts_ms))
311    }
312
313    pub fn clamp_to_remaining(
314        &self,
315        now: &PhysicalTime,
316        requested: Duration,
317    ) -> TimeoutBudgetResult<Duration> {
318        let remaining = self.remaining_at(now)?;
319        Ok(remaining.min(requested))
320    }
321
322    pub fn child_budget(
323        &self,
324        now: &PhysicalTime,
325        requested: Duration,
326    ) -> TimeoutBudgetResult<Self> {
327        Self::from_start_and_timeout(now, self.clamp_to_remaining(now, requested)?)
328    }
329}
330
331/// Mutable retry-attempt budget.
332#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
333pub struct AttemptBudget {
334    max_attempts: u32,
335    attempts_used: u32,
336}
337
338impl AttemptBudget {
339    pub fn new(max_attempts: u32) -> Self {
340        Self {
341            max_attempts,
342            attempts_used: 0,
343        }
344    }
345
346    pub fn max_attempts(&self) -> u32 {
347        self.max_attempts
348    }
349
350    pub fn attempts_used(&self) -> u32 {
351        self.attempts_used
352    }
353
354    pub fn remaining_attempts(&self) -> u32 {
355        self.max_attempts.saturating_sub(self.attempts_used)
356    }
357
358    pub fn can_attempt(&self) -> bool {
359        self.attempts_used < self.max_attempts
360    }
361
362    pub fn record_attempt(&mut self) -> TimeoutBudgetResult<u32> {
363        if !self.can_attempt() {
364            return Err(TimeoutBudgetError::attempt_budget_exhausted(
365                self.max_attempts,
366                self.attempts_used,
367            ));
368        }
369        let attempt = self.attempts_used;
370        self.attempts_used = self
371            .attempts_used
372            .checked_add(1)
373            .ok_or_else(|| TimeoutBudgetError::invalid_policy("attempt counter overflow"))?;
374        Ok(attempt)
375    }
376}
377
378/// Bounded exponential backoff policy with explicit jitter handling.
379#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
380pub struct ExponentialBackoffPolicy {
381    initial_delay: Duration,
382    max_delay: Duration,
383    jitter: JitterMode,
384}
385
386impl ExponentialBackoffPolicy {
387    pub fn new(
388        initial_delay: Duration,
389        max_delay: Duration,
390        jitter: JitterMode,
391    ) -> TimeoutBudgetResult<Self> {
392        if initial_delay.is_zero() {
393            return Err(TimeoutBudgetError::invalid_policy(
394                "initial_delay must be greater than zero",
395            ));
396        }
397        if max_delay < initial_delay {
398            return Err(TimeoutBudgetError::invalid_policy(
399                "max_delay must be greater than or equal to initial_delay",
400            ));
401        }
402        Ok(Self {
403            initial_delay,
404            max_delay,
405            jitter,
406        })
407    }
408
409    pub fn initial_delay(&self) -> Duration {
410        self.initial_delay
411    }
412
413    pub fn max_delay(&self) -> Duration {
414        self.max_delay
415    }
416
417    pub fn jitter(&self) -> JitterMode {
418        self.jitter
419    }
420
421    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
422        let strategy = match self.jitter {
423            JitterMode::None => BackoffStrategy::Exponential,
424            JitterMode::Deterministic => BackoffStrategy::ExponentialWithJitter,
425        };
426        strategy.calculate_delay(attempt, self.initial_delay, self.max_delay)
427    }
428}
429
430/// Shared retry-policy vocabulary for local timeout budgeting.
431#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
432pub struct RetryBudgetPolicy {
433    max_attempts: u32,
434    per_attempt_timeout: Option<Duration>,
435    backoff: ExponentialBackoffPolicy,
436}
437
438impl RetryBudgetPolicy {
439    #[must_use]
440    pub fn new(max_attempts: u32, backoff: ExponentialBackoffPolicy) -> Self {
441        Self {
442            max_attempts,
443            per_attempt_timeout: None,
444            backoff,
445        }
446    }
447
448    #[must_use]
449    pub fn with_per_attempt_timeout(mut self, timeout: Duration) -> Self {
450        self.per_attempt_timeout = Some(timeout);
451        self
452    }
453
454    pub fn max_attempts(&self) -> u32 {
455        self.max_attempts
456    }
457
458    pub fn per_attempt_timeout(&self) -> Option<Duration> {
459        self.per_attempt_timeout
460    }
461
462    pub fn backoff(&self) -> &ExponentialBackoffPolicy {
463        &self.backoff
464    }
465
466    pub fn attempt_budget(&self) -> AttemptBudget {
467        AttemptBudget::new(self.max_attempts)
468    }
469
470    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
471        self.backoff.delay_for_attempt(attempt)
472    }
473
474    pub fn as_retry_policy(&self) -> RetryPolicy {
475        let mut policy = RetryPolicy::exponential()
476            .with_max_attempts(self.max_attempts)
477            .with_initial_delay(self.backoff.initial_delay())
478            .with_max_delay(self.backoff.max_delay())
479            .with_jitter(self.backoff.jitter());
480
481        if let Some(timeout) = self.per_attempt_timeout {
482            policy = policy.with_timeout(timeout);
483        }
484
485        policy
486    }
487}
488
489/// Typed result for an operation run under a timeout budget.
490#[derive(Debug, Clone, PartialEq, Eq)]
491pub enum TimeoutRunError<E> {
492    Timeout(TimeoutBudgetError),
493    Operation(E),
494}
495
496impl<E: fmt::Display> fmt::Display for TimeoutRunError<E> {
497    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498        match self {
499            Self::Timeout(error) => write!(f, "{error}"),
500            Self::Operation(error) => write!(f, "{error}"),
501        }
502    }
503}
504
505/// Typed result for an operation run under retry policy.
506#[derive(Debug, Clone, PartialEq, Eq)]
507pub enum RetryRunError<E> {
508    Timeout(TimeoutBudgetError),
509    AttemptsExhausted { attempts_used: u32, last_error: E },
510}
511
512impl<E: fmt::Display> fmt::Display for RetryRunError<E> {
513    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514        match self {
515            Self::Timeout(error) => write!(f, "{error}"),
516            Self::AttemptsExhausted {
517                attempts_used,
518                last_error,
519            } => write!(
520                f,
521                "retry attempts exhausted after {attempts_used} attempts: {last_error}"
522            ),
523        }
524    }
525}
526
527/// Run an async operation with a typed local timeout budget.
528pub async fn execute_with_timeout_budget<TTime, F, Fut, T, E>(
529    time: &TTime,
530    budget: &TimeoutBudget,
531    operation: F,
532) -> Result<T, TimeoutRunError<E>>
533where
534    TTime: PhysicalTimeEffects + Sync,
535    F: FnOnce() -> Fut,
536    Fut: Future<Output = Result<T, E>>,
537{
538    let now = current_physical_time(time)
539        .await
540        .map_err(TimeoutRunError::Timeout)?;
541    let remaining = budget
542        .remaining_at(&now)
543        .map_err(TimeoutRunError::Timeout)?;
544    let sleep_ms = duration_to_ms(remaining).map_err(TimeoutRunError::Timeout)?;
545
546    let operation_future = operation();
547    let sleep_future = time.sleep_ms(sleep_ms);
548    pin_mut!(operation_future);
549    pin_mut!(sleep_future);
550    match futures::future::select(operation_future, sleep_future).await {
551        Either::Left((result, _sleep_future)) => result.map_err(TimeoutRunError::Operation),
552        Either::Right((sleep, _operation_future)) => {
553            sleep.map_err(|error| TimeoutRunError::Timeout(time_error(error)))?;
554            let observed_at_ms = current_physical_time(time)
555                .await
556                .map(|time| time.ts_ms)
557                .unwrap_or(budget.deadline_at_ms());
558            Err(TimeoutRunError::Timeout(
559                TimeoutBudgetError::deadline_exceeded(budget.deadline_at_ms(), observed_at_ms),
560            ))
561        }
562    }
563}
564
565/// Run an async operation with typed retry and optional per-attempt timeout policy.
566pub async fn execute_with_retry_budget<TTime, F, Fut, T, E>(
567    time: &TTime,
568    policy: &RetryBudgetPolicy,
569    mut operation: F,
570) -> Result<T, RetryRunError<E>>
571where
572    TTime: PhysicalTimeEffects + Sync,
573    F: FnMut(u32) -> Fut,
574    Fut: Future<Output = Result<T, E>>,
575{
576    let mut attempts = policy.attempt_budget();
577
578    loop {
579        let attempt = attempts.record_attempt().map_err(RetryRunError::Timeout)?;
580
581        let result = if let Some(timeout) = policy.per_attempt_timeout() {
582            let now = current_physical_time(time)
583                .await
584                .map_err(RetryRunError::Timeout)?;
585            let budget = TimeoutBudget::from_start_and_timeout(&now, timeout)
586                .map_err(RetryRunError::Timeout)?;
587            execute_with_timeout_budget(time, &budget, || operation(attempt)).await
588        } else {
589            operation(attempt).await.map_err(TimeoutRunError::Operation)
590        };
591
592        match result {
593            Ok(value) => return Ok(value),
594            Err(TimeoutRunError::Timeout(error)) => return Err(RetryRunError::Timeout(error)),
595            Err(TimeoutRunError::Operation(error)) => {
596                if !attempts.can_attempt() {
597                    return Err(RetryRunError::AttemptsExhausted {
598                        attempts_used: attempts.attempts_used(),
599                        last_error: error,
600                    });
601                }
602
603                let delay_ms = duration_to_ms(policy.delay_for_attempt(attempt))
604                    .map_err(RetryRunError::Timeout)?;
605                time.sleep_ms(delay_ms)
606                    .await
607                    .map_err(|error| RetryRunError::Timeout(time_error(error)))?;
608            }
609        }
610    }
611}
612
613fn duration_to_ms(duration: Duration) -> TimeoutBudgetResult<u64> {
614    u64::try_from(duration.as_millis()).map_err(|_| {
615        TimeoutBudgetError::invalid_policy("duration does not fit in u64 milliseconds")
616    })
617}
618
619async fn current_physical_time<TTime: PhysicalTimeEffects + Sync>(
620    time: &TTime,
621) -> TimeoutBudgetResult<PhysicalTime> {
622    time.physical_time().await.map_err(time_error)
623}
624
625fn time_error(error: TimeError) -> TimeoutBudgetError {
626    TimeoutBudgetError::time_source_unavailable(error.to_string())
627}
628
629#[cfg(test)]
630#[allow(clippy::disallowed_types, clippy::expect_used, clippy::redundant_clone)]
631mod tests {
632    use super::{
633        execute_with_retry_budget, execute_with_timeout_budget, AttemptBudget,
634        ExponentialBackoffPolicy, RetryBudgetPolicy, RetryRunError, TimeoutBudget,
635        TimeoutBudgetError, TimeoutExecutionClass, TimeoutExecutionProfile, TimeoutRunError,
636        TimeoutTimeSemantics,
637    };
638    use crate::{
639        effects::{JitterMode, PhysicalTimeEffects, TimeError},
640        time::{PhysicalTime, TimeDomain},
641        ProtocolErrorCode,
642    };
643    use parking_lot::Mutex;
644    use std::time::Duration;
645    use std::{collections::VecDeque, sync::Arc};
646
647    fn physical_time(ts_ms: u64) -> PhysicalTime {
648        PhysicalTime {
649            ts_ms,
650            uncertainty: None,
651        }
652    }
653
654    #[derive(Debug, Clone, Copy)]
655    enum SleepBehavior {
656        Immediate,
657        YieldOnce,
658    }
659
660    #[derive(Clone)]
661    struct ScriptedTimeEffects {
662        times: Arc<Mutex<VecDeque<PhysicalTime>>>,
663        sleeps: Arc<Mutex<Vec<u64>>>,
664        sleep_behavior: SleepBehavior,
665    }
666
667    impl ScriptedTimeEffects {
668        fn new(
669            times: impl IntoIterator<Item = PhysicalTime>,
670            sleep_behavior: SleepBehavior,
671        ) -> Self {
672            Self {
673                times: Arc::new(Mutex::new(times.into_iter().collect())),
674                sleeps: Arc::new(Mutex::new(Vec::new())),
675                sleep_behavior,
676            }
677        }
678
679        fn sleep_calls(&self) -> Vec<u64> {
680            self.sleeps.lock().clone()
681        }
682    }
683
684    #[async_trait::async_trait]
685    impl PhysicalTimeEffects for ScriptedTimeEffects {
686        async fn physical_time(&self) -> Result<PhysicalTime, TimeError> {
687            self.times
688                .lock()
689                .pop_front()
690                .ok_or(TimeError::ServiceUnavailable)
691        }
692
693        async fn sleep_ms(&self, ms: u64) -> Result<(), TimeError> {
694            self.sleeps.lock().push(ms);
695            match self.sleep_behavior {
696                SleepBehavior::Immediate => Ok(()),
697                SleepBehavior::YieldOnce => {
698                    tokio::time::sleep(Duration::from_millis(1)).await;
699                    Ok(())
700                }
701            }
702        }
703    }
704
705    #[test]
706    fn timeout_budget_tracks_remaining_and_child_budget() {
707        let budget =
708            TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
709                .expect("budget");
710
711        assert_eq!(budget.started_at_ms(), 1_000);
712        assert_eq!(budget.deadline_at_ms(), 6_000);
713        assert_eq!(budget.timeout_ms(), 5_000);
714        assert_eq!(
715            budget.time_semantics(),
716            TimeoutTimeSemantics::LocalPhysicalBudget
717        );
718        assert_eq!(
719            budget
720                .remaining_at(&physical_time(2_500))
721                .expect("remaining"),
722            Duration::from_millis(3_500)
723        );
724        assert_eq!(
725            budget
726                .clamp_to_remaining(&physical_time(2_500), Duration::from_secs(10))
727                .expect("clamped"),
728            Duration::from_millis(3_500)
729        );
730
731        let child = budget
732            .child_budget(&physical_time(2_500), Duration::from_secs(2))
733            .expect("child");
734        assert_eq!(child.started_at_ms(), 2_500);
735        assert_eq!(child.deadline_at_ms(), 4_500);
736    }
737
738    #[test]
739    fn timeout_budget_expires_with_typed_failure() {
740        let budget =
741            TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
742                .expect("budget");
743
744        let error = budget
745            .remaining_at(&physical_time(6_500))
746            .expect_err("expired");
747        assert_eq!(error.code(), "deadline_exceeded");
748        assert!(matches!(
749            error,
750            TimeoutBudgetError::DeadlineExceeded {
751                deadline_at_ms: 6_000,
752                observed_at_ms: 6_500,
753            }
754        ));
755        assert_eq!(
756            budget.remaining_or_zero_at(&physical_time(6_500)),
757            Duration::ZERO
758        );
759    }
760
761    #[test]
762    fn attempt_budget_enforces_max_attempts() {
763        let mut budget = AttemptBudget::new(2);
764        assert_eq!(budget.remaining_attempts(), 2);
765        assert_eq!(budget.record_attempt().expect("attempt 0"), 0);
766        assert_eq!(budget.record_attempt().expect("attempt 1"), 1);
767        assert_eq!(budget.remaining_attempts(), 0);
768
769        let error = budget.record_attempt().expect_err("exhausted");
770        assert_eq!(error.code(), "attempt_budget_exhausted");
771        assert!(matches!(
772            error,
773            TimeoutBudgetError::AttemptBudgetExhausted {
774                max_attempts: 2,
775                attempts_used: 2,
776            }
777        ));
778    }
779
780    #[test]
781    fn exponential_backoff_is_bounded_and_round_trips_to_retry_policy() {
782        let backoff = ExponentialBackoffPolicy::new(
783            Duration::from_millis(100),
784            Duration::from_secs(1),
785            JitterMode::Deterministic,
786        )
787        .expect("backoff");
788        let policy = RetryBudgetPolicy::new(4, backoff.clone())
789            .with_per_attempt_timeout(Duration::from_millis(750));
790
791        assert_eq!(backoff.delay_for_attempt(0), Duration::from_millis(100));
792        assert!(backoff.delay_for_attempt(4) <= Duration::from_secs(1));
793
794        let retry_policy = policy.as_retry_policy();
795        assert_eq!(retry_policy.max_attempts, 4);
796        assert_eq!(retry_policy.timeout, Some(Duration::from_millis(750)));
797        assert_eq!(
798            retry_policy.calculate_delay(3),
799            backoff.delay_for_attempt(3)
800        );
801    }
802
803    #[test]
804    fn timeout_time_semantics_preserve_domain_split() {
805        assert_eq!(
806            TimeoutTimeSemantics::LocalPhysicalBudget.local_time_domain(),
807            Some(TimeDomain::PhysicalClock)
808        );
809        assert_eq!(
810            TimeoutTimeSemantics::LogicalSemanticOrdering.local_time_domain(),
811            Some(TimeDomain::LogicalClock)
812        );
813        assert_eq!(
814            TimeoutTimeSemantics::OrderSemanticOrdering.local_time_domain(),
815            Some(TimeDomain::OrderClock)
816        );
817        assert_eq!(
818            TimeoutTimeSemantics::ProvenancedSemanticOrdering.local_time_domain(),
819            None
820        );
821        assert!(TimeoutTimeSemantics::LocalPhysicalBudget.is_local_budget_domain());
822        assert!(!TimeoutTimeSemantics::LogicalSemanticOrdering.is_local_budget_domain());
823    }
824
825    #[test]
826    fn timeout_execution_profiles_scale_policy_by_environment() {
827        let production = TimeoutExecutionProfile::production();
828        let simulation = TimeoutExecutionProfile::simulation_test();
829        let harness = TimeoutExecutionProfile::harness();
830        let base_backoff = ExponentialBackoffPolicy::new(
831            Duration::from_secs(2),
832            Duration::from_secs(10),
833            JitterMode::Deterministic,
834        )
835        .expect("backoff");
836        let base_retry = RetryBudgetPolicy::new(5, base_backoff.clone())
837            .with_per_attempt_timeout(Duration::from_secs(8));
838
839        assert_eq!(production.class(), TimeoutExecutionClass::Production);
840        assert_eq!(
841            production
842                .scale_duration(Duration::from_secs(4))
843                .expect("scaled"),
844            Duration::from_secs(4)
845        );
846        assert_eq!(simulation.jitter(), JitterMode::None);
847        assert_eq!(harness.scale_percent(), 25);
848
849        let scaled = harness
850            .apply_retry_policy(&base_retry)
851            .expect("scaled policy");
852        assert_eq!(scaled.max_attempts(), 5);
853        assert_eq!(scaled.per_attempt_timeout(), Some(Duration::from_secs(2)));
854        assert_eq!(scaled.backoff().initial_delay(), Duration::from_millis(500));
855        assert_eq!(scaled.backoff().max_delay(), Duration::from_millis(2_500));
856        assert_eq!(scaled.backoff().jitter(), JitterMode::None);
857    }
858
859    #[test]
860    fn execution_profile_scaling_preserves_local_success_and_failure_relations() {
861        let profiles = [
862            TimeoutExecutionProfile::production(),
863            TimeoutExecutionProfile::simulation_test(),
864            TimeoutExecutionProfile::harness(),
865        ];
866
867        let base_timeout = Duration::from_secs(4);
868        let base_success_latency = Duration::from_millis(1_500);
869        let base_failure_latency = Duration::from_secs(6);
870
871        assert!(base_success_latency <= base_timeout);
872        assert!(base_failure_latency > base_timeout);
873
874        for profile in profiles {
875            let scaled_timeout = profile
876                .scale_duration(base_timeout)
877                .expect("scaled timeout");
878            let scaled_success = profile
879                .scale_duration(base_success_latency)
880                .expect("scaled success latency");
881            let scaled_failure = profile
882                .scale_duration(base_failure_latency)
883                .expect("scaled failure latency");
884
885            assert!(
886                scaled_success <= scaled_timeout,
887                "profile {:?} changed a local success relation into failure",
888                profile.class()
889            );
890            assert!(
891                scaled_failure > scaled_timeout,
892                "profile {:?} changed a local failure relation into success",
893                profile.class()
894            );
895        }
896    }
897
898    #[tokio::test]
899    async fn timeout_wrapper_returns_typed_deadline_error() {
900        let effects = ScriptedTimeEffects::new(
901            [physical_time(1_000), physical_time(6_200)],
902            SleepBehavior::Immediate,
903        );
904        let budget =
905            TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
906                .expect("budget");
907
908        let error = execute_with_timeout_budget(&effects, &budget, || async {
909            futures::future::pending::<Result<(), &'static str>>().await
910        })
911        .await
912        .expect_err("timed out");
913
914        assert!(matches!(
915            error,
916            TimeoutRunError::Timeout(TimeoutBudgetError::DeadlineExceeded {
917                deadline_at_ms: 6_000,
918                observed_at_ms: 6_200,
919            })
920        ));
921        assert_eq!(effects.sleep_calls(), vec![5_000]);
922    }
923
924    #[tokio::test]
925    async fn timeout_wrapper_preserves_remaining_child_budget() {
926        let parent =
927            TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
928                .expect("parent");
929        let child = parent
930            .child_budget(&physical_time(2_500), Duration::from_secs(10))
931            .expect("child");
932        let effects = ScriptedTimeEffects::new(
933            [physical_time(2_500), physical_time(6_100)],
934            SleepBehavior::Immediate,
935        );
936
937        let error = execute_with_timeout_budget(&effects, &child, || async {
938            futures::future::pending::<Result<(), &'static str>>().await
939        })
940        .await
941        .expect_err("timed out");
942
943        assert!(matches!(
944            error,
945            TimeoutRunError::Timeout(TimeoutBudgetError::DeadlineExceeded {
946                deadline_at_ms: 6_000,
947                observed_at_ms: 6_100,
948            })
949        ));
950        assert_eq!(effects.sleep_calls(), vec![3_500]);
951    }
952
953    #[tokio::test]
954    async fn retry_wrapper_retries_with_typed_backoff_policy() {
955        let effects = ScriptedTimeEffects::new([], SleepBehavior::YieldOnce);
956        let policy = RetryBudgetPolicy::new(
957            3,
958            ExponentialBackoffPolicy::new(
959                Duration::from_millis(100),
960                Duration::from_secs(1),
961                JitterMode::None,
962            )
963            .expect("backoff"),
964        );
965        let attempts = Arc::new(Mutex::new(Vec::new()));
966
967        let result = execute_with_retry_budget(&effects, &policy, {
968            let attempts = Arc::clone(&attempts);
969            move |attempt| {
970                let attempts = Arc::clone(&attempts);
971                async move {
972                    attempts.lock().push(attempt);
973                    if attempt < 2 {
974                        Err("retryable failure")
975                    } else {
976                        Ok("done")
977                    }
978                }
979            }
980        })
981        .await
982        .expect("eventual success");
983
984        assert_eq!(result, "done");
985        assert_eq!(*attempts.lock(), vec![0, 1, 2]);
986        assert_eq!(effects.sleep_calls(), vec![100, 200]);
987    }
988
989    #[tokio::test]
990    async fn retry_wrapper_surfaces_typed_attempt_exhaustion() {
991        let effects = ScriptedTimeEffects::new([], SleepBehavior::YieldOnce);
992        let policy = RetryBudgetPolicy::new(
993            2,
994            ExponentialBackoffPolicy::new(
995                Duration::from_millis(50),
996                Duration::from_millis(200),
997                JitterMode::None,
998            )
999            .expect("backoff"),
1000        );
1001
1002        let error = execute_with_retry_budget(&effects, &policy, |_attempt| async {
1003            Err::<(), _>("still failing")
1004        })
1005        .await
1006        .expect_err("exhausted");
1007
1008        assert!(matches!(
1009            error,
1010            RetryRunError::AttemptsExhausted {
1011                attempts_used: 2,
1012                last_error: "still failing",
1013            }
1014        ));
1015        assert_eq!(effects.sleep_calls(), vec![50]);
1016    }
1017}