1use super::{PhysicalTime, TimeDomain};
8use crate::{
9 effects::{BackoffStrategy, JitterMode, PhysicalTimeEffects, RetryPolicy, TimeError},
10 AuraError, ProtocolErrorCode,
11};
12use futures::{future::Either, pin_mut};
13use serde::{Deserialize, Serialize};
14use std::fmt;
15use std::future::Future;
16use std::time::Duration;
17
18pub type TimeoutBudgetResult<T> = Result<T, TimeoutBudgetError>;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum TimeoutTimeSemantics {
24 LocalPhysicalBudget,
26 LogicalSemanticOrdering,
28 OrderSemanticOrdering,
30 ProvenancedSemanticOrdering,
32}
33
34impl TimeoutTimeSemantics {
35 pub fn as_str(&self) -> &'static str {
36 match self {
37 Self::LocalPhysicalBudget => "local_physical_budget",
38 Self::LogicalSemanticOrdering => "logical_semantic_ordering",
39 Self::OrderSemanticOrdering => "order_semantic_ordering",
40 Self::ProvenancedSemanticOrdering => "provenanced_semantic_ordering",
41 }
42 }
43
44 pub fn local_time_domain(&self) -> Option<TimeDomain> {
45 match self {
46 Self::LocalPhysicalBudget => Some(TimeDomain::PhysicalClock),
47 Self::LogicalSemanticOrdering => Some(TimeDomain::LogicalClock),
48 Self::OrderSemanticOrdering => Some(TimeDomain::OrderClock),
49 Self::ProvenancedSemanticOrdering => None,
50 }
51 }
52
53 pub fn is_local_budget_domain(&self) -> bool {
54 matches!(self, Self::LocalPhysicalBudget)
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60pub enum TimeoutExecutionClass {
61 Production,
62 SimulationTest,
63 Harness,
64}
65
66impl TimeoutExecutionClass {
67 pub fn as_str(&self) -> &'static str {
68 match self {
69 Self::Production => "production",
70 Self::SimulationTest => "simulation_test",
71 Self::Harness => "harness",
72 }
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81pub struct TimeoutExecutionProfile {
82 class: TimeoutExecutionClass,
83 scale_percent: u32,
84 jitter: JitterMode,
85}
86
87impl TimeoutExecutionProfile {
88 pub fn new(
89 class: TimeoutExecutionClass,
90 scale_percent: u32,
91 jitter: JitterMode,
92 ) -> TimeoutBudgetResult<Self> {
93 if scale_percent == 0 {
94 return Err(TimeoutBudgetError::invalid_policy(
95 "timeout scale_percent must be greater than zero",
96 ));
97 }
98 Ok(Self {
99 class,
100 scale_percent,
101 jitter,
102 })
103 }
104
105 pub fn production() -> Self {
106 Self {
107 class: TimeoutExecutionClass::Production,
108 scale_percent: 100,
109 jitter: JitterMode::Deterministic,
110 }
111 }
112
113 pub fn simulation_test() -> Self {
114 Self {
115 class: TimeoutExecutionClass::SimulationTest,
116 scale_percent: 10,
117 jitter: JitterMode::None,
118 }
119 }
120
121 pub fn harness() -> Self {
122 Self {
123 class: TimeoutExecutionClass::Harness,
124 scale_percent: 25,
125 jitter: JitterMode::None,
126 }
127 }
128
129 pub fn class(&self) -> TimeoutExecutionClass {
130 self.class
131 }
132
133 pub fn scale_percent(&self) -> u32 {
134 self.scale_percent
135 }
136
137 pub fn jitter(&self) -> JitterMode {
138 self.jitter
139 }
140
141 pub fn scale_duration(&self, duration: Duration) -> TimeoutBudgetResult<Duration> {
142 let millis = duration_to_ms(duration)?;
143 let scaled = millis
144 .checked_mul(u64::from(self.scale_percent))
145 .ok_or_else(|| TimeoutBudgetError::invalid_policy("scaled timeout overflow"))?
146 / 100;
147 Ok(Duration::from_millis(scaled.max(1)))
148 }
149
150 pub fn apply_backoff(
151 &self,
152 backoff: &ExponentialBackoffPolicy,
153 ) -> TimeoutBudgetResult<ExponentialBackoffPolicy> {
154 ExponentialBackoffPolicy::new(
155 self.scale_duration(backoff.initial_delay())?,
156 self.scale_duration(backoff.max_delay())?,
157 self.jitter,
158 )
159 }
160
161 pub fn apply_retry_policy(
162 &self,
163 policy: &RetryBudgetPolicy,
164 ) -> TimeoutBudgetResult<RetryBudgetPolicy> {
165 let mut scaled =
166 RetryBudgetPolicy::new(policy.max_attempts(), self.apply_backoff(policy.backoff())?);
167 if let Some(timeout) = policy.per_attempt_timeout() {
168 scaled = scaled.with_per_attempt_timeout(self.scale_duration(timeout)?);
169 }
170 Ok(scaled)
171 }
172}
173
174#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
176pub enum TimeoutBudgetError {
177 #[error("invalid timeout policy: {detail}")]
178 InvalidPolicy { detail: String },
179 #[error("time source unavailable: {detail}")]
180 TimeSourceUnavailable { detail: String },
181 #[error("local timeout budget exhausted at {observed_at_ms}ms (deadline {deadline_at_ms}ms)")]
182 DeadlineExceeded {
183 deadline_at_ms: u64,
184 observed_at_ms: u64,
185 },
186 #[error("retry attempt budget exhausted after {attempts_used} attempts (max {max_attempts})")]
187 AttemptBudgetExhausted {
188 max_attempts: u32,
189 attempts_used: u32,
190 },
191}
192
193impl TimeoutBudgetError {
194 pub fn invalid_policy(detail: impl Into<String>) -> Self {
195 Self::InvalidPolicy {
196 detail: detail.into(),
197 }
198 }
199
200 pub fn time_source_unavailable(detail: impl Into<String>) -> Self {
201 Self::TimeSourceUnavailable {
202 detail: detail.into(),
203 }
204 }
205
206 pub fn deadline_exceeded(deadline_at_ms: u64, observed_at_ms: u64) -> Self {
207 Self::DeadlineExceeded {
208 deadline_at_ms,
209 observed_at_ms,
210 }
211 }
212
213 pub fn attempt_budget_exhausted(max_attempts: u32, attempts_used: u32) -> Self {
214 Self::AttemptBudgetExhausted {
215 max_attempts,
216 attempts_used,
217 }
218 }
219}
220
221impl ProtocolErrorCode for TimeoutBudgetError {
222 fn code(&self) -> &'static str {
223 match self {
224 Self::InvalidPolicy { .. } => "invalid_timeout_policy",
225 Self::TimeSourceUnavailable { .. } => "time_source_unavailable",
226 Self::DeadlineExceeded { .. } => "deadline_exceeded",
227 Self::AttemptBudgetExhausted { .. } => "attempt_budget_exhausted",
228 }
229 }
230}
231
232impl From<TimeoutBudgetError> for AuraError {
233 fn from(value: TimeoutBudgetError) -> Self {
234 match value {
235 TimeoutBudgetError::InvalidPolicy { detail } => {
236 AuraError::invalid(format!("invalid_timeout_policy: {detail}"))
237 }
238 TimeoutBudgetError::TimeSourceUnavailable { detail } => {
239 AuraError::internal(format!("time_source_unavailable: {detail}"))
240 }
241 TimeoutBudgetError::DeadlineExceeded {
242 deadline_at_ms,
243 observed_at_ms,
244 } => AuraError::terminal(format!(
245 "deadline_exceeded: observed_at_ms={observed_at_ms} deadline_at_ms={deadline_at_ms}"
246 )),
247 TimeoutBudgetError::AttemptBudgetExhausted {
248 max_attempts,
249 attempts_used,
250 } => AuraError::terminal(format!(
251 "attempt_budget_exhausted: attempts_used={attempts_used} max_attempts={max_attempts}"
252 )),
253 }
254 }
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
262pub struct TimeoutBudget {
263 started_at_ms: u64,
264 deadline_at_ms: u64,
265}
266
267impl TimeoutBudget {
268 pub fn from_start_and_timeout(
269 started_at: &PhysicalTime,
270 timeout: Duration,
271 ) -> TimeoutBudgetResult<Self> {
272 let timeout_ms = duration_to_ms(timeout)?;
273 let deadline_at_ms = started_at
274 .ts_ms
275 .checked_add(timeout_ms)
276 .ok_or_else(|| TimeoutBudgetError::invalid_policy("timeout deadline overflow"))?;
277 Ok(Self {
278 started_at_ms: started_at.ts_ms,
279 deadline_at_ms,
280 })
281 }
282
283 pub fn started_at_ms(&self) -> u64 {
284 self.started_at_ms
285 }
286
287 pub fn deadline_at_ms(&self) -> u64 {
288 self.deadline_at_ms
289 }
290
291 pub fn timeout_ms(&self) -> u64 {
292 self.deadline_at_ms.saturating_sub(self.started_at_ms)
293 }
294
295 pub fn time_semantics(&self) -> TimeoutTimeSemantics {
296 TimeoutTimeSemantics::LocalPhysicalBudget
297 }
298
299 pub fn remaining_at(&self, now: &PhysicalTime) -> TimeoutBudgetResult<Duration> {
300 if now.ts_ms >= self.deadline_at_ms {
301 return Err(TimeoutBudgetError::deadline_exceeded(
302 self.deadline_at_ms,
303 now.ts_ms,
304 ));
305 }
306 Ok(Duration::from_millis(self.deadline_at_ms - now.ts_ms))
307 }
308
309 pub fn remaining_or_zero_at(&self, now: &PhysicalTime) -> Duration {
310 Duration::from_millis(self.deadline_at_ms.saturating_sub(now.ts_ms))
311 }
312
313 pub fn clamp_to_remaining(
314 &self,
315 now: &PhysicalTime,
316 requested: Duration,
317 ) -> TimeoutBudgetResult<Duration> {
318 let remaining = self.remaining_at(now)?;
319 Ok(remaining.min(requested))
320 }
321
322 pub fn child_budget(
323 &self,
324 now: &PhysicalTime,
325 requested: Duration,
326 ) -> TimeoutBudgetResult<Self> {
327 Self::from_start_and_timeout(now, self.clamp_to_remaining(now, requested)?)
328 }
329}
330
331#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
333pub struct AttemptBudget {
334 max_attempts: u32,
335 attempts_used: u32,
336}
337
338impl AttemptBudget {
339 pub fn new(max_attempts: u32) -> Self {
340 Self {
341 max_attempts,
342 attempts_used: 0,
343 }
344 }
345
346 pub fn max_attempts(&self) -> u32 {
347 self.max_attempts
348 }
349
350 pub fn attempts_used(&self) -> u32 {
351 self.attempts_used
352 }
353
354 pub fn remaining_attempts(&self) -> u32 {
355 self.max_attempts.saturating_sub(self.attempts_used)
356 }
357
358 pub fn can_attempt(&self) -> bool {
359 self.attempts_used < self.max_attempts
360 }
361
362 pub fn record_attempt(&mut self) -> TimeoutBudgetResult<u32> {
363 if !self.can_attempt() {
364 return Err(TimeoutBudgetError::attempt_budget_exhausted(
365 self.max_attempts,
366 self.attempts_used,
367 ));
368 }
369 let attempt = self.attempts_used;
370 self.attempts_used = self
371 .attempts_used
372 .checked_add(1)
373 .ok_or_else(|| TimeoutBudgetError::invalid_policy("attempt counter overflow"))?;
374 Ok(attempt)
375 }
376}
377
378#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
380pub struct ExponentialBackoffPolicy {
381 initial_delay: Duration,
382 max_delay: Duration,
383 jitter: JitterMode,
384}
385
386impl ExponentialBackoffPolicy {
387 pub fn new(
388 initial_delay: Duration,
389 max_delay: Duration,
390 jitter: JitterMode,
391 ) -> TimeoutBudgetResult<Self> {
392 if initial_delay.is_zero() {
393 return Err(TimeoutBudgetError::invalid_policy(
394 "initial_delay must be greater than zero",
395 ));
396 }
397 if max_delay < initial_delay {
398 return Err(TimeoutBudgetError::invalid_policy(
399 "max_delay must be greater than or equal to initial_delay",
400 ));
401 }
402 Ok(Self {
403 initial_delay,
404 max_delay,
405 jitter,
406 })
407 }
408
409 pub fn initial_delay(&self) -> Duration {
410 self.initial_delay
411 }
412
413 pub fn max_delay(&self) -> Duration {
414 self.max_delay
415 }
416
417 pub fn jitter(&self) -> JitterMode {
418 self.jitter
419 }
420
421 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
422 let strategy = match self.jitter {
423 JitterMode::None => BackoffStrategy::Exponential,
424 JitterMode::Deterministic => BackoffStrategy::ExponentialWithJitter,
425 };
426 strategy.calculate_delay(attempt, self.initial_delay, self.max_delay)
427 }
428}
429
430#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
432pub struct RetryBudgetPolicy {
433 max_attempts: u32,
434 per_attempt_timeout: Option<Duration>,
435 backoff: ExponentialBackoffPolicy,
436}
437
438impl RetryBudgetPolicy {
439 #[must_use]
440 pub fn new(max_attempts: u32, backoff: ExponentialBackoffPolicy) -> Self {
441 Self {
442 max_attempts,
443 per_attempt_timeout: None,
444 backoff,
445 }
446 }
447
448 #[must_use]
449 pub fn with_per_attempt_timeout(mut self, timeout: Duration) -> Self {
450 self.per_attempt_timeout = Some(timeout);
451 self
452 }
453
454 pub fn max_attempts(&self) -> u32 {
455 self.max_attempts
456 }
457
458 pub fn per_attempt_timeout(&self) -> Option<Duration> {
459 self.per_attempt_timeout
460 }
461
462 pub fn backoff(&self) -> &ExponentialBackoffPolicy {
463 &self.backoff
464 }
465
466 pub fn attempt_budget(&self) -> AttemptBudget {
467 AttemptBudget::new(self.max_attempts)
468 }
469
470 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
471 self.backoff.delay_for_attempt(attempt)
472 }
473
474 pub fn as_retry_policy(&self) -> RetryPolicy {
475 let mut policy = RetryPolicy::exponential()
476 .with_max_attempts(self.max_attempts)
477 .with_initial_delay(self.backoff.initial_delay())
478 .with_max_delay(self.backoff.max_delay())
479 .with_jitter(self.backoff.jitter());
480
481 if let Some(timeout) = self.per_attempt_timeout {
482 policy = policy.with_timeout(timeout);
483 }
484
485 policy
486 }
487}
488
489#[derive(Debug, Clone, PartialEq, Eq)]
491pub enum TimeoutRunError<E> {
492 Timeout(TimeoutBudgetError),
493 Operation(E),
494}
495
496impl<E: fmt::Display> fmt::Display for TimeoutRunError<E> {
497 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498 match self {
499 Self::Timeout(error) => write!(f, "{error}"),
500 Self::Operation(error) => write!(f, "{error}"),
501 }
502 }
503}
504
505#[derive(Debug, Clone, PartialEq, Eq)]
507pub enum RetryRunError<E> {
508 Timeout(TimeoutBudgetError),
509 AttemptsExhausted { attempts_used: u32, last_error: E },
510}
511
512impl<E: fmt::Display> fmt::Display for RetryRunError<E> {
513 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514 match self {
515 Self::Timeout(error) => write!(f, "{error}"),
516 Self::AttemptsExhausted {
517 attempts_used,
518 last_error,
519 } => write!(
520 f,
521 "retry attempts exhausted after {attempts_used} attempts: {last_error}"
522 ),
523 }
524 }
525}
526
527pub async fn execute_with_timeout_budget<TTime, F, Fut, T, E>(
529 time: &TTime,
530 budget: &TimeoutBudget,
531 operation: F,
532) -> Result<T, TimeoutRunError<E>>
533where
534 TTime: PhysicalTimeEffects + Sync,
535 F: FnOnce() -> Fut,
536 Fut: Future<Output = Result<T, E>>,
537{
538 let now = current_physical_time(time)
539 .await
540 .map_err(TimeoutRunError::Timeout)?;
541 let remaining = budget
542 .remaining_at(&now)
543 .map_err(TimeoutRunError::Timeout)?;
544 let sleep_ms = duration_to_ms(remaining).map_err(TimeoutRunError::Timeout)?;
545
546 let operation_future = operation();
547 let sleep_future = time.sleep_ms(sleep_ms);
548 pin_mut!(operation_future);
549 pin_mut!(sleep_future);
550 match futures::future::select(operation_future, sleep_future).await {
551 Either::Left((result, _sleep_future)) => result.map_err(TimeoutRunError::Operation),
552 Either::Right((sleep, _operation_future)) => {
553 sleep.map_err(|error| TimeoutRunError::Timeout(time_error(error)))?;
554 let observed_at_ms = current_physical_time(time)
555 .await
556 .map(|time| time.ts_ms)
557 .unwrap_or(budget.deadline_at_ms());
558 Err(TimeoutRunError::Timeout(
559 TimeoutBudgetError::deadline_exceeded(budget.deadline_at_ms(), observed_at_ms),
560 ))
561 }
562 }
563}
564
565pub async fn execute_with_retry_budget<TTime, F, Fut, T, E>(
567 time: &TTime,
568 policy: &RetryBudgetPolicy,
569 mut operation: F,
570) -> Result<T, RetryRunError<E>>
571where
572 TTime: PhysicalTimeEffects + Sync,
573 F: FnMut(u32) -> Fut,
574 Fut: Future<Output = Result<T, E>>,
575{
576 let mut attempts = policy.attempt_budget();
577
578 loop {
579 let attempt = attempts.record_attempt().map_err(RetryRunError::Timeout)?;
580
581 let result = if let Some(timeout) = policy.per_attempt_timeout() {
582 let now = current_physical_time(time)
583 .await
584 .map_err(RetryRunError::Timeout)?;
585 let budget = TimeoutBudget::from_start_and_timeout(&now, timeout)
586 .map_err(RetryRunError::Timeout)?;
587 execute_with_timeout_budget(time, &budget, || operation(attempt)).await
588 } else {
589 operation(attempt).await.map_err(TimeoutRunError::Operation)
590 };
591
592 match result {
593 Ok(value) => return Ok(value),
594 Err(TimeoutRunError::Timeout(error)) => return Err(RetryRunError::Timeout(error)),
595 Err(TimeoutRunError::Operation(error)) => {
596 if !attempts.can_attempt() {
597 return Err(RetryRunError::AttemptsExhausted {
598 attempts_used: attempts.attempts_used(),
599 last_error: error,
600 });
601 }
602
603 let delay_ms = duration_to_ms(policy.delay_for_attempt(attempt))
604 .map_err(RetryRunError::Timeout)?;
605 time.sleep_ms(delay_ms)
606 .await
607 .map_err(|error| RetryRunError::Timeout(time_error(error)))?;
608 }
609 }
610 }
611}
612
613fn duration_to_ms(duration: Duration) -> TimeoutBudgetResult<u64> {
614 u64::try_from(duration.as_millis()).map_err(|_| {
615 TimeoutBudgetError::invalid_policy("duration does not fit in u64 milliseconds")
616 })
617}
618
619async fn current_physical_time<TTime: PhysicalTimeEffects + Sync>(
620 time: &TTime,
621) -> TimeoutBudgetResult<PhysicalTime> {
622 time.physical_time().await.map_err(time_error)
623}
624
625fn time_error(error: TimeError) -> TimeoutBudgetError {
626 TimeoutBudgetError::time_source_unavailable(error.to_string())
627}
628
629#[cfg(test)]
630#[allow(clippy::disallowed_types, clippy::expect_used, clippy::redundant_clone)]
631mod tests {
632 use super::{
633 execute_with_retry_budget, execute_with_timeout_budget, AttemptBudget,
634 ExponentialBackoffPolicy, RetryBudgetPolicy, RetryRunError, TimeoutBudget,
635 TimeoutBudgetError, TimeoutExecutionClass, TimeoutExecutionProfile, TimeoutRunError,
636 TimeoutTimeSemantics,
637 };
638 use crate::{
639 effects::{JitterMode, PhysicalTimeEffects, TimeError},
640 time::{PhysicalTime, TimeDomain},
641 ProtocolErrorCode,
642 };
643 use parking_lot::Mutex;
644 use std::time::Duration;
645 use std::{collections::VecDeque, sync::Arc};
646
647 fn physical_time(ts_ms: u64) -> PhysicalTime {
648 PhysicalTime {
649 ts_ms,
650 uncertainty: None,
651 }
652 }
653
654 #[derive(Debug, Clone, Copy)]
655 enum SleepBehavior {
656 Immediate,
657 YieldOnce,
658 }
659
660 #[derive(Clone)]
661 struct ScriptedTimeEffects {
662 times: Arc<Mutex<VecDeque<PhysicalTime>>>,
663 sleeps: Arc<Mutex<Vec<u64>>>,
664 sleep_behavior: SleepBehavior,
665 }
666
667 impl ScriptedTimeEffects {
668 fn new(
669 times: impl IntoIterator<Item = PhysicalTime>,
670 sleep_behavior: SleepBehavior,
671 ) -> Self {
672 Self {
673 times: Arc::new(Mutex::new(times.into_iter().collect())),
674 sleeps: Arc::new(Mutex::new(Vec::new())),
675 sleep_behavior,
676 }
677 }
678
679 fn sleep_calls(&self) -> Vec<u64> {
680 self.sleeps.lock().clone()
681 }
682 }
683
684 #[async_trait::async_trait]
685 impl PhysicalTimeEffects for ScriptedTimeEffects {
686 async fn physical_time(&self) -> Result<PhysicalTime, TimeError> {
687 self.times
688 .lock()
689 .pop_front()
690 .ok_or(TimeError::ServiceUnavailable)
691 }
692
693 async fn sleep_ms(&self, ms: u64) -> Result<(), TimeError> {
694 self.sleeps.lock().push(ms);
695 match self.sleep_behavior {
696 SleepBehavior::Immediate => Ok(()),
697 SleepBehavior::YieldOnce => {
698 tokio::time::sleep(Duration::from_millis(1)).await;
699 Ok(())
700 }
701 }
702 }
703 }
704
705 #[test]
706 fn timeout_budget_tracks_remaining_and_child_budget() {
707 let budget =
708 TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
709 .expect("budget");
710
711 assert_eq!(budget.started_at_ms(), 1_000);
712 assert_eq!(budget.deadline_at_ms(), 6_000);
713 assert_eq!(budget.timeout_ms(), 5_000);
714 assert_eq!(
715 budget.time_semantics(),
716 TimeoutTimeSemantics::LocalPhysicalBudget
717 );
718 assert_eq!(
719 budget
720 .remaining_at(&physical_time(2_500))
721 .expect("remaining"),
722 Duration::from_millis(3_500)
723 );
724 assert_eq!(
725 budget
726 .clamp_to_remaining(&physical_time(2_500), Duration::from_secs(10))
727 .expect("clamped"),
728 Duration::from_millis(3_500)
729 );
730
731 let child = budget
732 .child_budget(&physical_time(2_500), Duration::from_secs(2))
733 .expect("child");
734 assert_eq!(child.started_at_ms(), 2_500);
735 assert_eq!(child.deadline_at_ms(), 4_500);
736 }
737
738 #[test]
739 fn timeout_budget_expires_with_typed_failure() {
740 let budget =
741 TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
742 .expect("budget");
743
744 let error = budget
745 .remaining_at(&physical_time(6_500))
746 .expect_err("expired");
747 assert_eq!(error.code(), "deadline_exceeded");
748 assert!(matches!(
749 error,
750 TimeoutBudgetError::DeadlineExceeded {
751 deadline_at_ms: 6_000,
752 observed_at_ms: 6_500,
753 }
754 ));
755 assert_eq!(
756 budget.remaining_or_zero_at(&physical_time(6_500)),
757 Duration::ZERO
758 );
759 }
760
761 #[test]
762 fn attempt_budget_enforces_max_attempts() {
763 let mut budget = AttemptBudget::new(2);
764 assert_eq!(budget.remaining_attempts(), 2);
765 assert_eq!(budget.record_attempt().expect("attempt 0"), 0);
766 assert_eq!(budget.record_attempt().expect("attempt 1"), 1);
767 assert_eq!(budget.remaining_attempts(), 0);
768
769 let error = budget.record_attempt().expect_err("exhausted");
770 assert_eq!(error.code(), "attempt_budget_exhausted");
771 assert!(matches!(
772 error,
773 TimeoutBudgetError::AttemptBudgetExhausted {
774 max_attempts: 2,
775 attempts_used: 2,
776 }
777 ));
778 }
779
780 #[test]
781 fn exponential_backoff_is_bounded_and_round_trips_to_retry_policy() {
782 let backoff = ExponentialBackoffPolicy::new(
783 Duration::from_millis(100),
784 Duration::from_secs(1),
785 JitterMode::Deterministic,
786 )
787 .expect("backoff");
788 let policy = RetryBudgetPolicy::new(4, backoff.clone())
789 .with_per_attempt_timeout(Duration::from_millis(750));
790
791 assert_eq!(backoff.delay_for_attempt(0), Duration::from_millis(100));
792 assert!(backoff.delay_for_attempt(4) <= Duration::from_secs(1));
793
794 let retry_policy = policy.as_retry_policy();
795 assert_eq!(retry_policy.max_attempts, 4);
796 assert_eq!(retry_policy.timeout, Some(Duration::from_millis(750)));
797 assert_eq!(
798 retry_policy.calculate_delay(3),
799 backoff.delay_for_attempt(3)
800 );
801 }
802
803 #[test]
804 fn timeout_time_semantics_preserve_domain_split() {
805 assert_eq!(
806 TimeoutTimeSemantics::LocalPhysicalBudget.local_time_domain(),
807 Some(TimeDomain::PhysicalClock)
808 );
809 assert_eq!(
810 TimeoutTimeSemantics::LogicalSemanticOrdering.local_time_domain(),
811 Some(TimeDomain::LogicalClock)
812 );
813 assert_eq!(
814 TimeoutTimeSemantics::OrderSemanticOrdering.local_time_domain(),
815 Some(TimeDomain::OrderClock)
816 );
817 assert_eq!(
818 TimeoutTimeSemantics::ProvenancedSemanticOrdering.local_time_domain(),
819 None
820 );
821 assert!(TimeoutTimeSemantics::LocalPhysicalBudget.is_local_budget_domain());
822 assert!(!TimeoutTimeSemantics::LogicalSemanticOrdering.is_local_budget_domain());
823 }
824
825 #[test]
826 fn timeout_execution_profiles_scale_policy_by_environment() {
827 let production = TimeoutExecutionProfile::production();
828 let simulation = TimeoutExecutionProfile::simulation_test();
829 let harness = TimeoutExecutionProfile::harness();
830 let base_backoff = ExponentialBackoffPolicy::new(
831 Duration::from_secs(2),
832 Duration::from_secs(10),
833 JitterMode::Deterministic,
834 )
835 .expect("backoff");
836 let base_retry = RetryBudgetPolicy::new(5, base_backoff.clone())
837 .with_per_attempt_timeout(Duration::from_secs(8));
838
839 assert_eq!(production.class(), TimeoutExecutionClass::Production);
840 assert_eq!(
841 production
842 .scale_duration(Duration::from_secs(4))
843 .expect("scaled"),
844 Duration::from_secs(4)
845 );
846 assert_eq!(simulation.jitter(), JitterMode::None);
847 assert_eq!(harness.scale_percent(), 25);
848
849 let scaled = harness
850 .apply_retry_policy(&base_retry)
851 .expect("scaled policy");
852 assert_eq!(scaled.max_attempts(), 5);
853 assert_eq!(scaled.per_attempt_timeout(), Some(Duration::from_secs(2)));
854 assert_eq!(scaled.backoff().initial_delay(), Duration::from_millis(500));
855 assert_eq!(scaled.backoff().max_delay(), Duration::from_millis(2_500));
856 assert_eq!(scaled.backoff().jitter(), JitterMode::None);
857 }
858
859 #[test]
860 fn execution_profile_scaling_preserves_local_success_and_failure_relations() {
861 let profiles = [
862 TimeoutExecutionProfile::production(),
863 TimeoutExecutionProfile::simulation_test(),
864 TimeoutExecutionProfile::harness(),
865 ];
866
867 let base_timeout = Duration::from_secs(4);
868 let base_success_latency = Duration::from_millis(1_500);
869 let base_failure_latency = Duration::from_secs(6);
870
871 assert!(base_success_latency <= base_timeout);
872 assert!(base_failure_latency > base_timeout);
873
874 for profile in profiles {
875 let scaled_timeout = profile
876 .scale_duration(base_timeout)
877 .expect("scaled timeout");
878 let scaled_success = profile
879 .scale_duration(base_success_latency)
880 .expect("scaled success latency");
881 let scaled_failure = profile
882 .scale_duration(base_failure_latency)
883 .expect("scaled failure latency");
884
885 assert!(
886 scaled_success <= scaled_timeout,
887 "profile {:?} changed a local success relation into failure",
888 profile.class()
889 );
890 assert!(
891 scaled_failure > scaled_timeout,
892 "profile {:?} changed a local failure relation into success",
893 profile.class()
894 );
895 }
896 }
897
898 #[tokio::test]
899 async fn timeout_wrapper_returns_typed_deadline_error() {
900 let effects = ScriptedTimeEffects::new(
901 [physical_time(1_000), physical_time(6_200)],
902 SleepBehavior::Immediate,
903 );
904 let budget =
905 TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
906 .expect("budget");
907
908 let error = execute_with_timeout_budget(&effects, &budget, || async {
909 futures::future::pending::<Result<(), &'static str>>().await
910 })
911 .await
912 .expect_err("timed out");
913
914 assert!(matches!(
915 error,
916 TimeoutRunError::Timeout(TimeoutBudgetError::DeadlineExceeded {
917 deadline_at_ms: 6_000,
918 observed_at_ms: 6_200,
919 })
920 ));
921 assert_eq!(effects.sleep_calls(), vec![5_000]);
922 }
923
924 #[tokio::test]
925 async fn timeout_wrapper_preserves_remaining_child_budget() {
926 let parent =
927 TimeoutBudget::from_start_and_timeout(&physical_time(1_000), Duration::from_secs(5))
928 .expect("parent");
929 let child = parent
930 .child_budget(&physical_time(2_500), Duration::from_secs(10))
931 .expect("child");
932 let effects = ScriptedTimeEffects::new(
933 [physical_time(2_500), physical_time(6_100)],
934 SleepBehavior::Immediate,
935 );
936
937 let error = execute_with_timeout_budget(&effects, &child, || async {
938 futures::future::pending::<Result<(), &'static str>>().await
939 })
940 .await
941 .expect_err("timed out");
942
943 assert!(matches!(
944 error,
945 TimeoutRunError::Timeout(TimeoutBudgetError::DeadlineExceeded {
946 deadline_at_ms: 6_000,
947 observed_at_ms: 6_100,
948 })
949 ));
950 assert_eq!(effects.sleep_calls(), vec![3_500]);
951 }
952
953 #[tokio::test]
954 async fn retry_wrapper_retries_with_typed_backoff_policy() {
955 let effects = ScriptedTimeEffects::new([], SleepBehavior::YieldOnce);
956 let policy = RetryBudgetPolicy::new(
957 3,
958 ExponentialBackoffPolicy::new(
959 Duration::from_millis(100),
960 Duration::from_secs(1),
961 JitterMode::None,
962 )
963 .expect("backoff"),
964 );
965 let attempts = Arc::new(Mutex::new(Vec::new()));
966
967 let result = execute_with_retry_budget(&effects, &policy, {
968 let attempts = Arc::clone(&attempts);
969 move |attempt| {
970 let attempts = Arc::clone(&attempts);
971 async move {
972 attempts.lock().push(attempt);
973 if attempt < 2 {
974 Err("retryable failure")
975 } else {
976 Ok("done")
977 }
978 }
979 }
980 })
981 .await
982 .expect("eventual success");
983
984 assert_eq!(result, "done");
985 assert_eq!(*attempts.lock(), vec![0, 1, 2]);
986 assert_eq!(effects.sleep_calls(), vec![100, 200]);
987 }
988
989 #[tokio::test]
990 async fn retry_wrapper_surfaces_typed_attempt_exhaustion() {
991 let effects = ScriptedTimeEffects::new([], SleepBehavior::YieldOnce);
992 let policy = RetryBudgetPolicy::new(
993 2,
994 ExponentialBackoffPolicy::new(
995 Duration::from_millis(50),
996 Duration::from_millis(200),
997 JitterMode::None,
998 )
999 .expect("backoff"),
1000 );
1001
1002 let error = execute_with_retry_budget(&effects, &policy, |_attempt| async {
1003 Err::<(), _>("still failing")
1004 })
1005 .await
1006 .expect_err("exhausted");
1007
1008 assert!(matches!(
1009 error,
1010 RetryRunError::AttemptsExhausted {
1011 attempts_used: 2,
1012 last_error: "still failing",
1013 }
1014 ));
1015 assert_eq!(effects.sleep_calls(), vec![50]);
1016 }
1017}