1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use blake2::{Blake2b512, Digest};
22use serde::{Deserialize, Serialize};
23
24use crate::duration::Duration;
25use crate::error::DurableError;
26use crate::sealed::Sealed;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
56pub enum JitterStrategy {
57 #[default]
59 None,
60 Full,
62 Half,
64}
65
66impl JitterStrategy {
67 pub fn apply(&self, delay_secs: f64, attempt: u32) -> f64 {
85 match self {
86 JitterStrategy::None => delay_secs,
87 JitterStrategy::Full => {
88 let factor = deterministic_random_factor(attempt);
89 factor * delay_secs
90 }
91 JitterStrategy::Half => {
92 let factor = deterministic_random_factor(attempt);
93 delay_secs / 2.0 + factor * (delay_secs / 2.0)
94 }
95 }
96 }
97}
98
99fn deterministic_random_factor(attempt: u32) -> f64 {
104 let mut hasher = Blake2b512::new();
105 hasher.update(b"jitter");
106 hasher.update(attempt.to_le_bytes());
107 let result = hasher.finalize();
108
109 let mut bytes = [0u8; 8];
111 bytes.copy_from_slice(&result[..8]);
112 let value = u64::from_le_bytes(bytes);
113 (value as f64) / (u64::MAX as f64)
114}
115
/// Result of evaluating a wait strategy for one polling attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitDecision {
    /// Keep polling; wait for `delay` before the next attempt.
    Continue { delay: Duration },
    /// Stop polling.
    Done,
}
142
/// Configuration consumed by `create_wait_strategy`.
///
/// `T` is the type of the polled result handed to `should_continue_polling`.
pub struct WaitStrategyConfig<T> {
    /// Attempt limit; `None` makes `create_wait_strategy` fall back to 60.
    pub max_attempts: Option<usize>,
    /// Starting delay of the backoff (read in whole seconds).
    pub initial_delay: Duration,
    /// Upper bound for the delay before jitter is applied (read in whole seconds).
    pub max_delay: Duration,
    /// Base of the exponential growth applied to `initial_delay`.
    pub backoff_rate: f64,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
    /// Predicate on the latest result; returning `false` ends polling.
    pub should_continue_polling: Box<dyn Fn(&T) -> bool + Send + Sync>,
}
186
187#[allow(clippy::type_complexity)]
200pub fn create_wait_strategy<T: Send + Sync + 'static>(
201 config: WaitStrategyConfig<T>,
202) -> Box<dyn Fn(&T, usize) -> WaitDecision + Send + Sync> {
203 let max_attempts = config.max_attempts.unwrap_or(60);
204 let initial_delay_secs = config.initial_delay.to_seconds() as f64;
205 let max_delay_secs = config.max_delay.to_seconds() as f64;
206 let backoff_rate = config.backoff_rate;
207 let jitter = config.jitter;
208 let should_continue = config.should_continue_polling;
209
210 Box::new(move |result: &T, attempts_made: usize| -> WaitDecision {
211 if !should_continue(result) {
213 return WaitDecision::Done;
214 }
215
216 if attempts_made >= max_attempts {
219 return WaitDecision::Done;
220 }
221
222 let exponent = if attempts_made > 0 {
224 (attempts_made as i32) - 1
225 } else {
226 0
227 };
228 let base_delay = (initial_delay_secs * backoff_rate.powi(exponent)).min(max_delay_secs);
229
230 let jittered = jitter.apply(base_delay, attempts_made as u32);
232 let final_delay = jittered.max(1.0).round() as u64;
233
234 WaitDecision::Continue {
235 delay: Duration::from_seconds(final_delay),
236 }
237 })
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
287pub enum CheckpointingMode {
288 Eager,
307
308 Batched,
328
329 Optimistic,
349}
350
351impl Default for CheckpointingMode {
352 fn default() -> Self {
357 Self::Batched
358 }
359}
360
361impl CheckpointingMode {
362 pub fn is_eager(&self) -> bool {
364 matches!(self, Self::Eager)
365 }
366
367 pub fn is_batched(&self) -> bool {
369 matches!(self, Self::Batched)
370 }
371
372 pub fn is_optimistic(&self) -> bool {
374 matches!(self, Self::Optimistic)
375 }
376
377 pub fn description(&self) -> &'static str {
379 match self {
380 Self::Eager => "Checkpoint after every operation (maximum durability)",
381 Self::Batched => "Batch operations before checkpointing (balanced)",
382 Self::Optimistic => {
383 "Execute multiple operations before checkpointing (best performance)"
384 }
385 }
386 }
387}
388
/// Decides whether, and after how long, a failed operation is retried.
///
/// The `Sealed` supertrait comes from `crate::sealed`; presumably it restricts
/// implementations to this crate (confirm against that module).
#[allow(private_bounds)]
pub trait RetryStrategy: Sealed + Send + Sync {
    /// Returns the delay to wait before the next retry, or `None` to stop retrying.
    ///
    /// `attempt` is the attempt counter supplied by the caller and `error` is
    /// the error text of the failure being considered.
    fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration>;

    /// Returns a boxed copy of this strategy; this is what makes
    /// `Box<dyn RetryStrategy>` cloneable.
    fn clone_box(&self) -> Box<dyn RetryStrategy>;
}

/// Clones a boxed strategy by delegating to [`RetryStrategy::clone_box`].
impl Clone for Box<dyn RetryStrategy> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}
411
412#[derive(Debug, Clone)]
438pub struct ExponentialBackoff {
439 pub max_attempts: u32,
441 pub base_delay: Duration,
443 pub max_delay: Duration,
445 pub multiplier: f64,
447 pub jitter: JitterStrategy,
449}
450
451impl ExponentialBackoff {
452 pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
459 Self {
460 max_attempts,
461 base_delay,
462 max_delay: Duration::from_hours(1),
463 multiplier: 2.0,
464 jitter: JitterStrategy::None,
465 }
466 }
467
468 pub fn builder() -> ExponentialBackoffBuilder {
470 ExponentialBackoffBuilder::default()
471 }
472}
473
474impl Sealed for ExponentialBackoff {}
475
476impl RetryStrategy for ExponentialBackoff {
477 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
478 if attempt >= self.max_attempts {
479 return None;
480 }
481
482 let base_seconds = self.base_delay.to_seconds() as f64;
483 let delay_seconds = base_seconds * self.multiplier.powi(attempt as i32);
484 let max_seconds = self.max_delay.to_seconds() as f64;
485 let capped_seconds = delay_seconds.min(max_seconds);
486
487 let jittered = self.jitter.apply(capped_seconds, attempt);
488 let final_seconds = jittered.max(1.0);
489
490 Some(Duration::from_seconds(final_seconds as u64))
491 }
492
493 fn clone_box(&self) -> Box<dyn RetryStrategy> {
494 Box::new(self.clone())
495 }
496}
497
498#[derive(Debug, Clone)]
500pub struct ExponentialBackoffBuilder {
501 max_attempts: u32,
502 base_delay: Duration,
503 max_delay: Duration,
504 multiplier: f64,
505 jitter: JitterStrategy,
506}
507
508impl Default for ExponentialBackoffBuilder {
509 fn default() -> Self {
510 Self {
511 max_attempts: 3,
512 base_delay: Duration::from_seconds(1),
513 max_delay: Duration::from_hours(1),
514 multiplier: 2.0,
515 jitter: JitterStrategy::None,
516 }
517 }
518}
519
520impl ExponentialBackoffBuilder {
521 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
523 self.max_attempts = max_attempts;
524 self
525 }
526
527 pub fn base_delay(mut self, base_delay: Duration) -> Self {
529 self.base_delay = base_delay;
530 self
531 }
532
533 pub fn max_delay(mut self, max_delay: Duration) -> Self {
535 self.max_delay = max_delay;
536 self
537 }
538
539 pub fn multiplier(mut self, multiplier: f64) -> Self {
541 self.multiplier = multiplier;
542 self
543 }
544
545 pub fn jitter(mut self, jitter: JitterStrategy) -> Self {
547 self.jitter = jitter;
548 self
549 }
550
551 pub fn build(self) -> ExponentialBackoff {
553 ExponentialBackoff {
554 max_attempts: self.max_attempts,
555 base_delay: self.base_delay,
556 max_delay: self.max_delay,
557 multiplier: self.multiplier,
558 jitter: self.jitter,
559 }
560 }
561}
562
563#[derive(Debug, Clone)]
577pub struct FixedDelay {
578 pub max_attempts: u32,
580 pub delay: Duration,
582 pub jitter: JitterStrategy,
584}
585
586impl FixedDelay {
587 pub fn new(max_attempts: u32, delay: Duration) -> Self {
594 Self {
595 max_attempts,
596 delay,
597 jitter: JitterStrategy::None,
598 }
599 }
600
601 pub fn with_jitter(mut self, jitter: JitterStrategy) -> Self {
603 self.jitter = jitter;
604 self
605 }
606}
607
608impl Sealed for FixedDelay {}
609
610impl RetryStrategy for FixedDelay {
611 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
612 if attempt >= self.max_attempts {
613 return None;
614 }
615
616 let delay_secs = self.delay.to_seconds() as f64;
617 let jittered = self.jitter.apply(delay_secs, attempt);
618 let final_seconds = jittered.max(1.0);
619
620 Some(Duration::from_seconds(final_seconds as u64))
621 }
622
623 fn clone_box(&self) -> Box<dyn RetryStrategy> {
624 Box::new(self.clone())
625 }
626}
627
628#[derive(Debug, Clone)]
642pub struct LinearBackoff {
643 pub max_attempts: u32,
645 pub base_delay: Duration,
647 pub max_delay: Duration,
649 pub jitter: JitterStrategy,
651}
652
653impl LinearBackoff {
654 pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
661 Self {
662 max_attempts,
663 base_delay,
664 max_delay: Duration::from_hours(1),
665 jitter: JitterStrategy::None,
666 }
667 }
668
669 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
671 self.max_delay = max_delay;
672 self
673 }
674
675 pub fn with_jitter(mut self, jitter: JitterStrategy) -> Self {
677 self.jitter = jitter;
678 self
679 }
680}
681
682impl Sealed for LinearBackoff {}
683
684impl RetryStrategy for LinearBackoff {
685 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
686 if attempt >= self.max_attempts {
687 return None;
688 }
689
690 let base_seconds = self.base_delay.to_seconds();
691 let delay_seconds = base_seconds.saturating_mul((attempt + 1) as u64);
692 let max_seconds = self.max_delay.to_seconds();
693 let capped_seconds = delay_seconds.min(max_seconds) as f64;
694
695 let jittered = self.jitter.apply(capped_seconds, attempt);
696 let final_seconds = jittered.max(1.0);
697
698 Some(Duration::from_seconds(final_seconds as u64))
699 }
700
701 fn clone_box(&self) -> Box<dyn RetryStrategy> {
702 Box::new(self.clone())
703 }
704}
705
706#[derive(Debug, Clone, Copy, Default)]
716pub struct NoRetry;
717
718impl Sealed for NoRetry {}
719
720impl RetryStrategy for NoRetry {
721 fn next_delay(&self, _attempt: u32, _error: &str) -> Option<Duration> {
722 None
723 }
724
725 fn clone_box(&self) -> Box<dyn RetryStrategy> {
726 Box::new(*self)
727 }
728}
729
730#[derive(Clone)]
744pub enum ErrorPattern {
745 Contains(String),
747 Regex(regex::Regex),
749}
750
751impl std::fmt::Debug for ErrorPattern {
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 match self {
754 ErrorPattern::Contains(s) => f.debug_tuple("Contains").field(s).finish(),
755 ErrorPattern::Regex(r) => f.debug_tuple("Regex").field(&r.as_str()).finish(),
756 }
757 }
758}
759
760#[derive(Clone, Debug, Default)]
786pub struct RetryableErrorFilter {
787 pub patterns: Vec<ErrorPattern>,
789 pub error_types: Vec<String>,
791}
792
793impl RetryableErrorFilter {
794 pub fn is_retryable(&self, error_msg: &str) -> bool {
801 if self.patterns.is_empty() && self.error_types.is_empty() {
802 return true;
803 }
804
805 self.patterns.iter().any(|p| match p {
806 ErrorPattern::Contains(s) => error_msg.contains(s.as_str()),
807 ErrorPattern::Regex(r) => r.is_match(error_msg),
808 })
809 }
810
811 pub fn is_retryable_with_type(&self, error_msg: &str, error_type: &str) -> bool {
818 if self.patterns.is_empty() && self.error_types.is_empty() {
819 return true;
820 }
821
822 let matches_type = self.error_types.iter().any(|t| t == error_type);
823 matches_type || self.is_retryable(error_msg)
824 }
825}
826
827pub fn custom_retry<F>(f: F) -> CustomRetry<F>
851where
852 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
853{
854 CustomRetry { f }
855}
856
857#[derive(Clone)]
861pub struct CustomRetry<F>
862where
863 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
864{
865 f: F,
866}
867
868impl<F> std::fmt::Debug for CustomRetry<F>
869where
870 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
871{
872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873 f.debug_struct("CustomRetry").finish()
874 }
875}
876
877impl<F> Sealed for CustomRetry<F> where
878 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static
879{
880}
881
882impl<F> RetryStrategy for CustomRetry<F>
883where
884 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
885{
886 fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration> {
887 (self.f)(attempt, error)
888 }
889
890 fn clone_box(&self) -> Box<dyn RetryStrategy> {
891 Box::new(self.clone())
892 }
893}
894
/// Execution guarantee requested for a step.
///
/// NOTE(review): the precise runtime meaning of each variant is defined by the
/// code that consumes this enum, which is not visible here — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum StepSemantics {
    /// Presumably: the step runs at most once per retry attempt.
    AtMostOncePerRetry,
    /// Presumably: the step runs at least once per retry attempt. This is the default.
    #[default]
    AtLeastOncePerRetry,
}
908
909#[derive(Clone, Default)]
934pub struct StepConfig {
935 pub retry_strategy: Option<Box<dyn RetryStrategy>>,
937 pub step_semantics: StepSemantics,
939 pub serdes: Option<Arc<dyn SerDesAny>>,
941 pub retryable_error_filter: Option<RetryableErrorFilter>,
945}
946
947impl std::fmt::Debug for StepConfig {
948 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
949 f.debug_struct("StepConfig")
950 .field("retry_strategy", &self.retry_strategy.is_some())
951 .field("step_semantics", &self.step_semantics)
952 .field("serdes", &self.serdes.is_some())
953 .field(
954 "retryable_error_filter",
955 &self.retryable_error_filter.is_some(),
956 )
957 .finish()
958 }
959}
960
/// Configuration for a callback operation.
///
/// `Default` yields default (`Duration::default()`) timeouts and no custom serdes.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Overall timeout for the callback.
    pub timeout: Duration,
    /// Heartbeat timeout for the callback.
    pub heartbeat_timeout: Duration,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
971
972#[derive(Clone)]
974pub struct InvokeConfig<P, R> {
975 pub timeout: Duration,
977 pub serdes_payload: Option<Arc<dyn SerDesAny>>,
979 pub serdes_result: Option<Arc<dyn SerDesAny>>,
981 pub tenant_id: Option<String>,
983 _marker: PhantomData<(P, R)>,
985}
986
987impl<P, R> Default for InvokeConfig<P, R> {
988 fn default() -> Self {
989 Self {
990 timeout: Duration::default(),
991 serdes_payload: None,
992 serdes_result: None,
993 tenant_id: None,
994 _marker: PhantomData,
995 }
996 }
997}
998
999impl<P, R> std::fmt::Debug for InvokeConfig<P, R> {
1000 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001 f.debug_struct("InvokeConfig")
1002 .field("timeout", &self.timeout)
1003 .field("serdes_payload", &self.serdes_payload.is_some())
1004 .field("serdes_result", &self.serdes_result.is_some())
1005 .field("tenant_id", &self.tenant_id)
1006 .finish()
1007 }
1008}
1009
/// Configuration for a map operation over a collection of items.
///
/// NOTE(review): field semantics below are inferred from names and types; the
/// consuming code is not visible here — confirm there.
#[derive(Debug, Clone, Default)]
pub struct MapConfig {
    /// Presumably the maximum number of items processed concurrently; `None` leaves it unset.
    pub max_concurrency: Option<usize>,
    /// Optional batcher used to group items before processing.
    pub item_batcher: Option<ItemBatcher>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
1047
/// Configuration for a parallel operation.
///
/// NOTE(review): field semantics below are inferred from names and types; the
/// consuming code is not visible here — confirm there.
#[derive(Debug, Clone, Default)]
pub struct ParallelConfig {
    /// Presumably the maximum number of branches run concurrently; `None` leaves it unset.
    pub max_concurrency: Option<usize>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
1058
1059#[derive(Clone, Default)]
1064#[allow(clippy::type_complexity)]
1065pub struct ChildConfig {
1066 pub serdes: Option<Arc<dyn SerDesAny>>,
1068 pub replay_children: bool,
1077 pub error_mapper: Option<Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>>,
1084 pub summary_generator: Option<Arc<dyn Fn(&str) -> String + Send + Sync>>,
1095}
1096
1097impl std::fmt::Debug for ChildConfig {
1098 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1099 f.debug_struct("ChildConfig")
1100 .field("serdes", &self.serdes)
1101 .field("replay_children", &self.replay_children)
1102 .field("error_mapper", &self.error_mapper.as_ref().map(|_| "..."))
1103 .field(
1104 "summary_generator",
1105 &self.summary_generator.as_ref().map(|_| "..."),
1106 )
1107 .finish()
1108 }
1109}
1110
1111impl ChildConfig {
1112 pub fn new() -> Self {
1114 Self::default()
1115 }
1116
1117 pub fn with_replay_children() -> Self {
1131 Self {
1132 replay_children: true,
1133 ..Default::default()
1134 }
1135 }
1136
1137 pub fn set_replay_children(mut self, replay_children: bool) -> Self {
1143 self.replay_children = replay_children;
1144 self
1145 }
1146
1147 pub fn set_serdes(mut self, serdes: Arc<dyn SerDesAny>) -> Self {
1149 self.serdes = Some(serdes);
1150 self
1151 }
1152
1153 pub fn set_error_mapper(
1158 mut self,
1159 mapper: Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>,
1160 ) -> Self {
1161 self.error_mapper = Some(mapper);
1162 self
1163 }
1164
1165 pub fn set_summary_generator(
1171 mut self,
1172 generator: Arc<dyn Fn(&str) -> String + Send + Sync>,
1173 ) -> Self {
1174 self.summary_generator = Some(generator);
1175 self
1176 }
1177}
1178
/// Alias for [`ChildConfig`]; both names refer to the same type.
pub type ContextConfig = ChildConfig;
1184
/// Completion criteria for operations that run several items or branches.
///
/// All fields default to `None`.
/// NOTE(review): how these thresholds are evaluated is defined by the
/// consuming code, which is not visible here — confirm there.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompletionConfig {
    /// Number of successes requested (e.g. `Some(1)` from `first_successful`).
    pub min_successful: Option<usize>,
    /// Number of failures tolerated (e.g. `Some(0)` from `all_successful`).
    pub tolerated_failure_count: Option<usize>,
    /// Share of failures tolerated (e.g. `Some(0.0)` from `all_successful`);
    /// presumably a percentage or fraction — verify the unit against the consumer.
    pub tolerated_failure_percentage: Option<f64>,
}
1195
1196impl CompletionConfig {
1197 pub fn first_successful() -> Self {
1208 Self {
1209 min_successful: Some(1),
1210 ..Default::default()
1211 }
1212 }
1213
1214 pub fn all_completed() -> Self {
1225 Self::default()
1226 }
1227
1228 pub fn all_successful() -> Self {
1240 Self {
1241 tolerated_failure_count: Some(0),
1242 tolerated_failure_percentage: Some(0.0),
1243 ..Default::default()
1244 }
1245 }
1246
1247 pub fn with_min_successful(count: usize) -> Self {
1249 Self {
1250 min_successful: Some(count),
1251 ..Default::default()
1252 }
1253 }
1254
1255 pub fn with_failure_tolerance(count: usize) -> Self {
1257 Self {
1258 tolerated_failure_count: Some(count),
1259 ..Default::default()
1260 }
1261 }
1262}
1263
1264#[derive(Debug, Clone)]
1266pub struct ItemBatcher {
1267 pub max_items_per_batch: usize,
1269 pub max_bytes_per_batch: usize,
1271}
1272
1273impl Default for ItemBatcher {
1274 fn default() -> Self {
1275 Self {
1276 max_items_per_batch: 100,
1277 max_bytes_per_batch: 256 * 1024, }
1279 }
1280}
1281
1282impl ItemBatcher {
1283 pub fn new(max_items_per_batch: usize, max_bytes_per_batch: usize) -> Self {
1285 Self {
1286 max_items_per_batch,
1287 max_bytes_per_batch,
1288 }
1289 }
1290
1291 pub fn batch<T: Serialize + Clone>(&self, items: &[T]) -> Vec<(usize, Vec<T>)> {
1324 if items.is_empty() {
1325 return Vec::new();
1326 }
1327
1328 let mut batches = Vec::new();
1329 let mut current_batch = Vec::new();
1330 let mut current_bytes = 0usize;
1331 let mut batch_start_index = 0;
1332
1333 for (i, item) in items.iter().enumerate() {
1334 let item_bytes = serde_json::to_string(item).map(|s| s.len()).unwrap_or(0);
1336
1337 let would_exceed_items = current_batch.len() >= self.max_items_per_batch;
1339 let would_exceed_bytes =
1340 current_bytes + item_bytes > self.max_bytes_per_batch && !current_batch.is_empty();
1341
1342 if would_exceed_items || would_exceed_bytes {
1343 batches.push((batch_start_index, std::mem::take(&mut current_batch)));
1345 current_bytes = 0;
1346 batch_start_index = i;
1347 }
1348
1349 current_batch.push(item.clone());
1350 current_bytes += item_bytes;
1351 }
1352
1353 if !current_batch.is_empty() {
1355 batches.push((batch_start_index, current_batch));
1356 }
1357
1358 batches
1359 }
1360}
1361
/// Type-erased serializer/deserializer that can be stored behind `Arc<dyn SerDesAny>`.
pub trait SerDesAny: Send + Sync {
    /// Serializes a type-erased value to a string.
    ///
    /// Presumably implementations downcast `value` to their concrete type and
    /// return an error when the type does not match — confirm per implementation.
    fn serialize_any(
        &self,
        value: &dyn std::any::Any,
    ) -> Result<String, crate::error::DurableError>;
    /// Deserializes `data` into a boxed, type-erased value that the caller
    /// can downcast to the expected type.
    fn deserialize_any(
        &self,
        data: &str,
    ) -> Result<Box<dyn std::any::Any + Send>, crate::error::DurableError>;
}

/// Prints the fixed placeholder `SerDesAny`, so types holding a
/// `dyn SerDesAny` can still derive or implement `Debug`.
impl std::fmt::Debug for dyn SerDesAny {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SerDesAny")
    }
}
1381
1382#[cfg(test)]
1383mod tests {
1384 use super::*;
1385 use proptest::prelude::*;
1386
1387 #[test]
1392 fn test_step_semantics_default() {
1393 let semantics = StepSemantics::default();
1394 assert_eq!(semantics, StepSemantics::AtLeastOncePerRetry);
1395 }
1396
1397 #[test]
1398 fn test_step_config_default() {
1399 let config = StepConfig::default();
1400 assert!(config.retry_strategy.is_none());
1401 assert_eq!(config.step_semantics, StepSemantics::AtLeastOncePerRetry);
1402 assert!(config.serdes.is_none());
1403 }
1404
1405 #[test]
1406 fn test_completion_config_first_successful() {
1407 let config = CompletionConfig::first_successful();
1408 assert_eq!(config.min_successful, Some(1));
1409 assert!(config.tolerated_failure_count.is_none());
1410 assert!(config.tolerated_failure_percentage.is_none());
1411 }
1412
1413 #[test]
1414 fn test_completion_config_all_completed() {
1415 let config = CompletionConfig::all_completed();
1416 assert!(config.min_successful.is_none());
1417 assert!(config.tolerated_failure_count.is_none());
1418 assert!(config.tolerated_failure_percentage.is_none());
1419 }
1420
1421 #[test]
1422 fn test_completion_config_all_successful() {
1423 let config = CompletionConfig::all_successful();
1424 assert!(config.min_successful.is_none());
1425 assert_eq!(config.tolerated_failure_count, Some(0));
1426 assert_eq!(config.tolerated_failure_percentage, Some(0.0));
1427 }
1428
1429 #[test]
1430 fn test_item_batcher_default() {
1431 let batcher = ItemBatcher::default();
1432 assert_eq!(batcher.max_items_per_batch, 100);
1433 assert_eq!(batcher.max_bytes_per_batch, 256 * 1024);
1434 }
1435
1436 #[test]
1437 fn test_item_batcher_new() {
1438 let batcher = ItemBatcher::new(50, 128 * 1024);
1439 assert_eq!(batcher.max_items_per_batch, 50);
1440 assert_eq!(batcher.max_bytes_per_batch, 128 * 1024);
1441 }
1442
1443 #[test]
1444 fn test_callback_config_default() {
1445 let config = CallbackConfig::default();
1446 assert_eq!(config.timeout.to_seconds(), 0);
1447 assert_eq!(config.heartbeat_timeout.to_seconds(), 0);
1448 }
1449
1450 #[test]
1451 fn test_invoke_config_default() {
1452 let config: InvokeConfig<String, String> = InvokeConfig::default();
1453 assert_eq!(config.timeout.to_seconds(), 0);
1454 assert!(config.tenant_id.is_none());
1455 }
1456
1457 #[test]
1458 fn test_map_config_default() {
1459 let config = MapConfig::default();
1460 assert!(config.max_concurrency.is_none());
1461 assert!(config.item_batcher.is_none());
1462 }
1463
1464 #[test]
1465 fn test_parallel_config_default() {
1466 let config = ParallelConfig::default();
1467 assert!(config.max_concurrency.is_none());
1468 }
1469
1470 #[test]
1471 fn test_child_config_default() {
1472 let config = ChildConfig::default();
1473 assert!(!config.replay_children);
1474 assert!(config.serdes.is_none());
1475 assert!(config.error_mapper.is_none());
1476 assert!(config.summary_generator.is_none());
1477 }
1478
1479 #[test]
1480 fn test_child_config_with_replay_children() {
1481 let config = ChildConfig::with_replay_children();
1482 assert!(config.replay_children);
1483 }
1484
1485 #[test]
1486 fn test_child_config_set_replay_children() {
1487 let config = ChildConfig::new().set_replay_children(true);
1488 assert!(config.replay_children);
1489 }
1490
1491 #[test]
1492 fn test_context_config_type_alias() {
1493 let config: ContextConfig = ContextConfig::with_replay_children();
1495 assert!(config.replay_children);
1496 }
1497
1498 #[test]
1499 fn test_checkpointing_mode_default() {
1500 let mode = CheckpointingMode::default();
1501 assert_eq!(mode, CheckpointingMode::Batched);
1502 assert!(mode.is_batched());
1503 }
1504
1505 #[test]
1506 fn test_checkpointing_mode_eager() {
1507 let mode = CheckpointingMode::Eager;
1508 assert!(mode.is_eager());
1509 assert!(!mode.is_batched());
1510 assert!(!mode.is_optimistic());
1511 }
1512
1513 #[test]
1514 fn test_checkpointing_mode_batched() {
1515 let mode = CheckpointingMode::Batched;
1516 assert!(!mode.is_eager());
1517 assert!(mode.is_batched());
1518 assert!(!mode.is_optimistic());
1519 }
1520
1521 #[test]
1522 fn test_checkpointing_mode_optimistic() {
1523 let mode = CheckpointingMode::Optimistic;
1524 assert!(!mode.is_eager());
1525 assert!(!mode.is_batched());
1526 assert!(mode.is_optimistic());
1527 }
1528
1529 #[test]
1530 fn test_checkpointing_mode_description() {
1531 assert!(CheckpointingMode::Eager
1532 .description()
1533 .contains("maximum durability"));
1534 assert!(CheckpointingMode::Batched
1535 .description()
1536 .contains("balanced"));
1537 assert!(CheckpointingMode::Optimistic
1538 .description()
1539 .contains("best performance"));
1540 }
1541
1542 #[test]
1543 fn test_checkpointing_mode_serialization() {
1544 let mode = CheckpointingMode::Eager;
1546 let serialized = serde_json::to_string(&mode).unwrap();
1547 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1548 assert_eq!(mode, deserialized);
1549
1550 let mode = CheckpointingMode::Batched;
1551 let serialized = serde_json::to_string(&mode).unwrap();
1552 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1553 assert_eq!(mode, deserialized);
1554
1555 let mode = CheckpointingMode::Optimistic;
1556 let serialized = serde_json::to_string(&mode).unwrap();
1557 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1558 assert_eq!(mode, deserialized);
1559 }
1560
1561 #[test]
1566 fn test_exponential_backoff_new() {
1567 let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
1568 assert_eq!(strategy.max_attempts, 5);
1569 assert_eq!(strategy.base_delay.to_seconds(), 1);
1570 assert_eq!(strategy.max_delay.to_seconds(), 3600); assert!((strategy.multiplier - 2.0).abs() < f64::EPSILON);
1572 }
1573
1574 #[test]
1575 fn test_exponential_backoff_builder() {
1576 let strategy = ExponentialBackoff::builder()
1577 .max_attempts(10)
1578 .base_delay(Duration::from_seconds(2))
1579 .max_delay(Duration::from_minutes(30))
1580 .multiplier(3.0)
1581 .build();
1582
1583 assert_eq!(strategy.max_attempts, 10);
1584 assert_eq!(strategy.base_delay.to_seconds(), 2);
1585 assert_eq!(strategy.max_delay.to_seconds(), 1800); assert!((strategy.multiplier - 3.0).abs() < f64::EPSILON);
1587 }
1588
1589 #[test]
1590 fn test_exponential_backoff_delays() {
1591 let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
1592
1593 assert_eq!(
1595 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1596 Some(1)
1597 );
1598 assert_eq!(
1600 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1601 Some(2)
1602 );
1603 assert_eq!(
1605 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1606 Some(4)
1607 );
1608 assert_eq!(
1610 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1611 Some(8)
1612 );
1613 assert_eq!(
1615 strategy.next_delay(4, "error").map(|d| d.to_seconds()),
1616 Some(16)
1617 );
1618 assert_eq!(strategy.next_delay(5, "error"), None);
1620 }
1621
1622 #[test]
1623 fn test_exponential_backoff_max_delay_cap() {
1624 let strategy = ExponentialBackoff::builder()
1625 .max_attempts(10)
1626 .base_delay(Duration::from_seconds(10))
1627 .max_delay(Duration::from_seconds(30))
1628 .build();
1629
1630 assert_eq!(
1632 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1633 Some(10)
1634 );
1635 assert_eq!(
1637 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1638 Some(20)
1639 );
1640 assert_eq!(
1642 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1643 Some(30)
1644 );
1645 assert_eq!(
1647 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1648 Some(30)
1649 );
1650 }
1651
1652 #[test]
1653 fn test_fixed_delay_new() {
1654 let strategy = FixedDelay::new(3, Duration::from_seconds(5));
1655 assert_eq!(strategy.max_attempts, 3);
1656 assert_eq!(strategy.delay.to_seconds(), 5);
1657 }
1658
1659 #[test]
1660 fn test_fixed_delay_constant() {
1661 let strategy = FixedDelay::new(3, Duration::from_seconds(5));
1662
1663 assert_eq!(
1665 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1666 Some(5)
1667 );
1668 assert_eq!(
1669 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1670 Some(5)
1671 );
1672 assert_eq!(
1673 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1674 Some(5)
1675 );
1676 assert_eq!(strategy.next_delay(3, "error"), None);
1678 }
1679
1680 #[test]
1681 fn test_linear_backoff_new() {
1682 let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
1683 assert_eq!(strategy.max_attempts, 5);
1684 assert_eq!(strategy.base_delay.to_seconds(), 2);
1685 assert_eq!(strategy.max_delay.to_seconds(), 3600); }
1687
1688 #[test]
1689 fn test_linear_backoff_with_max_delay() {
1690 let strategy = LinearBackoff::new(5, Duration::from_seconds(2))
1691 .with_max_delay(Duration::from_seconds(10));
1692 assert_eq!(strategy.max_delay.to_seconds(), 10);
1693 }
1694
1695 #[test]
1696 fn test_linear_backoff_delays() {
1697 let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
1698
1699 assert_eq!(
1701 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1702 Some(2)
1703 );
1704 assert_eq!(
1706 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1707 Some(4)
1708 );
1709 assert_eq!(
1711 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1712 Some(6)
1713 );
1714 assert_eq!(
1716 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1717 Some(8)
1718 );
1719 assert_eq!(
1721 strategy.next_delay(4, "error").map(|d| d.to_seconds()),
1722 Some(10)
1723 );
1724 assert_eq!(strategy.next_delay(5, "error"), None);
1726 }
1727
1728 #[test]
1729 fn test_linear_backoff_max_delay_cap() {
1730 let strategy = LinearBackoff::new(10, Duration::from_seconds(5))
1731 .with_max_delay(Duration::from_seconds(15));
1732
1733 assert_eq!(
1735 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1736 Some(5)
1737 );
1738 assert_eq!(
1740 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1741 Some(10)
1742 );
1743 assert_eq!(
1745 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1746 Some(15)
1747 );
1748 assert_eq!(
1750 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1751 Some(15)
1752 );
1753 }
1754
1755 #[test]
1756 fn test_no_retry() {
1757 let strategy = NoRetry;
1758
1759 assert_eq!(strategy.next_delay(0, "error"), None);
1761 assert_eq!(strategy.next_delay(1, "error"), None);
1762 assert_eq!(strategy.next_delay(100, "error"), None);
1763 }
1764
1765 #[test]
1766 fn test_no_retry_default() {
1767 let strategy = NoRetry::default();
1768 assert_eq!(strategy.next_delay(0, "error"), None);
1769 }
1770
1771 #[test]
1772 fn test_custom_retry_basic() {
1773 let strategy = custom_retry(|attempt, _error| {
1774 if attempt >= 3 {
1775 None
1776 } else {
1777 Some(Duration::from_seconds(10))
1778 }
1779 });
1780
1781 assert_eq!(
1782 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1783 Some(10)
1784 );
1785 assert_eq!(
1786 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1787 Some(10)
1788 );
1789 assert_eq!(
1790 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1791 Some(10)
1792 );
1793 assert_eq!(strategy.next_delay(3, "error"), None);
1794 }
1795
1796 #[test]
1797 fn test_custom_retry_error_based() {
1798 let strategy = custom_retry(|attempt, error| {
1799 if attempt >= 5 {
1800 return None;
1801 }
1802 if error.contains("transient") {
1803 Some(Duration::from_seconds(1))
1804 } else if error.contains("rate_limit") {
1805 Some(Duration::from_seconds(30))
1806 } else {
1807 None }
1809 });
1810
1811 assert_eq!(
1813 strategy
1814 .next_delay(0, "transient error")
1815 .map(|d| d.to_seconds()),
1816 Some(1)
1817 );
1818 assert_eq!(
1820 strategy
1821 .next_delay(0, "rate_limit exceeded")
1822 .map(|d| d.to_seconds()),
1823 Some(30)
1824 );
1825 assert_eq!(strategy.next_delay(0, "permanent failure"), None);
1827 }
1828
1829 #[test]
1830 fn test_retry_strategy_clone_box() {
1831 let exp: Box<dyn RetryStrategy> =
1833 Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1)));
1834 let exp_clone = exp.clone_box();
1835 assert_eq!(
1836 exp.next_delay(0, "e").map(|d| d.to_seconds()),
1837 exp_clone.next_delay(0, "e").map(|d| d.to_seconds())
1838 );
1839
1840 let fixed: Box<dyn RetryStrategy> = Box::new(FixedDelay::new(3, Duration::from_seconds(5)));
1841 let fixed_clone = fixed.clone_box();
1842 assert_eq!(
1843 fixed.next_delay(0, "e").map(|d| d.to_seconds()),
1844 fixed_clone.next_delay(0, "e").map(|d| d.to_seconds())
1845 );
1846
1847 let linear: Box<dyn RetryStrategy> =
1848 Box::new(LinearBackoff::new(3, Duration::from_seconds(2)));
1849 let linear_clone = linear.clone_box();
1850 assert_eq!(
1851 linear.next_delay(0, "e").map(|d| d.to_seconds()),
1852 linear_clone.next_delay(0, "e").map(|d| d.to_seconds())
1853 );
1854
1855 let no_retry: Box<dyn RetryStrategy> = Box::new(NoRetry);
1856 let no_retry_clone = no_retry.clone_box();
1857 assert_eq!(
1858 no_retry.next_delay(0, "e"),
1859 no_retry_clone.next_delay(0, "e")
1860 );
1861 }
1862
1863 #[test]
1864 fn test_boxed_retry_strategy_clone() {
1865 let strategy: Box<dyn RetryStrategy> =
1867 Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1)));
1868 let cloned = strategy.clone();
1869
1870 assert_eq!(
1871 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1872 cloned.next_delay(0, "error").map(|d| d.to_seconds())
1873 );
1874 }
1875
1876 #[test]
1877 fn test_step_config_with_retry_strategy() {
1878 let config = StepConfig {
1879 retry_strategy: Some(Box::new(ExponentialBackoff::new(
1880 3,
1881 Duration::from_seconds(1),
1882 ))),
1883 step_semantics: StepSemantics::AtLeastOncePerRetry,
1884 serdes: None,
1885 retryable_error_filter: None,
1886 };
1887
1888 assert!(config.retry_strategy.is_some());
1889 let strategy = config.retry_strategy.as_ref().unwrap();
1890 assert_eq!(
1891 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1892 Some(1)
1893 );
1894 }
1895
1896 #[test]
1897 fn test_retry_strategy_debug() {
1898 let exp = ExponentialBackoff::new(3, Duration::from_seconds(1));
1900 let debug_str = format!("{:?}", exp);
1901 assert!(debug_str.contains("ExponentialBackoff"));
1902
1903 let fixed = FixedDelay::new(3, Duration::from_seconds(5));
1904 let debug_str = format!("{:?}", fixed);
1905 assert!(debug_str.contains("FixedDelay"));
1906
1907 let linear = LinearBackoff::new(3, Duration::from_seconds(2));
1908 let debug_str = format!("{:?}", linear);
1909 assert!(debug_str.contains("LinearBackoff"));
1910
1911 let no_retry = NoRetry;
1912 let debug_str = format!("{:?}", no_retry);
1913 assert!(debug_str.contains("NoRetry"));
1914
1915 let custom = custom_retry(|_, _| None);
1916 let debug_str = format!("{:?}", custom);
1917 assert!(debug_str.contains("CustomRetry"));
1918 }
1919
    /// Proptest strategy that yields one of the two `StepSemantics` variants listed below.
    fn step_semantics_strategy() -> impl Strategy<Value = StepSemantics> {
        prop_oneof![
            Just(StepSemantics::AtMostOncePerRetry),
            Just(StepSemantics::AtLeastOncePerRetry),
        ]
    }
1931
    /// Proptest strategy that yields one of the three `CheckpointingMode` variants listed below.
    fn checkpointing_mode_strategy() -> impl Strategy<Value = CheckpointingMode> {
        prop_oneof![
            Just(CheckpointingMode::Eager),
            Just(CheckpointingMode::Batched),
            Just(CheckpointingMode::Optimistic),
        ]
    }
1940
1941 proptest! {
1942 #[test]
1947 fn prop_step_config_validity(semantics in step_semantics_strategy()) {
1948 let config = StepConfig {
1949 retry_strategy: None,
1950 step_semantics: semantics,
1951 serdes: None,
1952 retryable_error_filter: None,
1953 };
1954
1955 let _ = config.retry_strategy.is_none();
1957 let _ = config.step_semantics;
1958 let _ = config.serdes.is_none();
1959
1960 let debug_str = format!("{:?}", config);
1962 prop_assert!(!debug_str.is_empty());
1963 }
1964
1965 #[test]
1969 fn prop_callback_config_positive_timeout(
1970 timeout_secs in 1u64..=86400u64,
1971 heartbeat_secs in 1u64..=86400u64
1972 ) {
1973 let config = CallbackConfig {
1974 timeout: Duration::from_seconds(timeout_secs),
1975 heartbeat_timeout: Duration::from_seconds(heartbeat_secs),
1976 serdes: None,
1977 };
1978
1979 prop_assert_eq!(config.timeout.to_seconds(), timeout_secs);
1981 prop_assert_eq!(config.heartbeat_timeout.to_seconds(), heartbeat_secs);
1982
1983 let debug_str = format!("{:?}", config);
1985 prop_assert!(!debug_str.is_empty());
1986 }
1987
1988 #[test]
1992 fn prop_duration_conversion_roundtrip(seconds in 0u64..=u64::MAX / 2) {
1993 let original = Duration::from_seconds(seconds);
1994 let extracted = original.to_seconds();
1995 let reconstructed = Duration::from_seconds(extracted);
1996
1997 prop_assert_eq!(original, reconstructed);
1998 prop_assert_eq!(original.to_seconds(), reconstructed.to_seconds());
1999 }
2000
2001 #[test]
2007 fn prop_completion_config_consistency(
2008 min_successful in proptest::option::of(0usize..100),
2009 tolerated_count in proptest::option::of(0usize..100),
2010 tolerated_pct in proptest::option::of(0.0f64..=1.0f64)
2011 ) {
2012 let config = CompletionConfig {
2013 min_successful,
2014 tolerated_failure_count: tolerated_count,
2015 tolerated_failure_percentage: tolerated_pct,
2016 };
2017
2018 prop_assert_eq!(config.min_successful, min_successful);
2020 prop_assert_eq!(config.tolerated_failure_count, tolerated_count);
2021 prop_assert_eq!(config.tolerated_failure_percentage, tolerated_pct);
2022
2023 let serialized = serde_json::to_string(&config).unwrap();
2025 let deserialized: CompletionConfig = serde_json::from_str(&serialized).unwrap();
2026
2027 prop_assert_eq!(config.min_successful, deserialized.min_successful);
2028 prop_assert_eq!(config.tolerated_failure_count, deserialized.tolerated_failure_count);
2029 match (config.tolerated_failure_percentage, deserialized.tolerated_failure_percentage) {
2031 (Some(a), Some(b)) => prop_assert!((a - b).abs() < f64::EPSILON),
2032 (None, None) => {},
2033 _ => prop_assert!(false, "tolerated_failure_percentage mismatch"),
2034 }
2035 }
2036
2037 #[test]
2041 fn prop_checkpointing_mode_roundtrip(mode in checkpointing_mode_strategy()) {
2042 let serialized = serde_json::to_string(&mode).unwrap();
2043 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
2044 prop_assert_eq!(mode, deserialized);
2045 }
2046
2047 #[test]
2051 fn prop_checkpointing_mode_classification(mode in checkpointing_mode_strategy()) {
2052 let eager = mode.is_eager();
2053 let batched = mode.is_batched();
2054 let optimistic = mode.is_optimistic();
2055
2056 let count = [eager, batched, optimistic].iter().filter(|&&x| x).count();
2058 prop_assert_eq!(count, 1, "Exactly one classification should be true");
2059
2060 match mode {
2062 CheckpointingMode::Eager => prop_assert!(eager),
2063 CheckpointingMode::Batched => prop_assert!(batched),
2064 CheckpointingMode::Optimistic => prop_assert!(optimistic),
2065 }
2066 }
2067
2068 #[test]
2072 fn prop_step_semantics_roundtrip(semantics in step_semantics_strategy()) {
2073 let serialized = serde_json::to_string(&semantics).unwrap();
2074 let deserialized: StepSemantics = serde_json::from_str(&serialized).unwrap();
2075 prop_assert_eq!(semantics, deserialized);
2076 }
2077
2078 #[test]
2082 fn prop_item_batcher_validity(
2083 max_items in 1usize..=10000,
2084 max_bytes in 1usize..=10_000_000
2085 ) {
2086 let batcher = ItemBatcher::new(max_items, max_bytes);
2087
2088 prop_assert_eq!(batcher.max_items_per_batch, max_items);
2089 prop_assert_eq!(batcher.max_bytes_per_batch, max_bytes);
2090
2091 let debug_str = format!("{:?}", batcher);
2093 prop_assert!(!debug_str.is_empty());
2094 }
2095
2096 #[test]
2100 fn prop_child_config_builder_consistency(replay_children in proptest::bool::ANY) {
2101 let config = ChildConfig::new().set_replay_children(replay_children);
2102
2103 prop_assert_eq!(config.replay_children, replay_children);
2104
2105 let debug_str = format!("{:?}", config);
2107 prop_assert!(!debug_str.is_empty());
2108 }
2109
2110 #[test]
2114 fn prop_map_config_validity(
2115 max_concurrency in proptest::option::of(1usize..=1000)
2116 ) {
2117 let config = MapConfig {
2118 max_concurrency,
2119 item_batcher: None,
2120 completion_config: CompletionConfig::default(),
2121 serdes: None,
2122 };
2123
2124 prop_assert_eq!(config.max_concurrency, max_concurrency);
2125
2126 let debug_str = format!("{:?}", config);
2128 prop_assert!(!debug_str.is_empty());
2129 }
2130
2131 #[test]
2135 fn prop_parallel_config_validity(
2136 max_concurrency in proptest::option::of(1usize..=1000)
2137 ) {
2138 let config = ParallelConfig {
2139 max_concurrency,
2140 completion_config: CompletionConfig::default(),
2141 serdes: None,
2142 };
2143
2144 prop_assert_eq!(config.max_concurrency, max_concurrency);
2145
2146 let debug_str = format!("{:?}", config);
2148 prop_assert!(!debug_str.is_empty());
2149 }
2150
2151 #[test]
2157 fn prop_item_batcher_configuration_respected(
2158 max_items in 1usize..=50,
2159 max_bytes in 100usize..=10000,
2160 item_count in 0usize..=200
2161 ) {
2162 let batcher = ItemBatcher::new(max_items, max_bytes);
2163
2164 let items: Vec<String> = (0..item_count)
2166 .map(|i| format!("item_{:04}", i))
2167 .collect();
2168
2169 let batches = batcher.batch(&items);
2170
2171 for (_, batch) in &batches {
2173 prop_assert!(
2174 batch.len() <= max_items,
2175 "Batch has {} items but max is {}",
2176 batch.len(),
2177 max_items
2178 );
2179 }
2180
2181 for (_, batch) in &batches {
2183 let batch_bytes: usize = batch.iter()
2184 .map(|item| serde_json::to_string(item).map(|s| s.len()).unwrap_or(0))
2185 .sum();
2186
2187 if batch.len() > 1 {
2190 prop_assert!(
2191 batch_bytes <= max_bytes,
2192 "Batch has {} bytes but max is {} (batch has {} items)",
2193 batch_bytes,
2194 max_bytes,
2195 batch.len()
2196 );
2197 }
2198 }
2199 }
2200
2201 #[test]
2206 fn prop_item_batcher_ordering_preservation(
2207 max_items in 1usize..=50,
2208 max_bytes in 100usize..=10000,
2209 item_count in 0usize..=200
2210 ) {
2211 let batcher = ItemBatcher::new(max_items, max_bytes);
2212
2213 let items: Vec<String> = (0..item_count)
2215 .map(|i| format!("item_{:04}", i))
2216 .collect();
2217
2218 let batches = batcher.batch(&items);
2219
2220 let reconstructed: Vec<String> = batches
2222 .into_iter()
2223 .flat_map(|(_, batch)| batch)
2224 .collect();
2225
2226 prop_assert_eq!(
2228 items.len(),
2229 reconstructed.len(),
2230 "Reconstructed list has different length: expected {}, got {}",
2231 items.len(),
2232 reconstructed.len()
2233 );
2234
2235 for (i, (original, reconstructed_item)) in items.iter().zip(reconstructed.iter()).enumerate() {
2236 prop_assert_eq!(
2237 original,
2238 reconstructed_item,
2239 "Item at index {} differs: expected '{}', got '{}'",
2240 i,
2241 original,
2242 reconstructed_item
2243 );
2244 }
2245 }
2246 }
2247
2248 #[test]
2253 fn test_jitter_strategy_none_returns_exact_delay() {
2254 let jitter = JitterStrategy::None;
2255 assert_eq!(jitter.apply(10.0, 0), 10.0);
2256 assert_eq!(jitter.apply(5.5, 3), 5.5);
2257 assert_eq!(jitter.apply(0.0, 0), 0.0);
2258 assert_eq!(jitter.apply(100.0, 99), 100.0);
2259 }
2260
2261 #[test]
2262 fn test_jitter_strategy_full_bounds() {
2263 let jitter = JitterStrategy::Full;
2264 for attempt in 0..20 {
2265 let result = jitter.apply(10.0, attempt);
2266 assert!(
2267 result >= 0.0 && result <= 10.0,
2268 "Full jitter for attempt {} produced {}, expected [0, 10]",
2269 attempt,
2270 result
2271 );
2272 }
2273 }
2274
2275 #[test]
2276 fn test_jitter_strategy_half_bounds() {
2277 let jitter = JitterStrategy::Half;
2278 for attempt in 0..20 {
2279 let result = jitter.apply(10.0, attempt);
2280 assert!(
2281 result >= 5.0 && result <= 10.0,
2282 "Half jitter for attempt {} produced {}, expected [5, 10]",
2283 attempt,
2284 result
2285 );
2286 }
2287 }
2288
2289 #[test]
2290 fn test_jitter_strategy_deterministic() {
2291 let full = JitterStrategy::Full;
2293 let r1 = full.apply(10.0, 5);
2294 let r2 = full.apply(10.0, 5);
2295 assert_eq!(r1, r2);
2296
2297 let half = JitterStrategy::Half;
2298 let r1 = half.apply(10.0, 5);
2299 let r2 = half.apply(10.0, 5);
2300 assert_eq!(r1, r2);
2301 }
2302
2303 #[test]
2304 fn test_jitter_strategy_zero_delay() {
2305 assert_eq!(JitterStrategy::Full.apply(0.0, 0), 0.0);
2307 assert_eq!(JitterStrategy::Half.apply(0.0, 0), 0.0);
2308 assert_eq!(JitterStrategy::None.apply(0.0, 0), 0.0);
2309 }
2310
2311 #[test]
2312 fn test_jitter_strategy_default_is_none() {
2313 assert_eq!(JitterStrategy::default(), JitterStrategy::None);
2314 }
2315
2316 #[test]
2321 fn test_exponential_backoff_with_full_jitter() {
2322 let strategy = ExponentialBackoff::builder()
2323 .max_attempts(5)
2324 .base_delay(Duration::from_seconds(5))
2325 .max_delay(Duration::from_seconds(60))
2326 .jitter(JitterStrategy::Full)
2327 .build();
2328
2329 for attempt in 0..5 {
2330 let delay = strategy.next_delay(attempt, "error");
2331 assert!(delay.is_some());
2332 let secs = delay.unwrap().to_seconds();
2333 assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
2335 }
2336 assert!(strategy.next_delay(5, "error").is_none());
2337 }
2338
2339 #[test]
2340 fn test_exponential_backoff_with_half_jitter() {
2341 let strategy = ExponentialBackoff::builder()
2342 .max_attempts(5)
2343 .base_delay(Duration::from_seconds(10))
2344 .max_delay(Duration::from_seconds(60))
2345 .jitter(JitterStrategy::Half)
2346 .build();
2347
2348 for attempt in 0..5 {
2349 let delay = strategy.next_delay(attempt, "error");
2350 assert!(delay.is_some());
2351 let secs = delay.unwrap().to_seconds();
2352 assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
2353 }
2354 }
2355
2356 #[test]
2357 fn test_exponential_backoff_no_jitter_unchanged() {
2358 let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
2360 assert_eq!(strategy.jitter, JitterStrategy::None);
2361 assert_eq!(strategy.next_delay(0, "e").map(|d| d.to_seconds()), Some(1));
2362 assert_eq!(strategy.next_delay(1, "e").map(|d| d.to_seconds()), Some(2));
2363 assert_eq!(strategy.next_delay(2, "e").map(|d| d.to_seconds()), Some(4));
2364 }
2365
2366 #[test]
2367 fn test_fixed_delay_with_jitter() {
2368 let strategy =
2369 FixedDelay::new(3, Duration::from_seconds(10)).with_jitter(JitterStrategy::Full);
2370
2371 for attempt in 0..3 {
2372 let delay = strategy.next_delay(attempt, "error");
2373 assert!(delay.is_some());
2374 let secs = delay.unwrap().to_seconds();
2375 assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
2376 }
2377 assert!(strategy.next_delay(3, "error").is_none());
2378 }
2379
2380 #[test]
2381 fn test_fixed_delay_no_jitter_unchanged() {
2382 let strategy = FixedDelay::new(3, Duration::from_seconds(5));
2383 assert_eq!(strategy.jitter, JitterStrategy::None);
2384 assert_eq!(strategy.next_delay(0, "e").map(|d| d.to_seconds()), Some(5));
2385 assert_eq!(strategy.next_delay(1, "e").map(|d| d.to_seconds()), Some(5));
2386 }
2387
2388 #[test]
2389 fn test_linear_backoff_with_jitter() {
2390 let strategy =
2391 LinearBackoff::new(5, Duration::from_seconds(5)).with_jitter(JitterStrategy::Half);
2392
2393 for attempt in 0..5 {
2394 let delay = strategy.next_delay(attempt, "error");
2395 assert!(delay.is_some());
2396 let secs = delay.unwrap().to_seconds();
2397 assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
2398 }
2399 assert!(strategy.next_delay(5, "error").is_none());
2400 }
2401
2402 #[test]
2403 fn test_linear_backoff_no_jitter_unchanged() {
2404 let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
2405 assert_eq!(strategy.jitter, JitterStrategy::None);
2406 assert_eq!(strategy.next_delay(0, "e").map(|d| d.to_seconds()), Some(2));
2407 assert_eq!(strategy.next_delay(1, "e").map(|d| d.to_seconds()), Some(4));
2408 }
2409
2410 #[test]
2411 fn test_jitter_minimum_floor_all_strategies() {
2412 let exp = ExponentialBackoff::builder()
2414 .max_attempts(3)
2415 .base_delay(Duration::from_seconds(1))
2416 .jitter(JitterStrategy::Full)
2417 .build();
2418 for attempt in 0..3 {
2419 let secs = exp.next_delay(attempt, "e").unwrap().to_seconds();
2420 assert!(
2421 secs >= 1,
2422 "ExponentialBackoff attempt {} delay {} < 1",
2423 attempt,
2424 secs
2425 );
2426 }
2427
2428 let fixed = FixedDelay::new(3, Duration::from_seconds(1)).with_jitter(JitterStrategy::Full);
2429 for attempt in 0..3 {
2430 let secs = fixed.next_delay(attempt, "e").unwrap().to_seconds();
2431 assert!(
2432 secs >= 1,
2433 "FixedDelay attempt {} delay {} < 1",
2434 attempt,
2435 secs
2436 );
2437 }
2438
2439 let linear =
2440 LinearBackoff::new(3, Duration::from_seconds(1)).with_jitter(JitterStrategy::Full);
2441 for attempt in 0..3 {
2442 let secs = linear.next_delay(attempt, "e").unwrap().to_seconds();
2443 assert!(
2444 secs >= 1,
2445 "LinearBackoff attempt {} delay {} < 1",
2446 attempt,
2447 secs
2448 );
2449 }
2450 }
2451
    /// Proptest strategy that yields one of the three `JitterStrategy` variants listed below.
    fn jitter_strategy_strategy() -> impl Strategy<Value = JitterStrategy> {
        prop_oneof![
            Just(JitterStrategy::None),
            Just(JitterStrategy::Full),
            Just(JitterStrategy::Half),
        ]
    }
2464
2465 proptest! {
2466 #[test]
2470 fn prop_jitter_none_identity(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2471 let result = JitterStrategy::None.apply(delay, attempt);
2472 prop_assert!((result - delay).abs() < f64::EPSILON,
2473 "None jitter changed delay from {} to {}", delay, result);
2474 }
2475
2476 #[test]
2480 fn prop_jitter_full_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2481 let result = JitterStrategy::Full.apply(delay, attempt);
2482 prop_assert!(result >= 0.0, "Full jitter result {} < 0", result);
2483 prop_assert!(result <= delay + f64::EPSILON,
2484 "Full jitter result {} > delay {}", result, delay);
2485 }
2486
2487 #[test]
2491 fn prop_jitter_half_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2492 let result = JitterStrategy::Half.apply(delay, attempt);
2493 prop_assert!(result >= delay / 2.0 - f64::EPSILON,
2494 "Half jitter result {} < delay/2 {}", result, delay / 2.0);
2495 prop_assert!(result <= delay + f64::EPSILON,
2496 "Half jitter result {} > delay {}", result, delay);
2497 }
2498
2499 #[test]
2503 fn prop_jitter_deterministic(
2504 jitter in jitter_strategy_strategy(),
2505 delay in 0.0f64..1000.0,
2506 attempt in 0u32..100
2507 ) {
2508 let r1 = jitter.apply(delay, attempt);
2509 let r2 = jitter.apply(delay, attempt);
2510 prop_assert!((r1 - r2).abs() < f64::EPSILON,
2511 "Jitter not deterministic: {} vs {}", r1, r2);
2512 }
2513
2514 #[test]
2518 fn prop_jitter_minimum_floor(
2519 jitter in jitter_strategy_strategy(),
2520 attempt in 0u32..10,
2521 base_delay_secs in 1u64..100
2522 ) {
2523 let exp = ExponentialBackoff::builder()
2525 .max_attempts(10)
2526 .base_delay(Duration::from_seconds(base_delay_secs))
2527 .jitter(jitter)
2528 .build();
2529 if let Some(d) = exp.next_delay(attempt, "e") {
2530 prop_assert!(d.to_seconds() >= 1,
2531 "ExponentialBackoff delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2532 }
2533
2534 let fixed = FixedDelay::new(10, Duration::from_seconds(base_delay_secs))
2536 .with_jitter(jitter);
2537 if let Some(d) = fixed.next_delay(attempt, "e") {
2538 prop_assert!(d.to_seconds() >= 1,
2539 "FixedDelay delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2540 }
2541
2542 let linear = LinearBackoff::new(10, Duration::from_seconds(base_delay_secs))
2544 .with_jitter(jitter);
2545 if let Some(d) = linear.next_delay(attempt, "e") {
2546 prop_assert!(d.to_seconds() >= 1,
2547 "LinearBackoff delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2548 }
2549 }
2550 }
2551}
2552
2553#[cfg(test)]
2554mod retryable_error_filter_tests {
2555 use super::*;
2556
2557 #[test]
2558 fn test_empty_filter_retries_all() {
2559 let filter = RetryableErrorFilter::default();
2560 assert!(filter.is_retryable("any error message"));
2561 assert!(filter.is_retryable(""));
2562 assert!(filter.is_retryable("timeout"));
2563 assert!(filter.is_retryable_with_type("any error", "AnyType"));
2564 }
2565
2566 #[test]
2567 fn test_contains_pattern_matches_substring() {
2568 let filter = RetryableErrorFilter {
2569 patterns: vec![ErrorPattern::Contains("timeout".to_string())],
2570 error_types: vec![],
2571 };
2572 assert!(filter.is_retryable("request timeout occurred"));
2573 assert!(filter.is_retryable("timeout"));
2574 assert!(filter.is_retryable("a timeout happened"));
2575 }
2576
2577 #[test]
2578 fn test_contains_pattern_no_match() {
2579 let filter = RetryableErrorFilter {
2580 patterns: vec![ErrorPattern::Contains("timeout".to_string())],
2581 error_types: vec![],
2582 };
2583 assert!(!filter.is_retryable("connection refused"));
2584 assert!(!filter.is_retryable("invalid input"));
2585 assert!(!filter.is_retryable(""));
2586 }
2587
2588 #[test]
2589 fn test_regex_pattern_matches() {
2590 let filter = RetryableErrorFilter {
2591 patterns: vec![ErrorPattern::Regex(
2592 regex::Regex::new(r"(?i)connection.*refused").unwrap(),
2593 )],
2594 error_types: vec![],
2595 };
2596 assert!(filter.is_retryable("Connection was refused"));
2597 assert!(filter.is_retryable("connection refused"));
2598 assert!(filter.is_retryable("CONNECTION actively REFUSED"));
2599 }
2600
2601 #[test]
2602 fn test_regex_pattern_no_match() {
2603 let filter = RetryableErrorFilter {
2604 patterns: vec![ErrorPattern::Regex(
2605 regex::Regex::new(r"(?i)connection.*refused").unwrap(),
2606 )],
2607 error_types: vec![],
2608 };
2609 assert!(!filter.is_retryable("timeout error"));
2610 assert!(!filter.is_retryable("refused connection")); }
2612
2613 #[test]
2614 fn test_or_logic_multiple_patterns() {
2615 let filter = RetryableErrorFilter {
2616 patterns: vec![
2617 ErrorPattern::Contains("timeout".to_string()),
2618 ErrorPattern::Regex(regex::Regex::new(r"(?i)connection.*refused").unwrap()),
2619 ],
2620 error_types: vec![],
2621 };
2622 assert!(filter.is_retryable("request timeout"));
2624 assert!(filter.is_retryable("Connection refused"));
2626 assert!(!filter.is_retryable("invalid input"));
2628 }
2629
2630 #[test]
2631 fn test_error_type_matching() {
2632 let filter = RetryableErrorFilter {
2633 patterns: vec![],
2634 error_types: vec!["TransientError".to_string()],
2635 };
2636 assert!(!filter.is_retryable("some error"));
2638 assert!(filter.is_retryable_with_type("some error", "TransientError"));
2640 assert!(!filter.is_retryable_with_type("some error", "PermanentError"));
2641 }
2642
2643 #[test]
2644 fn test_or_logic_patterns_and_types() {
2645 let filter = RetryableErrorFilter {
2646 patterns: vec![ErrorPattern::Contains("timeout".to_string())],
2647 error_types: vec!["TransientError".to_string()],
2648 };
2649 assert!(filter.is_retryable_with_type("request timeout", "PermanentError"));
2651 assert!(filter.is_retryable_with_type("invalid input", "TransientError"));
2653 assert!(filter.is_retryable_with_type("request timeout", "TransientError"));
2655 assert!(!filter.is_retryable_with_type("invalid input", "PermanentError"));
2657 }
2658
2659 #[test]
2660 fn test_error_pattern_debug() {
2661 let contains = ErrorPattern::Contains("test".to_string());
2662 let debug_str = format!("{:?}", contains);
2663 assert!(debug_str.contains("Contains"));
2664 assert!(debug_str.contains("test"));
2665
2666 let regex = ErrorPattern::Regex(regex::Regex::new(r"\d+").unwrap());
2667 let debug_str = format!("{:?}", regex);
2668 assert!(debug_str.contains("Regex"));
2669 }
2670
2671 #[test]
2672 fn test_retryable_error_filter_clone() {
2673 let filter = RetryableErrorFilter {
2674 patterns: vec![
2675 ErrorPattern::Contains("timeout".to_string()),
2676 ErrorPattern::Regex(regex::Regex::new(r"err\d+").unwrap()),
2677 ],
2678 error_types: vec!["TransientError".to_string()],
2679 };
2680 let cloned = filter.clone();
2681 assert!(cloned.is_retryable("timeout error"));
2682 assert!(cloned.is_retryable("err42"));
2683 assert!(cloned.is_retryable_with_type("x", "TransientError"));
2684 }
2685
2686 #[test]
2692 fn test_wait_decision_done_when_predicate_false() {
2693 let strategy = create_wait_strategy(WaitStrategyConfig {
2695 max_attempts: Some(10),
2696 initial_delay: Duration::from_seconds(5),
2697 max_delay: Duration::from_seconds(300),
2698 backoff_rate: 1.5,
2699 jitter: JitterStrategy::None,
2700 should_continue_polling: Box::new(|state: &String| state != "COMPLETED"),
2701 });
2702
2703 let decision = strategy(&"COMPLETED".to_string(), 1);
2705 assert_eq!(decision, WaitDecision::Done);
2706 }
2707
2708 #[test]
2709 fn test_wait_decision_continue_with_backoff() {
2710 let strategy = create_wait_strategy(WaitStrategyConfig {
2712 max_attempts: Some(10),
2713 initial_delay: Duration::from_seconds(5),
2714 max_delay: Duration::from_seconds(300),
2715 backoff_rate: 2.0,
2716 jitter: JitterStrategy::None,
2717 should_continue_polling: Box::new(|state: &String| state != "DONE"),
2718 });
2719
2720 let decision = strategy(&"PENDING".to_string(), 1);
2722 assert_eq!(
2723 decision,
2724 WaitDecision::Continue {
2725 delay: Duration::from_seconds(5)
2726 }
2727 );
2728
2729 let decision = strategy(&"PENDING".to_string(), 2);
2731 assert_eq!(
2732 decision,
2733 WaitDecision::Continue {
2734 delay: Duration::from_seconds(10)
2735 }
2736 );
2737
2738 let decision = strategy(&"PENDING".to_string(), 3);
2740 assert_eq!(
2741 decision,
2742 WaitDecision::Continue {
2743 delay: Duration::from_seconds(20)
2744 }
2745 );
2746 }
2747
2748 #[test]
2749 fn test_wait_strategy_delay_capped_at_max() {
2750 let strategy = create_wait_strategy(WaitStrategyConfig {
2752 max_attempts: Some(20),
2753 initial_delay: Duration::from_seconds(10),
2754 max_delay: Duration::from_seconds(30),
2755 backoff_rate: 2.0,
2756 jitter: JitterStrategy::None,
2757 should_continue_polling: Box::new(|_: &i32| true),
2758 });
2759
2760 let decision = strategy(&0, 3);
2762 assert_eq!(
2763 decision,
2764 WaitDecision::Continue {
2765 delay: Duration::from_seconds(30)
2766 }
2767 );
2768
2769 let decision = strategy(&0, 5);
2771 assert_eq!(
2772 decision,
2773 WaitDecision::Continue {
2774 delay: Duration::from_seconds(30)
2775 }
2776 );
2777 }
2778
2779 #[test]
2780 fn test_wait_strategy_max_attempts_returns_done() {
2781 let strategy = create_wait_strategy(WaitStrategyConfig {
2783 max_attempts: Some(3),
2784 initial_delay: Duration::from_seconds(5),
2785 max_delay: Duration::from_seconds(300),
2786 backoff_rate: 1.5,
2787 jitter: JitterStrategy::None,
2788 should_continue_polling: Box::new(|_: &i32| true),
2789 });
2790
2791 let decision = strategy(&0, 3);
2793 assert_eq!(decision, WaitDecision::Done);
2794 }
2795
2796 #[test]
2797 fn test_wait_strategy_jitter_application() {
2798 let strategy = create_wait_strategy(WaitStrategyConfig {
2800 max_attempts: Some(10),
2801 initial_delay: Duration::from_seconds(10),
2802 max_delay: Duration::from_seconds(300),
2803 backoff_rate: 1.0,
2804 jitter: JitterStrategy::Full,
2805 should_continue_polling: Box::new(|_: &i32| true),
2806 });
2807
2808 let decision = strategy(&0, 1);
2811 match decision {
2812 WaitDecision::Continue { delay } => {
2813 assert!(
2814 delay.to_seconds() >= 1 && delay.to_seconds() <= 10,
2815 "Jittered delay {} should be in [1, 10]",
2816 delay.to_seconds()
2817 );
2818 }
2819 WaitDecision::Done => panic!("Expected Continue, got Done"),
2820 }
2821 }
2822
2823 #[test]
2824 fn test_wait_strategy_delay_minimum_floor() {
2825 let strategy = create_wait_strategy(WaitStrategyConfig {
2827 max_attempts: Some(10),
2828 initial_delay: Duration::from_seconds(1),
2829 max_delay: Duration::from_seconds(300),
2830 backoff_rate: 1.0,
2831 jitter: JitterStrategy::Full,
2832 should_continue_polling: Box::new(|_: &i32| true),
2833 });
2834
2835 let decision = strategy(&0, 1);
2837 match decision {
2838 WaitDecision::Continue { delay } => {
2839 assert!(
2840 delay.to_seconds() >= 1,
2841 "Delay {} should be at least 1 second",
2842 delay.to_seconds()
2843 );
2844 }
2845 WaitDecision::Done => panic!("Expected Continue, got Done"),
2846 }
2847 }
2848
2849 #[test]
2850 fn test_wait_strategy_default_max_attempts() {
2851 let strategy = create_wait_strategy(WaitStrategyConfig {
2853 max_attempts: None, initial_delay: Duration::from_seconds(1),
2855 max_delay: Duration::from_seconds(10),
2856 backoff_rate: 1.0,
2857 jitter: JitterStrategy::None,
2858 should_continue_polling: Box::new(|_: &i32| true),
2859 });
2860
2861 let decision = strategy(&0, 59);
2863 assert!(matches!(decision, WaitDecision::Continue { .. }));
2864 }
2865
2866 #[test]
2867 fn test_wait_strategy_default_max_attempts_returns_done() {
2868 let strategy = create_wait_strategy(WaitStrategyConfig {
2870 max_attempts: None, initial_delay: Duration::from_seconds(1),
2872 max_delay: Duration::from_seconds(10),
2873 backoff_rate: 1.0,
2874 jitter: JitterStrategy::None,
2875 should_continue_polling: Box::new(|_: &i32| true),
2876 });
2877
2878 let decision = strategy(&0, 60);
2880 assert_eq!(decision, WaitDecision::Done);
2881 }
2882
2883 #[test]
2884 fn test_wait_decision_enum_variants() {
2885 let cont = WaitDecision::Continue {
2887 delay: Duration::from_seconds(5),
2888 };
2889 let done = WaitDecision::Done;
2890
2891 assert!(format!("{:?}", cont).contains("Continue"));
2893 assert!(format!("{:?}", done).contains("Done"));
2894
2895 assert_eq!(
2897 WaitDecision::Continue {
2898 delay: Duration::from_seconds(5)
2899 },
2900 WaitDecision::Continue {
2901 delay: Duration::from_seconds(5)
2902 }
2903 );
2904 assert_ne!(
2905 WaitDecision::Continue {
2906 delay: Duration::from_seconds(5)
2907 },
2908 WaitDecision::Done
2909 );
2910 }
2911}