use std::marker::PhantomData;
use std::sync::Arc;
use blake2::{Blake2b512, Digest};
use serde::{Deserialize, Serialize};
use crate::duration::Duration;
use crate::error::DurableError;
use crate::sealed::Sealed;
/// Policy for perturbing a computed delay.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JitterStrategy {
    /// Leave the delay untouched.
    #[default]
    None,
    /// Scale the entire delay by the jitter factor.
    Full,
    /// Keep half of the delay and scale only the other half by the jitter factor.
    Half,
}

impl JitterStrategy {
    /// Returns `delay_secs` adjusted according to this strategy.
    ///
    /// The jitter factor is `deterministic_random_factor(attempt)`, so the
    /// result is a pure function of `delay_secs` and `attempt`.
    pub fn apply(&self, delay_secs: f64, attempt: u32) -> f64 {
        match *self {
            Self::None => delay_secs,
            Self::Full => deterministic_random_factor(attempt) * delay_secs,
            Self::Half => {
                let half = delay_secs / 2.0;
                half + deterministic_random_factor(attempt) * half
            }
        }
    }
}
/// Derives a reproducible factor in `[0, 1]` from an attempt number.
///
/// The tag `b"jitter"` and the little-endian bytes of `attempt` are hashed
/// with BLAKE2b-512; the first eight digest bytes, read as a little-endian
/// `u64`, are divided by `u64::MAX`.
fn deterministic_random_factor(attempt: u32) -> f64 {
    let mut hasher = Blake2b512::new();
    hasher.update(b"jitter");
    hasher.update(attempt.to_le_bytes());
    let digest = hasher.finalize();

    // Copy the leading eight bytes; `zip` stops once `head` is full.
    let mut head = [0u8; 8];
    for (dst, src) in head.iter_mut().zip(digest.iter()) {
        *dst = *src;
    }
    u64::from_le_bytes(head) as f64 / u64::MAX as f64
}
/// Outcome returned by a wait strategy after inspecting a polled result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitDecision {
    /// Poll again after waiting for `delay`.
    Continue {
        /// How long to wait before the next poll.
        delay: Duration,
    },
    /// Stop polling.
    Done,
}
/// Inputs consumed by `create_wait_strategy` to build a polling policy over
/// results of type `T`.
pub struct WaitStrategyConfig<T> {
    /// Attempt limit; `create_wait_strategy` uses 60 when this is `None`.
    pub max_attempts: Option<usize>,
    /// Delay used before any backoff growth is applied.
    pub initial_delay: Duration,
    /// Upper bound applied to the grown delay (before jitter).
    pub max_delay: Duration,
    /// Factor the delay is multiplied by for each further attempt.
    pub backoff_rate: f64,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
    /// Predicate on the latest result; polling stops once it returns `false`.
    pub should_continue_polling: Box<dyn Fn(&T) -> bool + Send + Sync>,
}
/// Builds a polling decision function from `config`.
///
/// The returned closure receives the latest polled result and the number of
/// attempts made so far, and yields:
///
/// * [`WaitDecision::Done`] when `should_continue_polling` returns `false`,
///   or when `attempts_made` has reached `max_attempts` (60 if unset);
/// * otherwise [`WaitDecision::Continue`] with a delay of
///   `initial_delay * backoff_rate^(attempts_made - 1)`, capped at
///   `max_delay`, jittered, floored at one second and rounded to whole
///   seconds.
// The boxed closure type is this function's public return type, so the lint
// is silenced here rather than hidden behind a type alias.
#[allow(clippy::type_complexity)]
pub fn create_wait_strategy<T: Send + Sync + 'static>(
    config: WaitStrategyConfig<T>,
) -> Box<dyn Fn(&T, usize) -> WaitDecision + Send + Sync> {
    let max_attempts = config.max_attempts.unwrap_or(60);
    let initial_delay_secs = config.initial_delay.to_seconds() as f64;
    let max_delay_secs = config.max_delay.to_seconds() as f64;
    let backoff_rate = config.backoff_rate;
    let jitter = config.jitter;
    let should_continue = config.should_continue_polling;

    Box::new(move |result: &T, attempts_made: usize| -> WaitDecision {
        if !should_continue(result) || attempts_made >= max_attempts {
            return WaitDecision::Done;
        }

        // Attempts 0 and 1 both use exponent 0. Clamp before narrowing to
        // `i32` so a very large attempt count cannot wrap to a negative
        // exponent (which would shrink the delay instead of growing it).
        let exponent = attempts_made.saturating_sub(1).min(i32::MAX as usize) as i32;
        let base_delay = (initial_delay_secs * backoff_rate.powi(exponent)).min(max_delay_secs);

        // The attempt number only seeds the jitter, so truncation to `u32`
        // is harmless here.
        let jittered = jitter.apply(base_delay, attempts_made as u32);

        // Never wait less than one second.
        let final_delay = jittered.max(1.0).round() as u64;
        WaitDecision::Continue {
            delay: Duration::from_seconds(final_delay),
        }
    })
}
/// Selects how often work is checkpointed.
///
/// The default is [`CheckpointingMode::Batched`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum CheckpointingMode {
    /// Checkpoint after every operation (maximum durability).
    Eager,
    /// Batch operations before checkpointing (balanced).
    #[default]
    Batched,
    /// Execute multiple operations before checkpointing (best performance).
    Optimistic,
}

impl CheckpointingMode {
    /// Returns `true` for [`CheckpointingMode::Eager`].
    pub fn is_eager(&self) -> bool {
        matches!(self, Self::Eager)
    }

    /// Returns `true` for [`CheckpointingMode::Batched`].
    pub fn is_batched(&self) -> bool {
        matches!(self, Self::Batched)
    }

    /// Returns `true` for [`CheckpointingMode::Optimistic`].
    pub fn is_optimistic(&self) -> bool {
        matches!(self, Self::Optimistic)
    }

    /// Returns a short human-readable description of the mode.
    pub fn description(&self) -> &'static str {
        match self {
            Self::Eager => "Checkpoint after every operation (maximum durability)",
            Self::Batched => "Batch operations before checkpointing (balanced)",
            Self::Optimistic => {
                "Execute multiple operations before checkpointing (best performance)"
            }
        }
    }
}
/// Decides whether, and after how long, a failed attempt is retried.
// `Sealed` is a crate-internal supertrait; presumably it keeps the trait from
// being implemented outside this crate, hence the `private_bounds` allowance.
#[allow(private_bounds)]
pub trait RetryStrategy: Sealed + Send + Sync {
    /// Returns the delay before the next attempt, or `None` to stop retrying.
    ///
    /// `attempt` is the attempt counter supplied by the caller and `error`
    /// the message of the failure that triggered the retry decision.
    fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration>;

    /// Clones the strategy into a new box, so trait objects can be cloned.
    fn clone_box(&self) -> Box<dyn RetryStrategy>;
}

/// Makes boxed strategies cloneable by delegating to [`RetryStrategy::clone_box`].
impl Clone for Box<dyn RetryStrategy> {
    fn clone(&self) -> Self {
        (**self).clone_box()
    }
}
/// Retry strategy whose delay is multiplied by `multiplier` on each attempt.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    /// Attempt number at which retries stop.
    pub max_attempts: u32,
    /// Delay used for attempt 0.
    pub base_delay: Duration,
    /// Upper bound for the grown delay.
    pub max_delay: Duration,
    /// Growth factor applied per attempt.
    pub multiplier: f64,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
}

impl ExponentialBackoff {
    /// Creates a strategy with a one-hour cap, a multiplier of 2 and no jitter.
    pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
        let max_delay = Duration::from_hours(1);
        let multiplier = 2.0;
        let jitter = JitterStrategy::None;
        Self {
            max_attempts,
            base_delay,
            max_delay,
            multiplier,
            jitter,
        }
    }

    /// Starts a builder pre-populated with the builder's default values.
    pub fn builder() -> ExponentialBackoffBuilder {
        Default::default()
    }
}
impl Sealed for ExponentialBackoff {}

impl RetryStrategy for ExponentialBackoff {
    /// Returns `base_delay * multiplier^attempt`, capped at `max_delay`,
    /// jittered and floored at one second; `None` once `attempt` reaches
    /// `max_attempts`. Fractional seconds are truncated.
    fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
        if attempt >= self.max_attempts {
            return None;
        }

        // `powi` takes an `i32`. Clamp first so attempts above `i32::MAX`
        // cannot wrap to a negative exponent and collapse the delay.
        let exponent = attempt.min(i32::MAX as u32) as i32;
        let base_seconds = self.base_delay.to_seconds() as f64;
        let max_seconds = self.max_delay.to_seconds() as f64;
        let capped_seconds = (base_seconds * self.multiplier.powi(exponent)).min(max_seconds);

        let jittered = self.jitter.apply(capped_seconds, attempt);
        // Never wait less than one second.
        let final_seconds = jittered.max(1.0);
        Some(Duration::from_seconds(final_seconds as u64))
    }

    fn clone_box(&self) -> Box<dyn RetryStrategy> {
        Box::new(self.clone())
    }
}
/// Fluent builder for [`ExponentialBackoff`].
#[derive(Debug, Clone)]
pub struct ExponentialBackoffBuilder {
    max_attempts: u32,
    base_delay: Duration,
    max_delay: Duration,
    multiplier: f64,
    jitter: JitterStrategy,
}

/// Defaults: 3 attempts, 1 second base delay, 1 hour cap, multiplier 2, no jitter.
impl Default for ExponentialBackoffBuilder {
    fn default() -> Self {
        let base_delay = Duration::from_seconds(1);
        let max_delay = Duration::from_hours(1);
        Self {
            max_attempts: 3,
            base_delay,
            max_delay,
            multiplier: 2.0,
            jitter: JitterStrategy::None,
        }
    }
}

impl ExponentialBackoffBuilder {
    /// Sets the attempt number at which retries stop.
    pub fn max_attempts(self, max_attempts: u32) -> Self {
        Self {
            max_attempts,
            ..self
        }
    }

    /// Sets the delay used for attempt 0.
    pub fn base_delay(self, base_delay: Duration) -> Self {
        Self { base_delay, ..self }
    }

    /// Sets the upper bound for the grown delay.
    pub fn max_delay(self, max_delay: Duration) -> Self {
        Self { max_delay, ..self }
    }

    /// Sets the per-attempt growth factor.
    pub fn multiplier(self, multiplier: f64) -> Self {
        Self { multiplier, ..self }
    }

    /// Sets the jitter strategy.
    pub fn jitter(self, jitter: JitterStrategy) -> Self {
        Self { jitter, ..self }
    }

    /// Consumes the builder and produces the configured strategy.
    pub fn build(self) -> ExponentialBackoff {
        let Self {
            max_attempts,
            base_delay,
            max_delay,
            multiplier,
            jitter,
        } = self;
        ExponentialBackoff {
            max_attempts,
            base_delay,
            max_delay,
            multiplier,
            jitter,
        }
    }
}
/// Retry strategy that waits the same delay before every attempt.
#[derive(Debug, Clone)]
pub struct FixedDelay {
    /// Attempt number at which retries stop.
    pub max_attempts: u32,
    /// Delay between attempts (before jitter).
    pub delay: Duration,
    /// Jitter applied to the delay.
    pub jitter: JitterStrategy,
}

impl FixedDelay {
    /// Creates a fixed-delay strategy without jitter.
    pub fn new(max_attempts: u32, delay: Duration) -> Self {
        let jitter = JitterStrategy::None;
        Self {
            max_attempts,
            delay,
            jitter,
        }
    }

    /// Returns the strategy with its jitter replaced by `jitter`.
    pub fn with_jitter(self, jitter: JitterStrategy) -> Self {
        Self { jitter, ..self }
    }
}

impl Sealed for FixedDelay {}

impl RetryStrategy for FixedDelay {
    /// Returns the jittered delay, floored at one second and truncated to
    /// whole seconds; `None` once `attempt` reaches `max_attempts`.
    fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
        if attempt < self.max_attempts {
            let nominal = self.delay.to_seconds() as f64;
            let seconds = self.jitter.apply(nominal, attempt).max(1.0);
            Some(Duration::from_seconds(seconds as u64))
        } else {
            None
        }
    }

    fn clone_box(&self) -> Box<dyn RetryStrategy> {
        let copy = self.clone();
        Box::new(copy)
    }
}
/// Retry strategy whose delay grows by `base_delay` on each attempt.
#[derive(Debug, Clone)]
pub struct LinearBackoff {
    /// Attempt number at which retries stop.
    pub max_attempts: u32,
    /// Delay for attempt 0 and the increment for each later attempt.
    pub base_delay: Duration,
    /// Upper bound for the grown delay.
    pub max_delay: Duration,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
}

impl LinearBackoff {
    /// Creates a strategy with a one-hour cap and no jitter.
    pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
        let max_delay = Duration::from_hours(1);
        let jitter = JitterStrategy::None;
        Self {
            max_attempts,
            base_delay,
            max_delay,
            jitter,
        }
    }

    /// Returns the strategy with its cap replaced by `max_delay`.
    pub fn with_max_delay(self, max_delay: Duration) -> Self {
        Self { max_delay, ..self }
    }

    /// Returns the strategy with its jitter replaced by `jitter`.
    pub fn with_jitter(self, jitter: JitterStrategy) -> Self {
        Self { jitter, ..self }
    }
}

impl Sealed for LinearBackoff {}

impl RetryStrategy for LinearBackoff {
    /// Returns `base_delay * (attempt + 1)`, capped at `max_delay`, jittered,
    /// floored at one second and truncated to whole seconds; `None` once
    /// `attempt` reaches `max_attempts`.
    fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
        if attempt >= self.max_attempts {
            return None;
        }
        let steps = u64::from(attempt + 1);
        let capped = self
            .base_delay
            .to_seconds()
            .saturating_mul(steps)
            .min(self.max_delay.to_seconds());
        let seconds = self.jitter.apply(capped as f64, attempt).max(1.0);
        Some(Duration::from_seconds(seconds as u64))
    }

    fn clone_box(&self) -> Box<dyn RetryStrategy> {
        let copy = self.clone();
        Box::new(copy)
    }
}
/// Retry strategy that never retries.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoRetry;

impl Sealed for NoRetry {}

impl RetryStrategy for NoRetry {
    /// Always returns `None`, whatever the attempt or error.
    fn next_delay(&self, _attempt: u32, _error: &str) -> Option<Duration> {
        Option::None
    }

    fn clone_box(&self) -> Box<dyn RetryStrategy> {
        Box::new(NoRetry)
    }
}
/// A rule matched against an error message.
#[derive(Clone)]
pub enum ErrorPattern {
    /// Matches when the message contains this substring.
    Contains(String),
    /// Matches when the regular expression matches the message.
    Regex(regex::Regex),
}

/// Prints the variant name followed by the substring or the regex source text.
impl std::fmt::Debug for ErrorPattern {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, text): (&str, &str) = match self {
            ErrorPattern::Contains(needle) => ("Contains", needle.as_str()),
            ErrorPattern::Regex(re) => ("Regex", re.as_str()),
        };
        f.debug_tuple(variant).field(&text).finish()
    }
}
/// Decides which errors are eligible for retry.
///
/// A filter with no patterns and no error types treats every error as
/// retryable.
#[derive(Clone, Debug, Default)]
pub struct RetryableErrorFilter {
    /// Message patterns; any match makes the error retryable.
    pub patterns: Vec<ErrorPattern>,
    /// Error type names compared for exact equality.
    pub error_types: Vec<String>,
}

impl RetryableErrorFilter {
    /// Returns `true` when the filter is empty or any pattern matches
    /// `error_msg`. Error types are not consulted here.
    pub fn is_retryable(&self, error_msg: &str) -> bool {
        if self.patterns.is_empty() && self.error_types.is_empty() {
            return true;
        }
        for pattern in &self.patterns {
            let hit = match pattern {
                ErrorPattern::Contains(needle) => error_msg.contains(needle.as_str()),
                ErrorPattern::Regex(re) => re.is_match(error_msg),
            };
            if hit {
                return true;
            }
        }
        false
    }

    /// Returns `true` when the filter is empty, `error_type` equals one of
    /// the configured types, or [`Self::is_retryable`] accepts `error_msg`.
    pub fn is_retryable_with_type(&self, error_msg: &str, error_type: &str) -> bool {
        let unrestricted = self.patterns.is_empty() && self.error_types.is_empty();
        if unrestricted {
            return true;
        }
        let type_listed = self
            .error_types
            .iter()
            .any(|known| known.as_str() == error_type);
        type_listed || self.is_retryable(error_msg)
    }
}
/// Wraps a closure as a retry strategy.
///
/// The closure receives the attempt number and the error message and returns
/// the delay before the next attempt, or `None` to stop retrying.
pub fn custom_retry<F>(f: F) -> CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    let strategy = CustomRetry { f };
    strategy
}
/// Retry strategy backed by a user-supplied closure; built with `custom_retry`.
#[derive(Clone)]
pub struct CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    f: F,
}

/// Prints only the type name, since the wrapped closure is not `Debug`.
impl<F> std::fmt::Debug for CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = formatter.debug_struct("CustomRetry");
        out.finish()
    }
}

impl<F> Sealed for CustomRetry<F> where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static
{
}

impl<F> RetryStrategy for CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    /// Forwards the decision to the wrapped closure.
    fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration> {
        let decide = &self.f;
        decide(attempt, error)
    }

    fn clone_box(&self) -> Box<dyn RetryStrategy> {
        let copy = self.clone();
        Box::new(copy)
    }
}
/// Execution semantics selected for a step.
///
/// The default is [`StepSemantics::AtLeastOncePerRetry`].
// NOTE(review): the exact guarantees behind each variant are defined by the
// code that consumes this enum, which is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum StepSemantics {
    /// At-most-once semantics per retry.
    AtMostOncePerRetry,
    /// At-least-once semantics per retry (default).
    #[default]
    AtLeastOncePerRetry,
}
/// Options for a single step.
#[derive(Clone, Default)]
pub struct StepConfig {
    /// Strategy consulted for retry delays; `None` leaves it unset.
    pub retry_strategy: Option<Box<dyn RetryStrategy>>,
    /// Execution semantics for the step.
    pub step_semantics: StepSemantics,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
    /// Optional filter restricting which errors are retried.
    pub retryable_error_filter: Option<RetryableErrorFilter>,
}

/// Reports only whether each optional component is set, since trait objects
/// held here need not be `Debug`.
impl std::fmt::Debug for StepConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_retry = self.retry_strategy.is_some();
        let has_serdes = self.serdes.is_some();
        let has_filter = self.retryable_error_filter.is_some();

        let mut out = f.debug_struct("StepConfig");
        out.field("retry_strategy", &has_retry);
        out.field("step_semantics", &self.step_semantics);
        out.field("serdes", &has_serdes);
        out.field("retryable_error_filter", &has_filter);
        out.finish()
    }
}
/// Options for a callback operation.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Overall timeout for the callback.
    pub timeout: Duration,
    /// Timeout between heartbeats.
    pub heartbeat_timeout: Duration,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
/// Options for an invocation with payload type `P` and result type `R`.
///
/// `P` and `R` are only recorded at the type level; no values of either type
/// are stored.
pub struct InvokeConfig<P, R> {
    /// Timeout for the invocation.
    pub timeout: Duration,
    /// Optional custom serializer/deserializer for the payload.
    pub serdes_payload: Option<Arc<dyn SerDesAny>>,
    /// Optional custom serializer/deserializer for the result.
    pub serdes_result: Option<Arc<dyn SerDesAny>>,
    /// Optional tenant identifier.
    pub tenant_id: Option<String>,
    _marker: PhantomData<(P, R)>,
}

// Implemented by hand: `#[derive(Clone)]` would require `P: Clone` and
// `R: Clone` even though neither type is stored in the struct.
impl<P, R> Clone for InvokeConfig<P, R> {
    fn clone(&self) -> Self {
        Self {
            timeout: self.timeout.clone(),
            serdes_payload: self.serdes_payload.clone(),
            serdes_result: self.serdes_result.clone(),
            tenant_id: self.tenant_id.clone(),
            _marker: PhantomData,
        }
    }
}

/// Default timeout with every optional field unset; implemented by hand so
/// that `P` and `R` need not be `Default`.
impl<P, R> Default for InvokeConfig<P, R> {
    fn default() -> Self {
        Self {
            timeout: Duration::default(),
            serdes_payload: None,
            serdes_result: None,
            tenant_id: None,
            _marker: PhantomData,
        }
    }
}

/// Shows the timeout and tenant id, and whether each serializer is set.
impl<P, R> std::fmt::Debug for InvokeConfig<P, R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InvokeConfig")
            .field("timeout", &self.timeout)
            .field("serdes_payload", &self.serdes_payload.is_some())
            .field("serdes_result", &self.serdes_result.is_some())
            .field("tenant_id", &self.tenant_id)
            .finish()
    }
}
/// Options for a map operation over a collection of items.
#[derive(Debug, Clone, Default)]
pub struct MapConfig {
    /// Optional limit on concurrently running items.
    pub max_concurrency: Option<usize>,
    /// Optional batching of items.
    pub item_batcher: Option<ItemBatcher>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
/// Options for a parallel operation.
#[derive(Debug, Clone, Default)]
pub struct ParallelConfig {
    /// Optional limit on concurrently running branches.
    pub max_concurrency: Option<usize>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
/// Options for a child context.
#[derive(Clone, Default)]
// The callback fields are spelled out in full to keep the public field types
// explicit, which trips the type-complexity lint.
#[allow(clippy::type_complexity)]
pub struct ChildConfig {
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
    /// Whether children are replayed; `false` by default.
    pub replay_children: bool,
    /// Optional hook that transforms an error into another error.
    pub error_mapper: Option<Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>>,
    /// Optional hook that turns a string into a summary string.
    pub summary_generator: Option<Arc<dyn Fn(&str) -> String + Send + Sync>>,
}

/// Closures are not `Debug`; a set hook is shown as `Some("...")`.
impl std::fmt::Debug for ChildConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mapper = if self.error_mapper.is_some() {
            Some("...")
        } else {
            None
        };
        let summarizer = if self.summary_generator.is_some() {
            Some("...")
        } else {
            None
        };

        let mut out = f.debug_struct("ChildConfig");
        out.field("serdes", &self.serdes);
        out.field("replay_children", &self.replay_children);
        out.field("error_mapper", &mapper);
        out.field("summary_generator", &summarizer);
        out.finish()
    }
}

impl ChildConfig {
    /// Creates a configuration with default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a default configuration with `replay_children` enabled.
    pub fn with_replay_children() -> Self {
        Self::default().set_replay_children(true)
    }

    /// Returns the configuration with `replay_children` replaced.
    pub fn set_replay_children(self, replay_children: bool) -> Self {
        Self {
            replay_children,
            ..self
        }
    }

    /// Returns the configuration with the serializer set.
    pub fn set_serdes(self, serdes: Arc<dyn SerDesAny>) -> Self {
        Self {
            serdes: Some(serdes),
            ..self
        }
    }

    /// Returns the configuration with the error-mapping hook set.
    pub fn set_error_mapper(
        self,
        mapper: Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>,
    ) -> Self {
        Self {
            error_mapper: Some(mapper),
            ..self
        }
    }

    /// Returns the configuration with the summary-generating hook set.
    pub fn set_summary_generator(
        self,
        generator: Arc<dyn Fn(&str) -> String + Send + Sync>,
    ) -> Self {
        Self {
            summary_generator: Some(generator),
            ..self
        }
    }
}
/// Alias for [`ChildConfig`]; both names refer to the same type.
pub type ContextConfig = ChildConfig;
/// Completion criteria for an operation made of several branches or items.
///
/// Every field is optional; the default leaves all of them unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompletionConfig {
    /// Number of successes to require, if any.
    pub min_successful: Option<usize>,
    /// Number of failures to tolerate, if limited.
    pub tolerated_failure_count: Option<usize>,
    /// Share of failures to tolerate, if limited.
    pub tolerated_failure_percentage: Option<f64>,
}

impl CompletionConfig {
    /// Requires a single success (`min_successful = Some(1)`).
    pub fn first_successful() -> Self {
        Self::with_min_successful(1)
    }

    /// Leaves every criterion unset.
    pub fn all_completed() -> Self {
        Self {
            min_successful: None,
            tolerated_failure_count: None,
            tolerated_failure_percentage: None,
        }
    }

    /// Tolerates no failures: both the count and the percentage are zero.
    pub fn all_successful() -> Self {
        Self {
            min_successful: None,
            tolerated_failure_count: Some(0),
            tolerated_failure_percentage: Some(0.0),
        }
    }

    /// Sets only `min_successful` to `count`.
    pub fn with_min_successful(count: usize) -> Self {
        Self {
            min_successful: Some(count),
            tolerated_failure_count: None,
            tolerated_failure_percentage: None,
        }
    }

    /// Sets only `tolerated_failure_count` to `count`.
    pub fn with_failure_tolerance(count: usize) -> Self {
        Self {
            min_successful: None,
            tolerated_failure_count: Some(count),
            tolerated_failure_percentage: None,
        }
    }
}
/// Splits a slice of items into batches bounded by item count and by
/// serialized size.
#[derive(Debug, Clone)]
pub struct ItemBatcher {
    /// Maximum number of items in one batch.
    pub max_items_per_batch: usize,
    /// Maximum total JSON-serialized size, in bytes, of one batch.
    pub max_bytes_per_batch: usize,
}

/// Defaults: 100 items and 256 KiB per batch.
impl Default for ItemBatcher {
    fn default() -> Self {
        Self {
            max_items_per_batch: 100,
            max_bytes_per_batch: 256 * 1024,
        }
    }
}

impl ItemBatcher {
    /// Creates a batcher with the given limits.
    pub fn new(max_items_per_batch: usize, max_bytes_per_batch: usize) -> Self {
        Self {
            max_items_per_batch,
            max_bytes_per_batch,
        }
    }

    /// Groups `items` into consecutive batches, preserving order.
    ///
    /// Each entry of the result is `(index of the batch's first item in
    /// `items`, cloned items of the batch)`. A batch is closed before adding
    /// an item that would push it past either limit. A batch always holds at
    /// least one item, so a single item larger than `max_bytes_per_batch`
    /// (or a zero `max_items_per_batch`) yields one-item batches rather than
    /// empty ones. An empty input produces no batches.
    pub fn batch<T: Serialize + Clone>(&self, items: &[T]) -> Vec<(usize, Vec<T>)> {
        if items.is_empty() {
            return Vec::new();
        }

        let mut batches = Vec::new();
        let mut current_batch: Vec<T> = Vec::new();
        let mut current_bytes = 0usize;
        let mut batch_start_index = 0;

        for (i, item) in items.iter().enumerate() {
            // Best effort: an item that fails to serialize counts as 0 bytes.
            let item_bytes = serde_json::to_string(item).map(|s| s.len()).unwrap_or(0);

            let would_exceed_items = current_batch.len() >= self.max_items_per_batch;
            let would_exceed_bytes =
                current_bytes.saturating_add(item_bytes) > self.max_bytes_per_batch;

            // Flush only a non-empty batch, so no empty batch is ever emitted
            // (previously possible when `max_items_per_batch` was 0).
            if !current_batch.is_empty() && (would_exceed_items || would_exceed_bytes) {
                batches.push((batch_start_index, std::mem::take(&mut current_batch)));
                current_bytes = 0;
                batch_start_index = i;
            }

            current_batch.push(item.clone());
            current_bytes = current_bytes.saturating_add(item_bytes);
        }

        if !current_batch.is_empty() {
            batches.push((batch_start_index, current_batch));
        }
        batches
    }
}
/// Type-erased serializer/deserializer working on `dyn Any` values and
/// string data.
pub trait SerDesAny: Send + Sync {
    /// Serializes a type-erased value to a string.
    ///
    /// # Errors
    ///
    /// Returns a `DurableError` as decided by the implementation.
    fn serialize_any(
        &self,
        value: &dyn std::any::Any,
    ) -> Result<String, crate::error::DurableError>;

    /// Deserializes string data into a boxed, type-erased value.
    ///
    /// # Errors
    ///
    /// Returns a `DurableError` as decided by the implementation.
    fn deserialize_any(
        &self,
        data: &str,
    ) -> Result<Box<dyn std::any::Any + Send>, crate::error::DurableError>;
}

/// Lets containers holding `dyn SerDesAny` derive `Debug`; prints only the
/// trait name.
impl std::fmt::Debug for dyn SerDesAny {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("SerDesAny")
    }
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
/// The default step semantics are at-least-once per retry.
#[test]
fn test_step_semantics_default() {
    assert_eq!(StepSemantics::default(), StepSemantics::AtLeastOncePerRetry);
}
/// A default `StepConfig` has no retry strategy, no serdes and default semantics.
#[test]
fn test_step_config_default() {
    let StepConfig {
        retry_strategy,
        step_semantics,
        serdes,
        ..
    } = StepConfig::default();
    assert!(retry_strategy.is_none());
    assert_eq!(step_semantics, StepSemantics::AtLeastOncePerRetry);
    assert!(serdes.is_none());
}
/// `first_successful` sets only `min_successful`.
#[test]
fn test_completion_config_first_successful() {
    let CompletionConfig {
        min_successful,
        tolerated_failure_count,
        tolerated_failure_percentage,
    } = CompletionConfig::first_successful();
    assert_eq!(min_successful, Some(1));
    assert!(tolerated_failure_count.is_none());
    assert!(tolerated_failure_percentage.is_none());
}
/// `all_completed` leaves every criterion unset.
#[test]
fn test_completion_config_all_completed() {
    let CompletionConfig {
        min_successful,
        tolerated_failure_count,
        tolerated_failure_percentage,
    } = CompletionConfig::all_completed();
    assert!(min_successful.is_none());
    assert!(tolerated_failure_count.is_none());
    assert!(tolerated_failure_percentage.is_none());
}
/// `all_successful` tolerates zero failures by count and by percentage.
#[test]
fn test_completion_config_all_successful() {
    let CompletionConfig {
        min_successful,
        tolerated_failure_count,
        tolerated_failure_percentage,
    } = CompletionConfig::all_successful();
    assert!(min_successful.is_none());
    assert_eq!(tolerated_failure_count, Some(0));
    assert_eq!(tolerated_failure_percentage, Some(0.0));
}
/// The default batcher allows 100 items and 256 KiB per batch.
#[test]
fn test_item_batcher_default() {
    let ItemBatcher {
        max_items_per_batch,
        max_bytes_per_batch,
    } = ItemBatcher::default();
    assert_eq!(max_items_per_batch, 100);
    assert_eq!(max_bytes_per_batch, 256 * 1024);
}
/// `ItemBatcher::new` stores both limits as given.
#[test]
fn test_item_batcher_new() {
    let ItemBatcher {
        max_items_per_batch,
        max_bytes_per_batch,
    } = ItemBatcher::new(50, 128 * 1024);
    assert_eq!(max_items_per_batch, 50);
    assert_eq!(max_bytes_per_batch, 128 * 1024);
}
/// Both callback timeouts default to zero seconds.
#[test]
fn test_callback_config_default() {
    let CallbackConfig {
        timeout,
        heartbeat_timeout,
        ..
    } = CallbackConfig::default();
    assert_eq!(timeout.to_seconds(), 0);
    assert_eq!(heartbeat_timeout.to_seconds(), 0);
}
/// A default `InvokeConfig` has a zero timeout and no tenant id.
#[test]
fn test_invoke_config_default() {
    let config = InvokeConfig::<String, String>::default();
    assert_eq!(config.timeout.to_seconds(), 0);
    assert_eq!(config.tenant_id, None);
}
/// A default `MapConfig` has no concurrency limit and no batcher.
#[test]
fn test_map_config_default() {
    let MapConfig {
        max_concurrency,
        item_batcher,
        ..
    } = MapConfig::default();
    assert!(max_concurrency.is_none());
    assert!(item_batcher.is_none());
}
/// A default `ParallelConfig` has no concurrency limit.
#[test]
fn test_parallel_config_default() {
    assert!(ParallelConfig::default().max_concurrency.is_none());
}
/// A default `ChildConfig` has replay disabled and no optional components.
#[test]
fn test_child_config_default() {
    let ChildConfig {
        serdes,
        replay_children,
        error_mapper,
        summary_generator,
    } = ChildConfig::default();
    assert!(!replay_children);
    assert!(serdes.is_none());
    assert!(error_mapper.is_none());
    assert!(summary_generator.is_none());
}
/// `with_replay_children` enables replay.
#[test]
fn test_child_config_with_replay_children() {
    assert!(ChildConfig::with_replay_children().replay_children);
}
/// `set_replay_children(true)` enables replay on a fresh configuration.
#[test]
fn test_child_config_set_replay_children() {
    let replay = ChildConfig::new().set_replay_children(true).replay_children;
    assert!(replay);
}
/// `ContextConfig` exposes the same constructors as `ChildConfig`.
#[test]
fn test_context_config_type_alias() {
    let via_alias: ContextConfig = ContextConfig::with_replay_children();
    assert!(via_alias.replay_children);
}
/// The default checkpointing mode is `Batched`.
#[test]
fn test_checkpointing_mode_default() {
    let default_mode = CheckpointingMode::default();
    assert_eq!(default_mode, CheckpointingMode::Batched);
    assert!(default_mode.is_batched());
}
/// `Eager` is classified as eager only.
#[test]
fn test_checkpointing_mode_eager() {
    let mode = CheckpointingMode::Eager;
    assert_eq!(
        (mode.is_eager(), mode.is_batched(), mode.is_optimistic()),
        (true, false, false)
    );
}
/// `Batched` is classified as batched only.
#[test]
fn test_checkpointing_mode_batched() {
    let mode = CheckpointingMode::Batched;
    assert_eq!(
        (mode.is_eager(), mode.is_batched(), mode.is_optimistic()),
        (false, true, false)
    );
}
/// `Optimistic` is classified as optimistic only.
#[test]
fn test_checkpointing_mode_optimistic() {
    let mode = CheckpointingMode::Optimistic;
    assert_eq!(
        (mode.is_eager(), mode.is_batched(), mode.is_optimistic()),
        (false, false, true)
    );
}
/// Each mode's description mentions its characteristic phrase.
#[test]
fn test_checkpointing_mode_description() {
    let expectations = [
        (CheckpointingMode::Eager, "maximum durability"),
        (CheckpointingMode::Batched, "balanced"),
        (CheckpointingMode::Optimistic, "best performance"),
    ];
    for (mode, phrase) in expectations {
        assert!(mode.description().contains(phrase));
    }
}
/// Every mode survives a JSON round trip.
#[test]
fn test_checkpointing_mode_serialization() {
    for mode in [
        CheckpointingMode::Eager,
        CheckpointingMode::Batched,
        CheckpointingMode::Optimistic,
    ] {
        let serialized = serde_json::to_string(&mode).unwrap();
        let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
        assert_eq!(mode, deserialized);
    }
}
/// `ExponentialBackoff::new` uses a one-hour cap and a multiplier of 2.
#[test]
fn test_exponential_backoff_new() {
    let ExponentialBackoff {
        max_attempts,
        base_delay,
        max_delay,
        multiplier,
        ..
    } = ExponentialBackoff::new(5, Duration::from_seconds(1));
    assert_eq!(max_attempts, 5);
    assert_eq!(base_delay.to_seconds(), 1);
    assert_eq!(max_delay.to_seconds(), 3600);
    assert!((multiplier - 2.0).abs() < f64::EPSILON);
}
/// Values set on the builder end up in the built strategy.
#[test]
fn test_exponential_backoff_builder() {
    let ExponentialBackoff {
        max_attempts,
        base_delay,
        max_delay,
        multiplier,
        ..
    } = ExponentialBackoff::builder()
        .max_attempts(10)
        .base_delay(Duration::from_seconds(2))
        .max_delay(Duration::from_minutes(30))
        .multiplier(3.0)
        .build();
    assert_eq!(max_attempts, 10);
    assert_eq!(base_delay.to_seconds(), 2);
    assert_eq!(max_delay.to_seconds(), 1800);
    assert!((multiplier - 3.0).abs() < f64::EPSILON);
}
/// Delays double per attempt and stop at `max_attempts`.
#[test]
fn test_exponential_backoff_delays() {
    let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
    for (attempt, expected_secs) in [(0u32, 1u64), (1, 2), (2, 4), (3, 8), (4, 16)] {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(expected_secs)
        );
    }
    assert_eq!(strategy.next_delay(5, "error"), None);
}
/// Delays stop growing once they reach `max_delay`.
#[test]
fn test_exponential_backoff_max_delay_cap() {
    let strategy = ExponentialBackoff::builder()
        .max_attempts(10)
        .base_delay(Duration::from_seconds(10))
        .max_delay(Duration::from_seconds(30))
        .build();
    for (attempt, expected_secs) in [(0u32, 10u64), (1, 20), (2, 30), (3, 30)] {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(expected_secs)
        );
    }
}
/// `FixedDelay::new` stores the attempt limit and the delay.
#[test]
fn test_fixed_delay_new() {
    let FixedDelay {
        max_attempts,
        delay,
        ..
    } = FixedDelay::new(3, Duration::from_seconds(5));
    assert_eq!(max_attempts, 3);
    assert_eq!(delay.to_seconds(), 5);
}
/// The delay is the same for every attempt below the limit.
#[test]
fn test_fixed_delay_constant() {
    let strategy = FixedDelay::new(3, Duration::from_seconds(5));
    for attempt in 0u32..3 {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(5)
        );
    }
    assert_eq!(strategy.next_delay(3, "error"), None);
}
/// `LinearBackoff::new` uses a one-hour cap.
#[test]
fn test_linear_backoff_new() {
    let LinearBackoff {
        max_attempts,
        base_delay,
        max_delay,
        ..
    } = LinearBackoff::new(5, Duration::from_seconds(2));
    assert_eq!(max_attempts, 5);
    assert_eq!(base_delay.to_seconds(), 2);
    assert_eq!(max_delay.to_seconds(), 3600);
}
/// `with_max_delay` overrides the cap.
#[test]
fn test_linear_backoff_with_max_delay() {
    let cap = Duration::from_seconds(10);
    let strategy = LinearBackoff::new(5, Duration::from_seconds(2)).with_max_delay(cap);
    assert_eq!(strategy.max_delay.to_seconds(), 10);
}
/// Delays grow by the base delay per attempt and stop at `max_attempts`.
#[test]
fn test_linear_backoff_delays() {
    let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
    for (attempt, expected_secs) in [(0u32, 2u64), (1, 4), (2, 6), (3, 8), (4, 10)] {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(expected_secs)
        );
    }
    assert_eq!(strategy.next_delay(5, "error"), None);
}
/// Delays stop growing once they reach `max_delay`.
#[test]
fn test_linear_backoff_max_delay_cap() {
    let strategy = LinearBackoff::new(10, Duration::from_seconds(5))
        .with_max_delay(Duration::from_seconds(15));
    for (attempt, expected_secs) in [(0u32, 5u64), (1, 10), (2, 15), (3, 15)] {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(expected_secs)
        );
    }
}
/// `NoRetry` never yields a delay, whatever the attempt number.
#[test]
fn test_no_retry() {
    let strategy = NoRetry;
    for attempt in [0u32, 1, 100] {
        assert_eq!(strategy.next_delay(attempt, "error"), None);
    }
}
/// A `NoRetry` value yields no delay for the first attempt.
#[test]
fn test_no_retry_default() {
    let first = NoRetry.next_delay(0, "error");
    assert_eq!(first, None);
}
/// A closure-based strategy retries three times with a ten-second delay.
#[test]
fn test_custom_retry_basic() {
    let strategy = custom_retry(|attempt, _error| {
        if attempt < 3 {
            Some(Duration::from_seconds(10))
        } else {
            None
        }
    });
    for attempt in 0u32..3 {
        assert_eq!(
            strategy.next_delay(attempt, "error").map(|d| d.to_seconds()),
            Some(10)
        );
    }
    assert_eq!(strategy.next_delay(3, "error"), None);
}
/// A closure-based strategy can choose the delay from the error message.
#[test]
fn test_custom_retry_error_based() {
    let strategy = custom_retry(|attempt, error| {
        if attempt >= 5 {
            return None;
        }
        let secs = if error.contains("transient") {
            1
        } else if error.contains("rate_limit") {
            30
        } else {
            return None;
        };
        Some(Duration::from_seconds(secs))
    });

    let cases: [(&str, Option<u64>); 3] = [
        ("transient error", Some(1)),
        ("rate_limit exceeded", Some(30)),
        ("permanent failure", None),
    ];
    for (message, expected_secs) in cases {
        assert_eq!(
            strategy.next_delay(0, message).map(|d| d.to_seconds()),
            expected_secs
        );
    }
}
/// `clone_box` yields a strategy that behaves like the original.
#[test]
fn test_retry_strategy_clone_box() {
    let with_delays: Vec<Box<dyn RetryStrategy>> = vec![
        Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1))),
        Box::new(FixedDelay::new(3, Duration::from_seconds(5))),
        Box::new(LinearBackoff::new(3, Duration::from_seconds(2))),
    ];
    for original in &with_delays {
        let copy = original.clone_box();
        assert_eq!(
            original.next_delay(0, "e").map(|d| d.to_seconds()),
            copy.next_delay(0, "e").map(|d| d.to_seconds())
        );
    }

    let no_retry: Box<dyn RetryStrategy> = Box::new(NoRetry);
    let no_retry_clone = no_retry.clone_box();
    assert_eq!(
        no_retry.next_delay(0, "e"),
        no_retry_clone.next_delay(0, "e")
    );
}
/// `Clone` on a boxed strategy yields an equivalent strategy.
#[test]
fn test_boxed_retry_strategy_clone() {
    let original: Box<dyn RetryStrategy> =
        Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1)));
    let duplicate = original.clone();
    let first_delay_secs =
        |s: &Box<dyn RetryStrategy>| s.next_delay(0, "error").map(|d| d.to_seconds());
    assert_eq!(first_delay_secs(&original), first_delay_secs(&duplicate));
}
/// A strategy stored in `StepConfig` can be retrieved and used.
#[test]
fn test_step_config_with_retry_strategy() {
    let backoff = ExponentialBackoff::new(3, Duration::from_seconds(1));
    let config = StepConfig {
        retry_strategy: Some(Box::new(backoff)),
        step_semantics: StepSemantics::AtLeastOncePerRetry,
        serdes: None,
        retryable_error_filter: None,
    };
    assert!(config.retry_strategy.is_some());
    let first_delay = config
        .retry_strategy
        .as_ref()
        .unwrap()
        .next_delay(0, "error")
        .map(|d| d.to_seconds());
    assert_eq!(first_delay, Some(1));
}
/// Every strategy's `Debug` output contains its type name.
#[test]
fn test_retry_strategy_debug() {
    let rendered = [
        (
            format!("{:?}", ExponentialBackoff::new(3, Duration::from_seconds(1))),
            "ExponentialBackoff",
        ),
        (
            format!("{:?}", FixedDelay::new(3, Duration::from_seconds(5))),
            "FixedDelay",
        ),
        (
            format!("{:?}", LinearBackoff::new(3, Duration::from_seconds(2))),
            "LinearBackoff",
        ),
        (format!("{:?}", NoRetry), "NoRetry"),
        (format!("{:?}", custom_retry(|_, _| None)), "CustomRetry"),
    ];
    for (debug_str, type_name) in rendered {
        assert!(debug_str.contains(type_name));
    }
}
/// Proptest strategy that yields one of the two `StepSemantics` variants.
fn step_semantics_strategy() -> impl Strategy<Value = StepSemantics> {
prop_oneof![
Just(StepSemantics::AtMostOncePerRetry),
Just(StepSemantics::AtLeastOncePerRetry),
]
}
/// Proptest strategy that yields one of the three `CheckpointingMode` variants.
fn checkpointing_mode_strategy() -> impl Strategy<Value = CheckpointingMode> {
prop_oneof![
Just(CheckpointingMode::Eager),
Just(CheckpointingMode::Batched),
Just(CheckpointingMode::Optimistic),
]
}
proptest! {
#[test]
fn prop_step_config_validity(semantics in step_semantics_strategy()) {
let config = StepConfig {
retry_strategy: None,
step_semantics: semantics,
serdes: None,
retryable_error_filter: None,
};
let _ = config.retry_strategy.is_none();
let _ = config.step_semantics;
let _ = config.serdes.is_none();
let debug_str = format!("{:?}", config);
prop_assert!(!debug_str.is_empty());
}
#[test]
fn prop_callback_config_positive_timeout(
timeout_secs in 1u64..=86400u64,
heartbeat_secs in 1u64..=86400u64
) {
let config = CallbackConfig {
timeout: Duration::from_seconds(timeout_secs),
heartbeat_timeout: Duration::from_seconds(heartbeat_secs),
serdes: None,
};
prop_assert_eq!(config.timeout.to_seconds(), timeout_secs);
prop_assert_eq!(config.heartbeat_timeout.to_seconds(), heartbeat_secs);
let debug_str = format!("{:?}", config);
prop_assert!(!debug_str.is_empty());
}
#[test]
fn prop_duration_conversion_roundtrip(seconds in 0u64..=u64::MAX / 2) {
let original = Duration::from_seconds(seconds);
let extracted = original.to_seconds();
let reconstructed = Duration::from_seconds(extracted);
prop_assert_eq!(original, reconstructed);
prop_assert_eq!(original.to_seconds(), reconstructed.to_seconds());
}
#[test]
fn prop_completion_config_consistency(
min_successful in proptest::option::of(0usize..100),
tolerated_count in proptest::option::of(0usize..100),
tolerated_pct in proptest::option::of(0.0f64..=1.0f64)
) {
let config = CompletionConfig {
min_successful,
tolerated_failure_count: tolerated_count,
tolerated_failure_percentage: tolerated_pct,
};
prop_assert_eq!(config.min_successful, min_successful);
prop_assert_eq!(config.tolerated_failure_count, tolerated_count);
prop_assert_eq!(config.tolerated_failure_percentage, tolerated_pct);
let serialized = serde_json::to_string(&config).unwrap();
let deserialized: CompletionConfig = serde_json::from_str(&serialized).unwrap();
prop_assert_eq!(config.min_successful, deserialized.min_successful);
prop_assert_eq!(config.tolerated_failure_count, deserialized.tolerated_failure_count);
match (config.tolerated_failure_percentage, deserialized.tolerated_failure_percentage) {
(Some(a), Some(b)) => prop_assert!((a - b).abs() < f64::EPSILON),
(None, None) => {},
_ => prop_assert!(false, "tolerated_failure_percentage mismatch"),
}
}
#[test]
fn prop_checkpointing_mode_roundtrip(mode in checkpointing_mode_strategy()) {
let serialized = serde_json::to_string(&mode).unwrap();
let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
prop_assert_eq!(mode, deserialized);
}
#[test]
fn prop_checkpointing_mode_classification(mode in checkpointing_mode_strategy()) {
let eager = mode.is_eager();
let batched = mode.is_batched();
let optimistic = mode.is_optimistic();
let count = [eager, batched, optimistic].iter().filter(|&&x| x).count();
prop_assert_eq!(count, 1, "Exactly one classification should be true");
match mode {
CheckpointingMode::Eager => prop_assert!(eager),
CheckpointingMode::Batched => prop_assert!(batched),
CheckpointingMode::Optimistic => prop_assert!(optimistic),
}
}
#[test]
fn prop_step_semantics_roundtrip(semantics in step_semantics_strategy()) {
let serialized = serde_json::to_string(&semantics).unwrap();
let deserialized: StepSemantics = serde_json::from_str(&serialized).unwrap();
prop_assert_eq!(semantics, deserialized);
}
/// Property: `ItemBatcher::new` stores both limits verbatim and the batcher
/// has a non-empty `Debug` rendering.
#[test]
fn prop_item_batcher_validity(
    max_items in 1usize..=10000,
    max_bytes in 1usize..=10_000_000
) {
    let built = ItemBatcher::new(max_items, max_bytes);

    prop_assert_eq!(built.max_items_per_batch, max_items);
    prop_assert_eq!(built.max_bytes_per_batch, max_bytes);

    let rendered = format!("{:?}", built);
    prop_assert!(!rendered.is_empty());
}
/// Property: `set_replay_children` stores the flag it was given and the
/// resulting `ChildConfig` has a non-empty `Debug` rendering.
#[test]
fn prop_child_config_builder_consistency(replay_children in proptest::bool::ANY) {
    let built = ChildConfig::new().set_replay_children(replay_children);
    prop_assert_eq!(built.replay_children, replay_children);

    let rendered = format!("{:?}", built);
    prop_assert!(!rendered.is_empty());
}
/// Property: a `MapConfig` built with an arbitrary optional concurrency limit
/// keeps that limit and has a non-empty `Debug` rendering.
#[test]
fn prop_map_config_validity(
    max_concurrency in proptest::option::of(1usize..=1000)
) {
    let completion_config = CompletionConfig::default();
    let built = MapConfig {
        max_concurrency,
        item_batcher: None,
        completion_config,
        serdes: None,
    };

    prop_assert_eq!(built.max_concurrency, max_concurrency);

    let rendered = format!("{:?}", built);
    prop_assert!(!rendered.is_empty());
}
/// Property: a `ParallelConfig` built with an arbitrary optional concurrency
/// limit keeps that limit and has a non-empty `Debug` rendering.
#[test]
fn prop_parallel_config_validity(
    max_concurrency in proptest::option::of(1usize..=1000)
) {
    let completion_config = CompletionConfig::default();
    let built = ParallelConfig {
        max_concurrency,
        completion_config,
        serdes: None,
    };

    prop_assert_eq!(built.max_concurrency, max_concurrency);

    let rendered = format!("{:?}", built);
    prop_assert!(!rendered.is_empty());
}
/// Property: every batch produced by `ItemBatcher::batch` respects the
/// configured item limit, and every multi-item batch respects the byte limit
/// (measured as the summed length of each item's JSON encoding).
#[test]
fn prop_item_batcher_configuration_respected(
    max_items in 1usize..=50,
    max_bytes in 100usize..=10000,
    item_count in 0usize..=200
) {
    let batcher = ItemBatcher::new(max_items, max_bytes);

    let mut items: Vec<String> = Vec::with_capacity(item_count);
    for i in 0..item_count {
        items.push(format!("item_{:04}", i));
    }

    let batches = batcher.batch(&items);

    // First pass: item-count limit.
    for (_, batch) in &batches {
        let item_total = batch.len();
        prop_assert!(
            item_total <= max_items,
            "Batch has {} items but max is {}",
            item_total,
            max_items
        );
    }

    // Second pass: byte limit. Single-item batches are exempt from this check.
    for (_, batch) in &batches {
        if batch.len() <= 1 {
            continue;
        }
        let mut batch_bytes: usize = 0;
        for item in batch.iter() {
            // Items that fail to serialize contribute zero bytes.
            batch_bytes += serde_json::to_string(item).map_or(0, |json| json.len());
        }
        prop_assert!(
            batch_bytes <= max_bytes,
            "Batch has {} bytes but max is {} (batch has {} items)",
            batch_bytes,
            max_bytes,
            batch.len()
        );
    }
}
/// Property: concatenating the batches returned by `ItemBatcher::batch`, in
/// order, reproduces the original item list exactly.
#[test]
fn prop_item_batcher_ordering_preservation(
    max_items in 1usize..=50,
    max_bytes in 100usize..=10000,
    item_count in 0usize..=200
) {
    let batcher = ItemBatcher::new(max_items, max_bytes);

    let mut items: Vec<String> = Vec::with_capacity(item_count);
    for i in 0..item_count {
        items.push(format!("item_{:04}", i));
    }

    // Flatten the batches back into a single list, discarding the first tuple element.
    let mut reconstructed: Vec<String> = Vec::new();
    for (_, batch) in batcher.batch(&items) {
        reconstructed.extend(batch);
    }

    prop_assert_eq!(
        items.len(),
        reconstructed.len(),
        "Reconstructed list has different length: expected {}, got {}",
        items.len(),
        reconstructed.len()
    );

    // Lengths are equal past this point, so indexing `reconstructed` is in bounds.
    for (i, original) in items.iter().enumerate() {
        let reconstructed_item = &reconstructed[i];
        prop_assert_eq!(
            original,
            reconstructed_item,
            "Item at index {} differs: expected '{}', got '{}'",
            i,
            original,
            reconstructed_item
        );
    }
}
}
/// `JitterStrategy::None` must return the input delay untouched, whatever the attempt number.
#[test]
fn test_jitter_strategy_none_returns_exact_delay() {
    let strategy = JitterStrategy::None;
    // (delay, attempt) pairs; the expected result is always the delay itself.
    let cases = [(10.0, 0), (5.5, 3), (0.0, 0), (100.0, 99)];
    for (delay, attempt) in cases {
        assert_eq!(strategy.apply(delay, attempt), delay);
    }
}
/// Full jitter applied to a 10-second delay must land in `[0, 10]` for attempts 0 through 19.
#[test]
fn test_jitter_strategy_full_bounds() {
    let strategy = JitterStrategy::Full;
    let allowed = 0.0..=10.0;
    for attempt in 0..20 {
        let jittered = strategy.apply(10.0, attempt);
        assert!(
            allowed.contains(&jittered),
            "Full jitter for attempt {} produced {}, expected [0, 10]",
            attempt,
            jittered
        );
    }
}
/// Half jitter applied to a 10-second delay must land in `[5, 10]` for attempts 0 through 19.
#[test]
fn test_jitter_strategy_half_bounds() {
    let strategy = JitterStrategy::Half;
    let allowed = 5.0..=10.0;
    for attempt in 0..20 {
        let jittered = strategy.apply(10.0, attempt);
        assert!(
            allowed.contains(&jittered),
            "Half jitter for attempt {} produced {}, expected [5, 10]",
            attempt,
            jittered
        );
    }
}
/// Applying the same jittered strategy twice with identical inputs must give identical results.
#[test]
fn test_jitter_strategy_deterministic() {
    for strategy in [JitterStrategy::Full, JitterStrategy::Half] {
        let first = strategy.apply(10.0, 5);
        let second = strategy.apply(10.0, 5);
        assert_eq!(first, second);
    }
}
/// A zero delay stays zero under every jitter strategy.
#[test]
fn test_jitter_strategy_zero_delay() {
    let strategies = [
        JitterStrategy::Full,
        JitterStrategy::Half,
        JitterStrategy::None,
    ];
    for strategy in strategies {
        assert_eq!(strategy.apply(0.0, 0), 0.0);
    }
}
/// The `Default` implementation of `JitterStrategy` selects `None`.
#[test]
fn test_jitter_strategy_default_is_none() {
    let default_strategy = <JitterStrategy as Default>::default();
    assert_eq!(default_strategy, JitterStrategy::None);
}
/// Exponential backoff with full jitter still yields a delay of at least one
/// second for each permitted attempt, and yields nothing once attempts run out.
#[test]
fn test_exponential_backoff_with_full_jitter() {
    let strategy = ExponentialBackoff::builder()
        .max_attempts(5)
        .base_delay(Duration::from_seconds(5))
        .max_delay(Duration::from_seconds(60))
        .jitter(JitterStrategy::Full)
        .build();

    // First attempt index that is no longer permitted.
    let exhausted_attempt = 5;
    for attempt in 0..exhausted_attempt {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        let secs = delay.unwrap().to_seconds();
        assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
    }

    assert!(strategy.next_delay(exhausted_attempt, "error").is_none());
}
/// Exponential backoff with half jitter yields a delay of at least one second
/// for each permitted attempt, and yields nothing once attempts run out.
#[test]
fn test_exponential_backoff_with_half_jitter() {
    let strategy = ExponentialBackoff::builder()
        .max_attempts(5)
        .base_delay(Duration::from_seconds(10))
        .max_delay(Duration::from_seconds(60))
        .jitter(JitterStrategy::Half)
        .build();
    for attempt in 0..5 {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        let secs = delay.unwrap().to_seconds();
        assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
    }
    // Attempt index 5 is past `max_attempts(5)`; jitter must not extend the
    // attempt budget, so no further delay may be offered.
    assert!(strategy.next_delay(5, "error").is_none());
}
/// Without jitter, `ExponentialBackoff::new(5, 1s)` produces the plain
/// doubling sequence 1, 2, 4 seconds for attempts 0, 1, 2.
#[test]
fn test_exponential_backoff_no_jitter_unchanged() {
    let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    // (attempt, expected delay in seconds)
    for (attempt, expected_secs) in [(0, 1), (1, 2), (2, 4)] {
        let actual_secs = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual_secs, Some(expected_secs));
    }
}
/// A fixed delay with full jitter yields at least one second for each
/// permitted attempt, and yields nothing once attempts run out.
#[test]
fn test_fixed_delay_with_jitter() {
    let base = FixedDelay::new(3, Duration::from_seconds(10));
    let strategy = base.with_jitter(JitterStrategy::Full);

    // First attempt index that is no longer permitted.
    let exhausted_attempt = 3;
    for attempt in 0..exhausted_attempt {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        let secs = delay.unwrap().to_seconds();
        assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
    }

    assert!(strategy.next_delay(exhausted_attempt, "error").is_none());
}
/// Without jitter, `FixedDelay::new(3, 5s)` returns exactly 5 seconds for attempts 0 and 1.
#[test]
fn test_fixed_delay_no_jitter_unchanged() {
    let strategy = FixedDelay::new(3, Duration::from_seconds(5));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    for attempt in [0, 1] {
        let actual_secs = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual_secs, Some(5));
    }
}
/// Linear backoff with half jitter yields at least one second for each
/// permitted attempt, and yields nothing once attempts run out.
#[test]
fn test_linear_backoff_with_jitter() {
    let base = LinearBackoff::new(5, Duration::from_seconds(5));
    let strategy = base.with_jitter(JitterStrategy::Half);

    // First attempt index that is no longer permitted.
    let exhausted_attempt = 5;
    for attempt in 0..exhausted_attempt {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        let secs = delay.unwrap().to_seconds();
        assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
    }

    assert!(strategy.next_delay(exhausted_attempt, "error").is_none());
}
/// Without jitter, `LinearBackoff::new(5, 2s)` returns 2 then 4 seconds for attempts 0 and 1.
#[test]
fn test_linear_backoff_no_jitter_unchanged() {
    let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    // (attempt, expected delay in seconds)
    for (attempt, expected_secs) in [(0, 2), (1, 4)] {
        let actual_secs = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual_secs, Some(expected_secs));
    }
}
/// With a 1-second base delay and full jitter, each retry strategy must still
/// report a delay of at least one second for attempts 0 through 2.
#[test]
fn test_jitter_minimum_floor_all_strategies() {
    // Exponential backoff.
    let exponential = ExponentialBackoff::builder()
        .max_attempts(3)
        .base_delay(Duration::from_seconds(1))
        .jitter(JitterStrategy::Full)
        .build();
    for attempt in 0..3 {
        let delay = exponential.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "ExponentialBackoff attempt {} delay {} < 1",
            attempt,
            secs
        );
    }

    // Fixed delay.
    let fixed_base = FixedDelay::new(3, Duration::from_seconds(1));
    let fixed = fixed_base.with_jitter(JitterStrategy::Full);
    for attempt in 0..3 {
        let delay = fixed.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "FixedDelay attempt {} delay {} < 1",
            attempt,
            secs
        );
    }

    // Linear backoff.
    let linear_base = LinearBackoff::new(3, Duration::from_seconds(1));
    let linear = linear_base.with_jitter(JitterStrategy::Full);
    for attempt in 0..3 {
        let delay = linear.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "LinearBackoff attempt {} delay {} < 1",
            attempt,
            secs
        );
    }
}
/// Proptest strategy that yields one of the three `JitterStrategy` variants
/// (`None`, `Full`, or `Half`).
fn jitter_strategy_strategy() -> impl Strategy<Value = JitterStrategy> {
prop_oneof![
Just(JitterStrategy::None),
Just(JitterStrategy::Full),
Just(JitterStrategy::Half),
]
}
proptest! {
/// Property: `JitterStrategy::None` leaves the delay unchanged (within `f64::EPSILON`).
#[test]
fn prop_jitter_none_identity(delay in 0.0f64..1000.0, attempt in 0u32..100) {
    let result = JitterStrategy::None.apply(delay, attempt);
    let drift = (result - delay).abs();
    prop_assert!(
        drift < f64::EPSILON,
        "None jitter changed delay from {} to {}",
        delay,
        result
    );
}
/// Property: full jitter never goes negative and never exceeds the input
/// delay (allowing `f64::EPSILON` of slack on the upper bound).
#[test]
fn prop_jitter_full_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
    let result = JitterStrategy::Full.apply(delay, attempt);
    let upper_bound = delay + f64::EPSILON;

    prop_assert!(result >= 0.0, "Full jitter result {} < 0", result);
    prop_assert!(
        result <= upper_bound,
        "Full jitter result {} > delay {}",
        result,
        delay
    );
}
/// Property: half jitter stays between half the input delay and the full
/// input delay (allowing `f64::EPSILON` of slack on both bounds).
#[test]
fn prop_jitter_half_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
    let result = JitterStrategy::Half.apply(delay, attempt);
    let lower_bound = delay / 2.0 - f64::EPSILON;
    let upper_bound = delay + f64::EPSILON;

    prop_assert!(
        result >= lower_bound,
        "Half jitter result {} < delay/2 {}",
        result,
        delay / 2.0
    );
    prop_assert!(
        result <= upper_bound,
        "Half jitter result {} > delay {}",
        result,
        delay
    );
}
/// Property: for any strategy, delay and attempt, two applications with the
/// same inputs agree (within `f64::EPSILON`).
#[test]
fn prop_jitter_deterministic(
    jitter in jitter_strategy_strategy(),
    delay in 0.0f64..1000.0,
    attempt in 0u32..100
) {
    let first = jitter.apply(delay, attempt);
    let second = jitter.apply(delay, attempt);
    let difference = (first - second).abs();
    prop_assert!(
        difference < f64::EPSILON,
        "Jitter not deterministic: {} vs {}",
        first,
        second
    );
}
/// Property: whenever a retry strategy offers a delay, that delay is at least
/// one second, for every jitter strategy and base delays of 1 to 99 seconds.
#[test]
fn prop_jitter_minimum_floor(
    jitter in jitter_strategy_strategy(),
    attempt in 0u32..10,
    base_delay_secs in 1u64..100
) {
    // Exponential backoff.
    let exponential = ExponentialBackoff::builder()
        .max_attempts(10)
        .base_delay(Duration::from_seconds(base_delay_secs))
        .jitter(jitter)
        .build();
    let exponential_secs = exponential.next_delay(attempt, "e").map(|d| d.to_seconds());
    if let Some(secs) = exponential_secs {
        prop_assert!(
            secs >= 1,
            "ExponentialBackoff delay {} < 1 for attempt {}",
            secs,
            attempt
        );
    }

    // Fixed delay.
    let fixed = FixedDelay::new(10, Duration::from_seconds(base_delay_secs))
        .with_jitter(jitter);
    let fixed_secs = fixed.next_delay(attempt, "e").map(|d| d.to_seconds());
    if let Some(secs) = fixed_secs {
        prop_assert!(
            secs >= 1,
            "FixedDelay delay {} < 1 for attempt {}",
            secs,
            attempt
        );
    }

    // Linear backoff.
    let linear = LinearBackoff::new(10, Duration::from_seconds(base_delay_secs))
        .with_jitter(jitter);
    let linear_secs = linear.next_delay(attempt, "e").map(|d| d.to_seconds());
    if let Some(secs) = linear_secs {
        prop_assert!(
            secs >= 1,
            "LinearBackoff delay {} < 1 for attempt {}",
            secs,
            attempt
        );
    }
}
}
}
#[cfg(test)]
mod retryable_error_filter_tests {
use super::*;
/// A default-constructed filter treats every error as retryable, with or
/// without an error type.
#[test]
fn test_empty_filter_retries_all() {
    let filter = RetryableErrorFilter::default();

    for message in ["any error message", "", "timeout"] {
        assert!(
            filter.is_retryable(message),
            "expected {:?} to be retryable",
            message
        );
    }

    assert!(filter.is_retryable_with_type("any error", "AnyType"));
}
/// A `Contains` pattern matches when the needle appears anywhere in the message.
#[test]
fn test_contains_pattern_matches_substring() {
    let needle = ErrorPattern::Contains("timeout".to_string());
    let filter = RetryableErrorFilter {
        patterns: vec![needle],
        error_types: vec![],
    };

    let matching = ["request timeout occurred", "timeout", "a timeout happened"];
    for message in matching {
        assert!(
            filter.is_retryable(message),
            "expected {:?} to be retryable",
            message
        );
    }
}
/// A `Contains` pattern rejects messages that lack the needle, including the empty message.
#[test]
fn test_contains_pattern_no_match() {
    let needle = ErrorPattern::Contains("timeout".to_string());
    let filter = RetryableErrorFilter {
        patterns: vec![needle],
        error_types: vec![],
    };

    let non_matching = ["connection refused", "invalid input", ""];
    for message in non_matching {
        assert!(
            !filter.is_retryable(message),
            "expected {:?} not to be retryable",
            message
        );
    }
}
/// A case-insensitive `Regex` pattern matches "connection ... refused" in any casing.
#[test]
fn test_regex_pattern_matches() {
    let connection_refused = regex::Regex::new(r"(?i)connection.*refused").unwrap();
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Regex(connection_refused)],
        error_types: vec![],
    };

    let matching = [
        "Connection was refused",
        "connection refused",
        "CONNECTION actively REFUSED",
    ];
    for message in matching {
        assert!(
            filter.is_retryable(message),
            "expected {:?} to be retryable",
            message
        );
    }
}
/// The `Regex` pattern rejects unrelated messages and messages where the
/// words appear in the wrong order.
#[test]
fn test_regex_pattern_no_match() {
    let connection_refused = regex::Regex::new(r"(?i)connection.*refused").unwrap();
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Regex(connection_refused)],
        error_types: vec![],
    };

    // "refused connection" has both words, but "refused" precedes "connection".
    let non_matching = ["timeout error", "refused connection"];
    for message in non_matching {
        assert!(
            !filter.is_retryable(message),
            "expected {:?} not to be retryable",
            message
        );
    }
}
/// With several patterns configured, a message is retryable if any one of them matches.
#[test]
fn test_or_logic_multiple_patterns() {
    let by_substring = ErrorPattern::Contains("timeout".to_string());
    let by_regex = ErrorPattern::Regex(regex::Regex::new(r"(?i)connection.*refused").unwrap());
    let filter = RetryableErrorFilter {
        patterns: vec![by_substring, by_regex],
        error_types: vec![],
    };

    // (message, expected retryability)
    let cases = [
        ("request timeout", true),
        ("Connection refused", true),
        ("invalid input", false),
    ];
    for (message, expected) in cases {
        assert_eq!(
            filter.is_retryable(message),
            expected,
            "unexpected retryability for {:?}",
            message
        );
    }
}
/// With only error types configured, a message alone is not retryable; it
/// becomes retryable only when the supplied type is in the list.
#[test]
fn test_error_type_matching() {
    let transient_only = RetryableErrorFilter {
        patterns: vec![],
        error_types: vec!["TransientError".to_string()],
    };

    // No type supplied: nothing to match against.
    assert!(!transient_only.is_retryable("some error"));

    // Type supplied: only the listed type is accepted.
    assert!(transient_only.is_retryable_with_type("some error", "TransientError"));
    assert!(!transient_only.is_retryable_with_type("some error", "PermanentError"));
}
/// Patterns and error types combine with OR: a match on either the message
/// or the type is enough, and only a miss on both is rejected.
#[test]
fn test_or_logic_patterns_and_types() {
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Contains("timeout".to_string())],
        error_types: vec!["TransientError".to_string()],
    };

    // (message, error type, expected retryability)
    let cases = [
        ("request timeout", "PermanentError", true),
        ("invalid input", "TransientError", true),
        ("request timeout", "TransientError", true),
        ("invalid input", "PermanentError", false),
    ];
    for (message, error_type, expected) in cases {
        assert_eq!(
            filter.is_retryable_with_type(message, error_type),
            expected,
            "unexpected retryability for message {:?} with type {:?}",
            message,
            error_type
        );
    }
}
/// The `Debug` output of `ErrorPattern` names the variant (and, for
/// `Contains`, includes the needle).
#[test]
fn test_error_pattern_debug() {
    let contains_rendered = format!("{:?}", ErrorPattern::Contains("test".to_string()));
    assert!(contains_rendered.contains("Contains"));
    assert!(contains_rendered.contains("test"));

    let digits = regex::Regex::new(r"\d+").unwrap();
    let regex_rendered = format!("{:?}", ErrorPattern::Regex(digits));
    assert!(regex_rendered.contains("Regex"));
}
/// A cloned filter keeps matching by substring, by regex and by error type.
#[test]
fn test_retryable_error_filter_clone() {
    let by_substring = ErrorPattern::Contains("timeout".to_string());
    let by_regex = ErrorPattern::Regex(regex::Regex::new(r"err\d+").unwrap());
    let original = RetryableErrorFilter {
        patterns: vec![by_substring, by_regex],
        error_types: vec!["TransientError".to_string()],
    };

    let copy = original.clone();

    assert!(copy.is_retryable("timeout error"));
    assert!(copy.is_retryable("err42"));
    assert!(copy.is_retryable_with_type("x", "TransientError"));
}
/// When the polling predicate returns false for the observed state, the
/// strategy reports `Done`.
#[test]
fn test_wait_decision_done_when_predicate_false() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.5,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|state: &String| state != "COMPLETED"),
    };
    let strategy = create_wait_strategy(config);

    let observed = "COMPLETED".to_string();
    assert_eq!(strategy(&observed, 1), WaitDecision::Done);
}
/// With a 5-second initial delay, backoff rate 2.0 and no jitter, attempts
/// 1, 2 and 3 are told to continue after 5, 10 and 20 seconds respectively.
#[test]
fn test_wait_decision_continue_with_backoff() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 2.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|state: &String| state != "DONE"),
    };
    let strategy = create_wait_strategy(config);

    // (attempts made, expected delay in seconds)
    for (attempts_made, expected_secs) in [(1, 5), (2, 10), (3, 20)] {
        let decision = strategy(&"PENDING".to_string(), attempts_made);
        assert_eq!(
            decision,
            WaitDecision::Continue {
                delay: Duration::from_seconds(expected_secs)
            }
        );
    }
}
/// Once the backed-off delay would exceed `max_delay` (30 s here), the
/// strategy returns exactly `max_delay`.
#[test]
fn test_wait_strategy_delay_capped_at_max() {
    let config = WaitStrategyConfig {
        max_attempts: Some(20),
        initial_delay: Duration::from_seconds(10),
        max_delay: Duration::from_seconds(30),
        backoff_rate: 2.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    for attempts_made in [3, 5] {
        let decision = strategy(&0, attempts_made);
        assert_eq!(
            decision,
            WaitDecision::Continue {
                delay: Duration::from_seconds(30)
            }
        );
    }
}
/// When the number of attempts made reaches `max_attempts`, the strategy
/// reports `Done` even though the predicate still wants to poll.
#[test]
fn test_wait_strategy_max_attempts_returns_done() {
    let config = WaitStrategyConfig {
        max_attempts: Some(3),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.5,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    assert_eq!(strategy(&0, 3), WaitDecision::Done);
}
/// With full jitter on a constant 10-second delay, the first decision is
/// `Continue` with a delay between 1 and 10 seconds inclusive.
#[test]
fn test_wait_strategy_jitter_application() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(10),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.0,
        jitter: JitterStrategy::Full,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    match strategy(&0, 1) {
        WaitDecision::Done => panic!("Expected Continue, got Done"),
        WaitDecision::Continue { delay } => {
            let secs = delay.to_seconds();
            assert!(
                secs >= 1 && secs <= 10,
                "Jittered delay {} should be in [1, 10]",
                secs
            );
        }
    }
}
/// With full jitter on a 1-second delay, the returned delay must still be at
/// least one second.
#[test]
fn test_wait_strategy_delay_minimum_floor() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.0,
        jitter: JitterStrategy::Full,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    match strategy(&0, 1) {
        WaitDecision::Done => panic!("Expected Continue, got Done"),
        WaitDecision::Continue { delay } => {
            let secs = delay.to_seconds();
            assert!(secs >= 1, "Delay {} should be at least 1 second", secs);
        }
    }
}
/// With `max_attempts` unset, the strategy still continues at 59 attempts
/// made (the default limit applied by `create_wait_strategy` is 60).
#[test]
fn test_wait_strategy_default_max_attempts() {
    let config = WaitStrategyConfig {
        // Unset: `create_wait_strategy` falls back to 60.
        max_attempts: None,
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(10),
        backoff_rate: 1.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    let just_below_limit = strategy(&0, 59);
    assert!(matches!(just_below_limit, WaitDecision::Continue { .. }));
}
/// With `max_attempts` unset, the strategy reports `Done` at 60 attempts made
/// (the default limit applied by `create_wait_strategy`).
#[test]
fn test_wait_strategy_default_max_attempts_returns_done() {
    let config = WaitStrategyConfig {
        // Unset: `create_wait_strategy` falls back to 60.
        max_attempts: None,
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(10),
        backoff_rate: 1.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    let at_limit = strategy(&0, 60);
    assert_eq!(at_limit, WaitDecision::Done);
}
/// `WaitDecision` has a `Debug` rendering that names the variant, and its
/// equality distinguishes `Continue` from `Done`.
#[test]
fn test_wait_decision_enum_variants() {
    // Builds a fresh `Continue { delay: 5s }` each time it is called.
    let continue_five = || WaitDecision::Continue {
        delay: Duration::from_seconds(5),
    };

    let cont = continue_five();
    let done = WaitDecision::Done;
    assert!(format!("{:?}", cont).contains("Continue"));
    assert!(format!("{:?}", done).contains("Done"));

    assert_eq!(continue_five(), continue_five());
    assert_ne!(continue_five(), WaitDecision::Done);
}
}