1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use blake2::{Blake2b512, Digest};
22use serde::{Deserialize, Serialize};
23
24use crate::duration::Duration;
25use crate::error::DurableError;
26use crate::sealed::Sealed;
27
/// Randomization applied to a computed retry or polling delay.
///
/// The default is [`JitterStrategy::None`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JitterStrategy {
    /// Use the delay unchanged.
    #[default]
    None,
    /// Scale the whole delay by a factor in `[0, 1]`.
    Full,
    /// Keep half of the delay and scale the other half by a factor in `[0, 1]`.
    Half,
}
73
74impl JitterStrategy {
75 pub fn apply(&self, delay_secs: f64, attempt: u32) -> f64 {
93 match self {
94 JitterStrategy::None => delay_secs,
95 JitterStrategy::Full => {
96 let factor = deterministic_random_factor(attempt);
97 factor * delay_secs
98 }
99 JitterStrategy::Half => {
100 let factor = deterministic_random_factor(attempt);
101 delay_secs / 2.0 + factor * (delay_secs / 2.0)
102 }
103 }
104 }
105}
106
107fn deterministic_random_factor(attempt: u32) -> f64 {
112 let mut hasher = Blake2b512::new();
113 hasher.update(b"jitter");
114 hasher.update(attempt.to_le_bytes());
115 let result = hasher.finalize();
116
117 let mut bytes = [0u8; 8];
119 bytes.copy_from_slice(&result[..8]);
120 let value = u64::from_le_bytes(bytes);
121 (value as f64) / (u64::MAX as f64)
122}
123
/// Outcome of evaluating a wait strategy after a poll.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitDecision {
    /// Poll again after waiting for `delay`.
    Continue { delay: Duration },
    /// Stop polling.
    Done,
}
154
/// Settings consumed by `create_wait_strategy`; `T` is the type of the polled result.
pub struct WaitStrategyConfig<T> {
    /// Attempt limit; `create_wait_strategy` uses 60 when this is `None`.
    pub max_attempts: Option<usize>,
    /// Delay before any backoff is applied (read at whole-second granularity).
    pub initial_delay: Duration,
    /// Upper bound on the delay before jitter is applied.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by for each additional attempt.
    pub backoff_rate: f64,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
    /// Predicate on the latest result; returning `false` ends polling.
    pub should_continue_polling: Box<dyn Fn(&T) -> bool + Send + Sync>,
}
206
207#[allow(clippy::type_complexity)]
228pub fn create_wait_strategy<T: Send + Sync + 'static>(
229 config: WaitStrategyConfig<T>,
230) -> Box<dyn Fn(&T, usize) -> WaitDecision + Send + Sync> {
231 let max_attempts = config.max_attempts.unwrap_or(60);
232 let initial_delay_secs = config.initial_delay.to_seconds() as f64;
233 let max_delay_secs = config.max_delay.to_seconds() as f64;
234 let backoff_rate = config.backoff_rate;
235 let jitter = config.jitter;
236 let should_continue = config.should_continue_polling;
237
238 Box::new(move |result: &T, attempts_made: usize| -> WaitDecision {
239 if !should_continue(result) {
241 return WaitDecision::Done;
242 }
243
244 if attempts_made >= max_attempts {
246 panic!(
247 "waitForCondition exceeded maximum attempts ({})",
248 max_attempts
249 );
250 }
251
252 let exponent = if attempts_made > 0 {
254 (attempts_made as i32) - 1
255 } else {
256 0
257 };
258 let base_delay = (initial_delay_secs * backoff_rate.powi(exponent)).min(max_delay_secs);
259
260 let jittered = jitter.apply(base_delay, attempts_made as u32);
262 let final_delay = jittered.max(1.0).round() as u64;
263
264 WaitDecision::Continue {
265 delay: Duration::from_seconds(final_delay),
266 }
267 })
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
317pub enum CheckpointingMode {
318 Eager,
337
338 Batched,
358
359 Optimistic,
379}
380
381impl Default for CheckpointingMode {
382 fn default() -> Self {
387 Self::Batched
388 }
389}
390
391impl CheckpointingMode {
392 pub fn is_eager(&self) -> bool {
394 matches!(self, Self::Eager)
395 }
396
397 pub fn is_batched(&self) -> bool {
399 matches!(self, Self::Batched)
400 }
401
402 pub fn is_optimistic(&self) -> bool {
404 matches!(self, Self::Optimistic)
405 }
406
407 pub fn description(&self) -> &'static str {
409 match self {
410 Self::Eager => "Checkpoint after every operation (maximum durability)",
411 Self::Batched => "Batch operations before checkpointing (balanced)",
412 Self::Optimistic => {
413 "Execute multiple operations before checkpointing (best performance)"
414 }
415 }
416 }
417}
418
/// Policy deciding whether, and after how long, a failed operation is retried.
///
/// The trait has a `Sealed` supertrait; `#[allow(private_bounds)]` silences the lint
/// about that bound.
#[allow(private_bounds)]
pub trait RetryStrategy: Sealed + Send + Sync {
    /// Returns the delay before the next retry, or `None` to stop retrying.
    ///
    /// `attempt` is the attempt counter supplied by the caller and `error` is the
    /// error text of the failure.
    fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration>;

    /// Clones this strategy into a new box; this is what makes
    /// `Box<dyn RetryStrategy>` cloneable.
    fn clone_box(&self) -> Box<dyn RetryStrategy>;
}
440
441impl Clone for Box<dyn RetryStrategy> {
442 fn clone(&self) -> Self {
443 self.clone_box()
444 }
445}
446
/// Retry strategy whose delay grows geometrically with the attempt number.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    /// `next_delay` returns `None` once `attempt >= max_attempts`.
    pub max_attempts: u32,
    /// Delay used for attempt 0, before any multiplication.
    pub base_delay: Duration,
    /// Upper bound on the delay before jitter is applied.
    pub max_delay: Duration,
    /// Factor applied once per attempt (`base_delay * multiplier^attempt`).
    pub multiplier: f64,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
}
485
486impl ExponentialBackoff {
487 pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
494 Self {
495 max_attempts,
496 base_delay,
497 max_delay: Duration::from_hours(1),
498 multiplier: 2.0,
499 jitter: JitterStrategy::None,
500 }
501 }
502
503 pub fn builder() -> ExponentialBackoffBuilder {
505 ExponentialBackoffBuilder::default()
506 }
507}
508
509impl Sealed for ExponentialBackoff {}
510
511impl RetryStrategy for ExponentialBackoff {
512 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
513 if attempt >= self.max_attempts {
514 return None;
515 }
516
517 let base_seconds = self.base_delay.to_seconds() as f64;
518 let delay_seconds = base_seconds * self.multiplier.powi(attempt as i32);
519 let max_seconds = self.max_delay.to_seconds() as f64;
520 let capped_seconds = delay_seconds.min(max_seconds);
521
522 let jittered = self.jitter.apply(capped_seconds, attempt);
523 let final_seconds = jittered.max(1.0);
524
525 Some(Duration::from_seconds(final_seconds as u64))
526 }
527
528 fn clone_box(&self) -> Box<dyn RetryStrategy> {
529 Box::new(self.clone())
530 }
531}
532
/// Builder for [`ExponentialBackoff`]; each field mirrors the field of the same name
/// on the built strategy.
#[derive(Debug, Clone)]
pub struct ExponentialBackoffBuilder {
    // Attempt limit copied into the built strategy.
    max_attempts: u32,
    // Delay for attempt 0.
    base_delay: Duration,
    // Cap on the delay.
    max_delay: Duration,
    // Per-attempt growth factor.
    multiplier: f64,
    // Jitter strategy copied into the built strategy.
    jitter: JitterStrategy,
}
542
543impl Default for ExponentialBackoffBuilder {
544 fn default() -> Self {
545 Self {
546 max_attempts: 3,
547 base_delay: Duration::from_seconds(1),
548 max_delay: Duration::from_hours(1),
549 multiplier: 2.0,
550 jitter: JitterStrategy::None,
551 }
552 }
553}
554
555impl ExponentialBackoffBuilder {
556 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
558 self.max_attempts = max_attempts;
559 self
560 }
561
562 pub fn base_delay(mut self, base_delay: Duration) -> Self {
564 self.base_delay = base_delay;
565 self
566 }
567
568 pub fn max_delay(mut self, max_delay: Duration) -> Self {
570 self.max_delay = max_delay;
571 self
572 }
573
574 pub fn multiplier(mut self, multiplier: f64) -> Self {
576 self.multiplier = multiplier;
577 self
578 }
579
580 pub fn jitter(mut self, jitter: JitterStrategy) -> Self {
582 self.jitter = jitter;
583 self
584 }
585
586 pub fn build(self) -> ExponentialBackoff {
588 ExponentialBackoff {
589 max_attempts: self.max_attempts,
590 base_delay: self.base_delay,
591 max_delay: self.max_delay,
592 multiplier: self.multiplier,
593 jitter: self.jitter,
594 }
595 }
596}
597
/// Retry strategy that uses the same delay for every attempt.
#[derive(Debug, Clone)]
pub struct FixedDelay {
    /// `next_delay` returns `None` once `attempt >= max_attempts`.
    pub max_attempts: u32,
    /// Delay returned for every attempt, before jitter.
    pub delay: Duration,
    /// Jitter applied to the delay.
    pub jitter: JitterStrategy,
}
620
621impl FixedDelay {
622 pub fn new(max_attempts: u32, delay: Duration) -> Self {
629 Self {
630 max_attempts,
631 delay,
632 jitter: JitterStrategy::None,
633 }
634 }
635
636 pub fn with_jitter(mut self, jitter: JitterStrategy) -> Self {
638 self.jitter = jitter;
639 self
640 }
641}
642
643impl Sealed for FixedDelay {}
644
645impl RetryStrategy for FixedDelay {
646 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
647 if attempt >= self.max_attempts {
648 return None;
649 }
650
651 let delay_secs = self.delay.to_seconds() as f64;
652 let jittered = self.jitter.apply(delay_secs, attempt);
653 let final_seconds = jittered.max(1.0);
654
655 Some(Duration::from_seconds(final_seconds as u64))
656 }
657
658 fn clone_box(&self) -> Box<dyn RetryStrategy> {
659 Box::new(self.clone())
660 }
661}
662
/// Retry strategy whose delay grows by `base_delay` with each attempt.
#[derive(Debug, Clone)]
pub struct LinearBackoff {
    /// `next_delay` returns `None` once `attempt >= max_attempts`.
    pub max_attempts: u32,
    /// Step size: attempt `n` waits `base_delay * (n + 1)` before capping.
    pub base_delay: Duration,
    /// Upper bound on the delay before jitter is applied.
    pub max_delay: Duration,
    /// Jitter applied to the capped delay.
    pub jitter: JitterStrategy,
}
687
688impl LinearBackoff {
689 pub fn new(max_attempts: u32, base_delay: Duration) -> Self {
696 Self {
697 max_attempts,
698 base_delay,
699 max_delay: Duration::from_hours(1),
700 jitter: JitterStrategy::None,
701 }
702 }
703
704 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
706 self.max_delay = max_delay;
707 self
708 }
709
710 pub fn with_jitter(mut self, jitter: JitterStrategy) -> Self {
712 self.jitter = jitter;
713 self
714 }
715}
716
717impl Sealed for LinearBackoff {}
718
719impl RetryStrategy for LinearBackoff {
720 fn next_delay(&self, attempt: u32, _error: &str) -> Option<Duration> {
721 if attempt >= self.max_attempts {
722 return None;
723 }
724
725 let base_seconds = self.base_delay.to_seconds();
726 let delay_seconds = base_seconds.saturating_mul((attempt + 1) as u64);
727 let max_seconds = self.max_delay.to_seconds();
728 let capped_seconds = delay_seconds.min(max_seconds) as f64;
729
730 let jittered = self.jitter.apply(capped_seconds, attempt);
731 let final_seconds = jittered.max(1.0);
732
733 Some(Duration::from_seconds(final_seconds as u64))
734 }
735
736 fn clone_box(&self) -> Box<dyn RetryStrategy> {
737 Box::new(self.clone())
738 }
739}
740
741#[derive(Debug, Clone, Copy, Default)]
751pub struct NoRetry;
752
753impl Sealed for NoRetry {}
754
755impl RetryStrategy for NoRetry {
756 fn next_delay(&self, _attempt: u32, _error: &str) -> Option<Duration> {
757 None
758 }
759
760 fn clone_box(&self) -> Box<dyn RetryStrategy> {
761 Box::new(*self)
762 }
763}
764
/// A rule for matching an error message.
#[derive(Clone)]
pub enum ErrorPattern {
    /// Matches when the error message contains this substring.
    Contains(String),
    /// Matches when this regular expression matches the error message.
    Regex(regex::Regex),
}
791
792impl std::fmt::Debug for ErrorPattern {
793 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
794 match self {
795 ErrorPattern::Contains(s) => f.debug_tuple("Contains").field(s).finish(),
796 ErrorPattern::Regex(r) => f.debug_tuple("Regex").field(&r.as_str()).finish(),
797 }
798 }
799}
800
/// Filter deciding which errors are eligible for retry.
///
/// A filter with no patterns and no error types treats every error as retryable.
#[derive(Clone, Debug, Default)]
pub struct RetryableErrorFilter {
    /// Message patterns; an error whose message matches any of them is retryable.
    pub patterns: Vec<ErrorPattern>,
    /// Error type names compared for exact equality in `is_retryable_with_type`.
    pub error_types: Vec<String>,
}
838
839impl RetryableErrorFilter {
840 pub fn is_retryable(&self, error_msg: &str) -> bool {
847 if self.patterns.is_empty() && self.error_types.is_empty() {
848 return true;
849 }
850
851 self.patterns.iter().any(|p| match p {
852 ErrorPattern::Contains(s) => error_msg.contains(s.as_str()),
853 ErrorPattern::Regex(r) => r.is_match(error_msg),
854 })
855 }
856
857 pub fn is_retryable_with_type(&self, error_msg: &str, error_type: &str) -> bool {
864 if self.patterns.is_empty() && self.error_types.is_empty() {
865 return true;
866 }
867
868 let matches_type = self.error_types.iter().any(|t| t == error_type);
869 matches_type || self.is_retryable(error_msg)
870 }
871}
872
/// Wraps the closure `f` in a [`CustomRetry`] strategy.
///
/// The closure receives the attempt number and the error text, and returns the
/// delay before the next retry or `None` to stop retrying.
pub fn custom_retry<F>(f: F) -> CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    CustomRetry { f }
}
902
/// Retry strategy backed by a user-supplied closure; construct it with `custom_retry`.
#[derive(Clone)]
pub struct CustomRetry<F>
where
    F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
{
    // The closure invoked by `next_delay`.
    f: F,
}
913
914impl<F> std::fmt::Debug for CustomRetry<F>
915where
916 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
917{
918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919 f.debug_struct("CustomRetry").finish()
920 }
921}
922
923impl<F> Sealed for CustomRetry<F> where
924 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static
925{
926}
927
928impl<F> RetryStrategy for CustomRetry<F>
929where
930 F: Fn(u32, &str) -> Option<Duration> + Send + Sync + Clone + 'static,
931{
932 fn next_delay(&self, attempt: u32, error: &str) -> Option<Duration> {
933 (self.f)(attempt, error)
934 }
935
936 fn clone_box(&self) -> Box<dyn RetryStrategy> {
937 Box::new(self.clone())
938 }
939}
940
/// Execution guarantee selected for a step.
///
/// The default is [`StepSemantics::AtLeastOncePerRetry`].
// NOTE(review): the exact meaning of each variant is enforced by the step executor,
// which is not visible in this file — confirm before documenting further.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum StepSemantics {
    /// Presumably: the step body runs at most once per retry attempt.
    AtMostOncePerRetry,
    /// Presumably: the step body runs at least once per retry attempt.
    #[default]
    AtLeastOncePerRetry,
}
954
/// Per-step configuration. Every field defaults to `None` or the type's default.
#[derive(Clone, Default)]
pub struct StepConfig {
    /// Optional retry policy for the step.
    pub retry_strategy: Option<Box<dyn RetryStrategy>>,
    /// Execution guarantee for the step.
    pub step_semantics: StepSemantics,
    /// Optional type-erased serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
    /// Optional filter restricting which errors are retried.
    pub retryable_error_filter: Option<RetryableErrorFilter>,
}
992
993impl std::fmt::Debug for StepConfig {
994 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
995 f.debug_struct("StepConfig")
996 .field("retry_strategy", &self.retry_strategy.is_some())
997 .field("step_semantics", &self.step_semantics)
998 .field("serdes", &self.serdes.is_some())
999 .field(
1000 "retryable_error_filter",
1001 &self.retryable_error_filter.is_some(),
1002 )
1003 .finish()
1004 }
1005}
1006
/// Configuration for a callback operation.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Overall timeout for the callback; zero seconds by default.
    pub timeout: Duration,
    /// Heartbeat timeout for the callback; zero seconds by default.
    pub heartbeat_timeout: Duration,
    /// Optional type-erased serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
1017
1018#[derive(Clone)]
1020pub struct InvokeConfig<P, R> {
1021 pub timeout: Duration,
1023 pub serdes_payload: Option<Arc<dyn SerDesAny>>,
1025 pub serdes_result: Option<Arc<dyn SerDesAny>>,
1027 pub tenant_id: Option<String>,
1029 _marker: PhantomData<(P, R)>,
1031}
1032
1033impl<P, R> Default for InvokeConfig<P, R> {
1034 fn default() -> Self {
1035 Self {
1036 timeout: Duration::default(),
1037 serdes_payload: None,
1038 serdes_result: None,
1039 tenant_id: None,
1040 _marker: PhantomData,
1041 }
1042 }
1043}
1044
1045impl<P, R> std::fmt::Debug for InvokeConfig<P, R> {
1046 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1047 f.debug_struct("InvokeConfig")
1048 .field("timeout", &self.timeout)
1049 .field("serdes_payload", &self.serdes_payload.is_some())
1050 .field("serdes_result", &self.serdes_result.is_some())
1051 .field("tenant_id", &self.tenant_id)
1052 .finish()
1053 }
1054}
1055
/// Configuration for a map operation.
#[derive(Debug, Clone, Default)]
pub struct MapConfig {
    /// Optional concurrency limit; unset by default.
    pub max_concurrency: Option<usize>,
    /// Optional batching of input items; unset by default.
    pub item_batcher: Option<ItemBatcher>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional type-erased serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
1093
/// Configuration for a parallel operation.
#[derive(Debug, Clone, Default)]
pub struct ParallelConfig {
    /// Optional concurrency limit; unset by default.
    pub max_concurrency: Option<usize>,
    /// Completion criteria for the operation.
    pub completion_config: CompletionConfig,
    /// Optional type-erased serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
}
1104
/// Configuration for a child context. All fields default to `None` / `false`.
#[derive(Clone, Default)]
#[allow(clippy::type_complexity)]
pub struct ChildConfig {
    /// Optional type-erased serializer/deserializer.
    pub serdes: Option<Arc<dyn SerDesAny>>,
    /// Flag controlling replay of child operations; `false` by default.
    // NOTE(review): the replay behavior itself lives outside this file — confirm.
    pub replay_children: bool,
    /// Optional function mapping one `DurableError` to another.
    pub error_mapper: Option<Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>>,
    /// Optional function producing a summary string from a string input.
    pub summary_generator: Option<Arc<dyn Fn(&str) -> String + Send + Sync>>,
}
1165
1166impl std::fmt::Debug for ChildConfig {
1167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1168 f.debug_struct("ChildConfig")
1169 .field("serdes", &self.serdes)
1170 .field("replay_children", &self.replay_children)
1171 .field("error_mapper", &self.error_mapper.as_ref().map(|_| "..."))
1172 .field(
1173 "summary_generator",
1174 &self.summary_generator.as_ref().map(|_| "..."),
1175 )
1176 .finish()
1177 }
1178}
1179
1180impl ChildConfig {
1181 pub fn new() -> Self {
1183 Self::default()
1184 }
1185
1186 pub fn with_replay_children() -> Self {
1200 Self {
1201 replay_children: true,
1202 ..Default::default()
1203 }
1204 }
1205
1206 pub fn set_replay_children(mut self, replay_children: bool) -> Self {
1212 self.replay_children = replay_children;
1213 self
1214 }
1215
1216 pub fn set_serdes(mut self, serdes: Arc<dyn SerDesAny>) -> Self {
1218 self.serdes = Some(serdes);
1219 self
1220 }
1221
1222 pub fn set_error_mapper(
1231 mut self,
1232 mapper: Arc<dyn Fn(DurableError) -> DurableError + Send + Sync>,
1233 ) -> Self {
1234 self.error_mapper = Some(mapper);
1235 self
1236 }
1237
1238 pub fn set_summary_generator(
1248 mut self,
1249 generator: Arc<dyn Fn(&str) -> String + Send + Sync>,
1250 ) -> Self {
1251 self.summary_generator = Some(generator);
1252 self
1253 }
1254}
1255
/// Alias for [`ChildConfig`]; the two names refer to the same type.
pub type ContextConfig = ChildConfig;
1265
/// Criteria describing when a multi-item operation is considered complete.
///
/// All fields are `None` by default.
// NOTE(review): how these thresholds are evaluated is not visible in this file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompletionConfig {
    /// Number of successes required, if set.
    pub min_successful: Option<usize>,
    /// Number of failures tolerated, if set.
    pub tolerated_failure_count: Option<usize>,
    /// Share of failures tolerated, if set (presumably a percentage — confirm scale).
    pub tolerated_failure_percentage: Option<f64>,
}
1276
1277impl CompletionConfig {
1278 pub fn first_successful() -> Self {
1289 Self {
1290 min_successful: Some(1),
1291 ..Default::default()
1292 }
1293 }
1294
1295 pub fn all_completed() -> Self {
1306 Self::default()
1307 }
1308
1309 pub fn all_successful() -> Self {
1321 Self {
1322 tolerated_failure_count: Some(0),
1323 tolerated_failure_percentage: Some(0.0),
1324 ..Default::default()
1325 }
1326 }
1327
1328 pub fn with_min_successful(count: usize) -> Self {
1330 Self {
1331 min_successful: Some(count),
1332 ..Default::default()
1333 }
1334 }
1335
1336 pub fn with_failure_tolerance(count: usize) -> Self {
1338 Self {
1339 tolerated_failure_count: Some(count),
1340 ..Default::default()
1341 }
1342 }
1343}
1344
/// Limits used to group items into batches.
#[derive(Debug, Clone)]
pub struct ItemBatcher {
    /// Maximum number of items in one batch.
    pub max_items_per_batch: usize,
    /// Maximum total size of one batch, in bytes.
    pub max_bytes_per_batch: usize,
}

impl Default for ItemBatcher {
    /// Defaults to 100 items and 256 KiB per batch.
    fn default() -> Self {
        const DEFAULT_MAX_ITEMS: usize = 100;
        const DEFAULT_MAX_BYTES: usize = 256 * 1024;

        Self {
            max_bytes_per_batch: DEFAULT_MAX_BYTES,
            max_items_per_batch: DEFAULT_MAX_ITEMS,
        }
    }
}
1362
1363impl ItemBatcher {
1364 pub fn new(max_items_per_batch: usize, max_bytes_per_batch: usize) -> Self {
1366 Self {
1367 max_items_per_batch,
1368 max_bytes_per_batch,
1369 }
1370 }
1371
1372 pub fn batch<T: Serialize + Clone>(&self, items: &[T]) -> Vec<(usize, Vec<T>)> {
1413 if items.is_empty() {
1414 return Vec::new();
1415 }
1416
1417 let mut batches = Vec::new();
1418 let mut current_batch = Vec::new();
1419 let mut current_bytes = 0usize;
1420 let mut batch_start_index = 0;
1421
1422 for (i, item) in items.iter().enumerate() {
1423 let item_bytes = serde_json::to_string(item).map(|s| s.len()).unwrap_or(0);
1425
1426 let would_exceed_items = current_batch.len() >= self.max_items_per_batch;
1428 let would_exceed_bytes =
1429 current_bytes + item_bytes > self.max_bytes_per_batch && !current_batch.is_empty();
1430
1431 if would_exceed_items || would_exceed_bytes {
1432 batches.push((batch_start_index, std::mem::take(&mut current_batch)));
1434 current_bytes = 0;
1435 batch_start_index = i;
1436 }
1437
1438 current_batch.push(item.clone());
1439 current_bytes += item_bytes;
1440 }
1441
1442 if !current_batch.is_empty() {
1444 batches.push((batch_start_index, current_batch));
1445 }
1446
1447 batches
1448 }
1449}
1450
/// Type-erased serializer/deserializer working on `dyn Any` values and string data.
pub trait SerDesAny: Send + Sync {
    /// Serializes `value` to a string, or returns a `DurableError` on failure.
    fn serialize_any(
        &self,
        value: &dyn std::any::Any,
    ) -> Result<String, crate::error::DurableError>;
    /// Deserializes `data` into a boxed `Any` value, or returns a `DurableError` on
    /// failure.
    fn deserialize_any(
        &self,
        data: &str,
    ) -> Result<Box<dyn std::any::Any + Send>, crate::error::DurableError>;
}
1464
1465impl std::fmt::Debug for dyn SerDesAny {
1466 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1467 f.write_str("SerDesAny")
1468 }
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473 use super::*;
1474 use proptest::prelude::*;
1475
1476 #[test]
1481 fn test_step_semantics_default() {
1482 let semantics = StepSemantics::default();
1483 assert_eq!(semantics, StepSemantics::AtLeastOncePerRetry);
1484 }
1485
1486 #[test]
1487 fn test_step_config_default() {
1488 let config = StepConfig::default();
1489 assert!(config.retry_strategy.is_none());
1490 assert_eq!(config.step_semantics, StepSemantics::AtLeastOncePerRetry);
1491 assert!(config.serdes.is_none());
1492 }
1493
1494 #[test]
1495 fn test_completion_config_first_successful() {
1496 let config = CompletionConfig::first_successful();
1497 assert_eq!(config.min_successful, Some(1));
1498 assert!(config.tolerated_failure_count.is_none());
1499 assert!(config.tolerated_failure_percentage.is_none());
1500 }
1501
1502 #[test]
1503 fn test_completion_config_all_completed() {
1504 let config = CompletionConfig::all_completed();
1505 assert!(config.min_successful.is_none());
1506 assert!(config.tolerated_failure_count.is_none());
1507 assert!(config.tolerated_failure_percentage.is_none());
1508 }
1509
1510 #[test]
1511 fn test_completion_config_all_successful() {
1512 let config = CompletionConfig::all_successful();
1513 assert!(config.min_successful.is_none());
1514 assert_eq!(config.tolerated_failure_count, Some(0));
1515 assert_eq!(config.tolerated_failure_percentage, Some(0.0));
1516 }
1517
1518 #[test]
1519 fn test_item_batcher_default() {
1520 let batcher = ItemBatcher::default();
1521 assert_eq!(batcher.max_items_per_batch, 100);
1522 assert_eq!(batcher.max_bytes_per_batch, 256 * 1024);
1523 }
1524
1525 #[test]
1526 fn test_item_batcher_new() {
1527 let batcher = ItemBatcher::new(50, 128 * 1024);
1528 assert_eq!(batcher.max_items_per_batch, 50);
1529 assert_eq!(batcher.max_bytes_per_batch, 128 * 1024);
1530 }
1531
1532 #[test]
1533 fn test_callback_config_default() {
1534 let config = CallbackConfig::default();
1535 assert_eq!(config.timeout.to_seconds(), 0);
1536 assert_eq!(config.heartbeat_timeout.to_seconds(), 0);
1537 }
1538
1539 #[test]
1540 fn test_invoke_config_default() {
1541 let config: InvokeConfig<String, String> = InvokeConfig::default();
1542 assert_eq!(config.timeout.to_seconds(), 0);
1543 assert!(config.tenant_id.is_none());
1544 }
1545
1546 #[test]
1547 fn test_map_config_default() {
1548 let config = MapConfig::default();
1549 assert!(config.max_concurrency.is_none());
1550 assert!(config.item_batcher.is_none());
1551 }
1552
1553 #[test]
1554 fn test_parallel_config_default() {
1555 let config = ParallelConfig::default();
1556 assert!(config.max_concurrency.is_none());
1557 }
1558
1559 #[test]
1560 fn test_child_config_default() {
1561 let config = ChildConfig::default();
1562 assert!(!config.replay_children);
1563 assert!(config.serdes.is_none());
1564 assert!(config.error_mapper.is_none());
1565 assert!(config.summary_generator.is_none());
1566 }
1567
1568 #[test]
1569 fn test_child_config_with_replay_children() {
1570 let config = ChildConfig::with_replay_children();
1571 assert!(config.replay_children);
1572 }
1573
1574 #[test]
1575 fn test_child_config_set_replay_children() {
1576 let config = ChildConfig::new().set_replay_children(true);
1577 assert!(config.replay_children);
1578 }
1579
1580 #[test]
1581 fn test_context_config_type_alias() {
1582 let config: ContextConfig = ContextConfig::with_replay_children();
1584 assert!(config.replay_children);
1585 }
1586
1587 #[test]
1588 fn test_checkpointing_mode_default() {
1589 let mode = CheckpointingMode::default();
1590 assert_eq!(mode, CheckpointingMode::Batched);
1591 assert!(mode.is_batched());
1592 }
1593
1594 #[test]
1595 fn test_checkpointing_mode_eager() {
1596 let mode = CheckpointingMode::Eager;
1597 assert!(mode.is_eager());
1598 assert!(!mode.is_batched());
1599 assert!(!mode.is_optimistic());
1600 }
1601
1602 #[test]
1603 fn test_checkpointing_mode_batched() {
1604 let mode = CheckpointingMode::Batched;
1605 assert!(!mode.is_eager());
1606 assert!(mode.is_batched());
1607 assert!(!mode.is_optimistic());
1608 }
1609
1610 #[test]
1611 fn test_checkpointing_mode_optimistic() {
1612 let mode = CheckpointingMode::Optimistic;
1613 assert!(!mode.is_eager());
1614 assert!(!mode.is_batched());
1615 assert!(mode.is_optimistic());
1616 }
1617
1618 #[test]
1619 fn test_checkpointing_mode_description() {
1620 assert!(CheckpointingMode::Eager
1621 .description()
1622 .contains("maximum durability"));
1623 assert!(CheckpointingMode::Batched
1624 .description()
1625 .contains("balanced"));
1626 assert!(CheckpointingMode::Optimistic
1627 .description()
1628 .contains("best performance"));
1629 }
1630
1631 #[test]
1632 fn test_checkpointing_mode_serialization() {
1633 let mode = CheckpointingMode::Eager;
1635 let serialized = serde_json::to_string(&mode).unwrap();
1636 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1637 assert_eq!(mode, deserialized);
1638
1639 let mode = CheckpointingMode::Batched;
1640 let serialized = serde_json::to_string(&mode).unwrap();
1641 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1642 assert_eq!(mode, deserialized);
1643
1644 let mode = CheckpointingMode::Optimistic;
1645 let serialized = serde_json::to_string(&mode).unwrap();
1646 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
1647 assert_eq!(mode, deserialized);
1648 }
1649
1650 #[test]
1655 fn test_exponential_backoff_new() {
1656 let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
1657 assert_eq!(strategy.max_attempts, 5);
1658 assert_eq!(strategy.base_delay.to_seconds(), 1);
1659 assert_eq!(strategy.max_delay.to_seconds(), 3600); assert!((strategy.multiplier - 2.0).abs() < f64::EPSILON);
1661 }
1662
1663 #[test]
1664 fn test_exponential_backoff_builder() {
1665 let strategy = ExponentialBackoff::builder()
1666 .max_attempts(10)
1667 .base_delay(Duration::from_seconds(2))
1668 .max_delay(Duration::from_minutes(30))
1669 .multiplier(3.0)
1670 .build();
1671
1672 assert_eq!(strategy.max_attempts, 10);
1673 assert_eq!(strategy.base_delay.to_seconds(), 2);
1674 assert_eq!(strategy.max_delay.to_seconds(), 1800); assert!((strategy.multiplier - 3.0).abs() < f64::EPSILON);
1676 }
1677
1678 #[test]
1679 fn test_exponential_backoff_delays() {
1680 let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
1681
1682 assert_eq!(
1684 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1685 Some(1)
1686 );
1687 assert_eq!(
1689 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1690 Some(2)
1691 );
1692 assert_eq!(
1694 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1695 Some(4)
1696 );
1697 assert_eq!(
1699 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1700 Some(8)
1701 );
1702 assert_eq!(
1704 strategy.next_delay(4, "error").map(|d| d.to_seconds()),
1705 Some(16)
1706 );
1707 assert_eq!(strategy.next_delay(5, "error"), None);
1709 }
1710
1711 #[test]
1712 fn test_exponential_backoff_max_delay_cap() {
1713 let strategy = ExponentialBackoff::builder()
1714 .max_attempts(10)
1715 .base_delay(Duration::from_seconds(10))
1716 .max_delay(Duration::from_seconds(30))
1717 .build();
1718
1719 assert_eq!(
1721 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1722 Some(10)
1723 );
1724 assert_eq!(
1726 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1727 Some(20)
1728 );
1729 assert_eq!(
1731 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1732 Some(30)
1733 );
1734 assert_eq!(
1736 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1737 Some(30)
1738 );
1739 }
1740
1741 #[test]
1742 fn test_fixed_delay_new() {
1743 let strategy = FixedDelay::new(3, Duration::from_seconds(5));
1744 assert_eq!(strategy.max_attempts, 3);
1745 assert_eq!(strategy.delay.to_seconds(), 5);
1746 }
1747
1748 #[test]
1749 fn test_fixed_delay_constant() {
1750 let strategy = FixedDelay::new(3, Duration::from_seconds(5));
1751
1752 assert_eq!(
1754 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1755 Some(5)
1756 );
1757 assert_eq!(
1758 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1759 Some(5)
1760 );
1761 assert_eq!(
1762 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1763 Some(5)
1764 );
1765 assert_eq!(strategy.next_delay(3, "error"), None);
1767 }
1768
1769 #[test]
1770 fn test_linear_backoff_new() {
1771 let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
1772 assert_eq!(strategy.max_attempts, 5);
1773 assert_eq!(strategy.base_delay.to_seconds(), 2);
1774 assert_eq!(strategy.max_delay.to_seconds(), 3600); }
1776
1777 #[test]
1778 fn test_linear_backoff_with_max_delay() {
1779 let strategy = LinearBackoff::new(5, Duration::from_seconds(2))
1780 .with_max_delay(Duration::from_seconds(10));
1781 assert_eq!(strategy.max_delay.to_seconds(), 10);
1782 }
1783
1784 #[test]
1785 fn test_linear_backoff_delays() {
1786 let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
1787
1788 assert_eq!(
1790 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1791 Some(2)
1792 );
1793 assert_eq!(
1795 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1796 Some(4)
1797 );
1798 assert_eq!(
1800 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1801 Some(6)
1802 );
1803 assert_eq!(
1805 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1806 Some(8)
1807 );
1808 assert_eq!(
1810 strategy.next_delay(4, "error").map(|d| d.to_seconds()),
1811 Some(10)
1812 );
1813 assert_eq!(strategy.next_delay(5, "error"), None);
1815 }
1816
1817 #[test]
1818 fn test_linear_backoff_max_delay_cap() {
1819 let strategy = LinearBackoff::new(10, Duration::from_seconds(5))
1820 .with_max_delay(Duration::from_seconds(15));
1821
1822 assert_eq!(
1824 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1825 Some(5)
1826 );
1827 assert_eq!(
1829 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1830 Some(10)
1831 );
1832 assert_eq!(
1834 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1835 Some(15)
1836 );
1837 assert_eq!(
1839 strategy.next_delay(3, "error").map(|d| d.to_seconds()),
1840 Some(15)
1841 );
1842 }
1843
1844 #[test]
1845 fn test_no_retry() {
1846 let strategy = NoRetry;
1847
1848 assert_eq!(strategy.next_delay(0, "error"), None);
1850 assert_eq!(strategy.next_delay(1, "error"), None);
1851 assert_eq!(strategy.next_delay(100, "error"), None);
1852 }
1853
1854 #[test]
1855 fn test_no_retry_default() {
1856 let strategy = NoRetry::default();
1857 assert_eq!(strategy.next_delay(0, "error"), None);
1858 }
1859
1860 #[test]
1861 fn test_custom_retry_basic() {
1862 let strategy = custom_retry(|attempt, _error| {
1863 if attempt >= 3 {
1864 None
1865 } else {
1866 Some(Duration::from_seconds(10))
1867 }
1868 });
1869
1870 assert_eq!(
1871 strategy.next_delay(0, "error").map(|d| d.to_seconds()),
1872 Some(10)
1873 );
1874 assert_eq!(
1875 strategy.next_delay(1, "error").map(|d| d.to_seconds()),
1876 Some(10)
1877 );
1878 assert_eq!(
1879 strategy.next_delay(2, "error").map(|d| d.to_seconds()),
1880 Some(10)
1881 );
1882 assert_eq!(strategy.next_delay(3, "error"), None);
1883 }
1884
1885 #[test]
1886 fn test_custom_retry_error_based() {
1887 let strategy = custom_retry(|attempt, error| {
1888 if attempt >= 5 {
1889 return None;
1890 }
1891 if error.contains("transient") {
1892 Some(Duration::from_seconds(1))
1893 } else if error.contains("rate_limit") {
1894 Some(Duration::from_seconds(30))
1895 } else {
1896 None }
1898 });
1899
1900 assert_eq!(
1902 strategy
1903 .next_delay(0, "transient error")
1904 .map(|d| d.to_seconds()),
1905 Some(1)
1906 );
1907 assert_eq!(
1909 strategy
1910 .next_delay(0, "rate_limit exceeded")
1911 .map(|d| d.to_seconds()),
1912 Some(30)
1913 );
1914 assert_eq!(strategy.next_delay(0, "permanent failure"), None);
1916 }
1917
1918 #[test]
1919 fn test_retry_strategy_clone_box() {
1920 let exp: Box<dyn RetryStrategy> =
1922 Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1)));
1923 let exp_clone = exp.clone_box();
1924 assert_eq!(
1925 exp.next_delay(0, "e").map(|d| d.to_seconds()),
1926 exp_clone.next_delay(0, "e").map(|d| d.to_seconds())
1927 );
1928
1929 let fixed: Box<dyn RetryStrategy> = Box::new(FixedDelay::new(3, Duration::from_seconds(5)));
1930 let fixed_clone = fixed.clone_box();
1931 assert_eq!(
1932 fixed.next_delay(0, "e").map(|d| d.to_seconds()),
1933 fixed_clone.next_delay(0, "e").map(|d| d.to_seconds())
1934 );
1935
1936 let linear: Box<dyn RetryStrategy> =
1937 Box::new(LinearBackoff::new(3, Duration::from_seconds(2)));
1938 let linear_clone = linear.clone_box();
1939 assert_eq!(
1940 linear.next_delay(0, "e").map(|d| d.to_seconds()),
1941 linear_clone.next_delay(0, "e").map(|d| d.to_seconds())
1942 );
1943
1944 let no_retry: Box<dyn RetryStrategy> = Box::new(NoRetry);
1945 let no_retry_clone = no_retry.clone_box();
1946 assert_eq!(
1947 no_retry.next_delay(0, "e"),
1948 no_retry_clone.next_delay(0, "e")
1949 );
1950 }
1951
/// Checks that calling `clone()` on a `Box<dyn RetryStrategy>` yields a strategy that
/// reports the same delay for attempt 0 as the original.
#[test]
fn test_boxed_retry_strategy_clone() {
    let original: Box<dyn RetryStrategy> =
        Box::new(ExponentialBackoff::new(3, Duration::from_seconds(1)));
    let copy = original.clone();

    let from_original = original.next_delay(0, "error").map(|d| d.to_seconds());
    let from_copy = copy.next_delay(0, "error").map(|d| d.to_seconds());
    assert_eq!(from_original, from_copy);
}
1964
/// Checks that a `StepConfig` holding an `ExponentialBackoff` exposes it through
/// `retry_strategy` and that the stored strategy reports a 1-second delay for attempt 0.
#[test]
fn test_step_config_with_retry_strategy() {
    let backoff = ExponentialBackoff::new(3, Duration::from_seconds(1));
    let config = StepConfig {
        retry_strategy: Some(Box::new(backoff)),
        step_semantics: StepSemantics::AtLeastOncePerRetry,
        serdes: None,
        retryable_error_filter: None,
    };

    assert!(config.retry_strategy.is_some());
    let first_delay_secs = config
        .retry_strategy
        .as_ref()
        .unwrap()
        .next_delay(0, "error")
        .map(|d| d.to_seconds());
    assert_eq!(first_delay_secs, Some(1));
}
1984
/// Checks that the `Debug` output of each strategy value built here contains the
/// expected type name.
#[test]
fn test_retry_strategy_debug() {
    // Each entry pairs a rendered `{:?}` string with the name it must contain.
    let rendered = [
        (
            format!("{:?}", ExponentialBackoff::new(3, Duration::from_seconds(1))),
            "ExponentialBackoff",
        ),
        (
            format!("{:?}", FixedDelay::new(3, Duration::from_seconds(5))),
            "FixedDelay",
        ),
        (
            format!("{:?}", LinearBackoff::new(3, Duration::from_seconds(2))),
            "LinearBackoff",
        ),
        (format!("{:?}", NoRetry), "NoRetry"),
        (format!("{:?}", custom_retry(|_, _| None)), "CustomRetry"),
    ];

    for (debug_str, type_name) in rendered.iter() {
        assert!(debug_str.contains(*type_name));
    }
}
2008
2009 fn step_semantics_strategy() -> impl Strategy<Value = StepSemantics> {
2015 prop_oneof![
2016 Just(StepSemantics::AtMostOncePerRetry),
2017 Just(StepSemantics::AtLeastOncePerRetry),
2018 ]
2019 }
2020
2021 fn checkpointing_mode_strategy() -> impl Strategy<Value = CheckpointingMode> {
2023 prop_oneof![
2024 Just(CheckpointingMode::Eager),
2025 Just(CheckpointingMode::Batched),
2026 Just(CheckpointingMode::Optimistic),
2027 ]
2028 }
2029
2030 proptest! {
2031 #[test]
2036 fn prop_step_config_validity(semantics in step_semantics_strategy()) {
2037 let config = StepConfig {
2038 retry_strategy: None,
2039 step_semantics: semantics,
2040 serdes: None,
2041 retryable_error_filter: None,
2042 };
2043
2044 let _ = config.retry_strategy.is_none();
2046 let _ = config.step_semantics;
2047 let _ = config.serdes.is_none();
2048
2049 let debug_str = format!("{:?}", config);
2051 prop_assert!(!debug_str.is_empty());
2052 }
2053
2054 #[test]
2058 fn prop_callback_config_positive_timeout(
2059 timeout_secs in 1u64..=86400u64,
2060 heartbeat_secs in 1u64..=86400u64
2061 ) {
2062 let config = CallbackConfig {
2063 timeout: Duration::from_seconds(timeout_secs),
2064 heartbeat_timeout: Duration::from_seconds(heartbeat_secs),
2065 serdes: None,
2066 };
2067
2068 prop_assert_eq!(config.timeout.to_seconds(), timeout_secs);
2070 prop_assert_eq!(config.heartbeat_timeout.to_seconds(), heartbeat_secs);
2071
2072 let debug_str = format!("{:?}", config);
2074 prop_assert!(!debug_str.is_empty());
2075 }
2076
2077 #[test]
2081 fn prop_duration_conversion_roundtrip(seconds in 0u64..=u64::MAX / 2) {
2082 let original = Duration::from_seconds(seconds);
2083 let extracted = original.to_seconds();
2084 let reconstructed = Duration::from_seconds(extracted);
2085
2086 prop_assert_eq!(original, reconstructed);
2087 prop_assert_eq!(original.to_seconds(), reconstructed.to_seconds());
2088 }
2089
2090 #[test]
2096 fn prop_completion_config_consistency(
2097 min_successful in proptest::option::of(0usize..100),
2098 tolerated_count in proptest::option::of(0usize..100),
2099 tolerated_pct in proptest::option::of(0.0f64..=1.0f64)
2100 ) {
2101 let config = CompletionConfig {
2102 min_successful,
2103 tolerated_failure_count: tolerated_count,
2104 tolerated_failure_percentage: tolerated_pct,
2105 };
2106
2107 prop_assert_eq!(config.min_successful, min_successful);
2109 prop_assert_eq!(config.tolerated_failure_count, tolerated_count);
2110 prop_assert_eq!(config.tolerated_failure_percentage, tolerated_pct);
2111
2112 let serialized = serde_json::to_string(&config).unwrap();
2114 let deserialized: CompletionConfig = serde_json::from_str(&serialized).unwrap();
2115
2116 prop_assert_eq!(config.min_successful, deserialized.min_successful);
2117 prop_assert_eq!(config.tolerated_failure_count, deserialized.tolerated_failure_count);
2118 match (config.tolerated_failure_percentage, deserialized.tolerated_failure_percentage) {
2120 (Some(a), Some(b)) => prop_assert!((a - b).abs() < f64::EPSILON),
2121 (None, None) => {},
2122 _ => prop_assert!(false, "tolerated_failure_percentage mismatch"),
2123 }
2124 }
2125
2126 #[test]
2130 fn prop_checkpointing_mode_roundtrip(mode in checkpointing_mode_strategy()) {
2131 let serialized = serde_json::to_string(&mode).unwrap();
2132 let deserialized: CheckpointingMode = serde_json::from_str(&serialized).unwrap();
2133 prop_assert_eq!(mode, deserialized);
2134 }
2135
2136 #[test]
2140 fn prop_checkpointing_mode_classification(mode in checkpointing_mode_strategy()) {
2141 let eager = mode.is_eager();
2142 let batched = mode.is_batched();
2143 let optimistic = mode.is_optimistic();
2144
2145 let count = [eager, batched, optimistic].iter().filter(|&&x| x).count();
2147 prop_assert_eq!(count, 1, "Exactly one classification should be true");
2148
2149 match mode {
2151 CheckpointingMode::Eager => prop_assert!(eager),
2152 CheckpointingMode::Batched => prop_assert!(batched),
2153 CheckpointingMode::Optimistic => prop_assert!(optimistic),
2154 }
2155 }
2156
2157 #[test]
2161 fn prop_step_semantics_roundtrip(semantics in step_semantics_strategy()) {
2162 let serialized = serde_json::to_string(&semantics).unwrap();
2163 let deserialized: StepSemantics = serde_json::from_str(&serialized).unwrap();
2164 prop_assert_eq!(semantics, deserialized);
2165 }
2166
2167 #[test]
2171 fn prop_item_batcher_validity(
2172 max_items in 1usize..=10000,
2173 max_bytes in 1usize..=10_000_000
2174 ) {
2175 let batcher = ItemBatcher::new(max_items, max_bytes);
2176
2177 prop_assert_eq!(batcher.max_items_per_batch, max_items);
2178 prop_assert_eq!(batcher.max_bytes_per_batch, max_bytes);
2179
2180 let debug_str = format!("{:?}", batcher);
2182 prop_assert!(!debug_str.is_empty());
2183 }
2184
2185 #[test]
2189 fn prop_child_config_builder_consistency(replay_children in proptest::bool::ANY) {
2190 let config = ChildConfig::new().set_replay_children(replay_children);
2191
2192 prop_assert_eq!(config.replay_children, replay_children);
2193
2194 let debug_str = format!("{:?}", config);
2196 prop_assert!(!debug_str.is_empty());
2197 }
2198
2199 #[test]
2203 fn prop_map_config_validity(
2204 max_concurrency in proptest::option::of(1usize..=1000)
2205 ) {
2206 let config = MapConfig {
2207 max_concurrency,
2208 item_batcher: None,
2209 completion_config: CompletionConfig::default(),
2210 serdes: None,
2211 };
2212
2213 prop_assert_eq!(config.max_concurrency, max_concurrency);
2214
2215 let debug_str = format!("{:?}", config);
2217 prop_assert!(!debug_str.is_empty());
2218 }
2219
2220 #[test]
2224 fn prop_parallel_config_validity(
2225 max_concurrency in proptest::option::of(1usize..=1000)
2226 ) {
2227 let config = ParallelConfig {
2228 max_concurrency,
2229 completion_config: CompletionConfig::default(),
2230 serdes: None,
2231 };
2232
2233 prop_assert_eq!(config.max_concurrency, max_concurrency);
2234
2235 let debug_str = format!("{:?}", config);
2237 prop_assert!(!debug_str.is_empty());
2238 }
2239
2240 #[test]
2246 fn prop_item_batcher_configuration_respected(
2247 max_items in 1usize..=50,
2248 max_bytes in 100usize..=10000,
2249 item_count in 0usize..=200
2250 ) {
2251 let batcher = ItemBatcher::new(max_items, max_bytes);
2252
2253 let items: Vec<String> = (0..item_count)
2255 .map(|i| format!("item_{:04}", i))
2256 .collect();
2257
2258 let batches = batcher.batch(&items);
2259
2260 for (_, batch) in &batches {
2262 prop_assert!(
2263 batch.len() <= max_items,
2264 "Batch has {} items but max is {}",
2265 batch.len(),
2266 max_items
2267 );
2268 }
2269
2270 for (_, batch) in &batches {
2272 let batch_bytes: usize = batch.iter()
2273 .map(|item| serde_json::to_string(item).map(|s| s.len()).unwrap_or(0))
2274 .sum();
2275
2276 if batch.len() > 1 {
2279 prop_assert!(
2280 batch_bytes <= max_bytes,
2281 "Batch has {} bytes but max is {} (batch has {} items)",
2282 batch_bytes,
2283 max_bytes,
2284 batch.len()
2285 );
2286 }
2287 }
2288 }
2289
2290 #[test]
2295 fn prop_item_batcher_ordering_preservation(
2296 max_items in 1usize..=50,
2297 max_bytes in 100usize..=10000,
2298 item_count in 0usize..=200
2299 ) {
2300 let batcher = ItemBatcher::new(max_items, max_bytes);
2301
2302 let items: Vec<String> = (0..item_count)
2304 .map(|i| format!("item_{:04}", i))
2305 .collect();
2306
2307 let batches = batcher.batch(&items);
2308
2309 let reconstructed: Vec<String> = batches
2311 .into_iter()
2312 .flat_map(|(_, batch)| batch)
2313 .collect();
2314
2315 prop_assert_eq!(
2317 items.len(),
2318 reconstructed.len(),
2319 "Reconstructed list has different length: expected {}, got {}",
2320 items.len(),
2321 reconstructed.len()
2322 );
2323
2324 for (i, (original, reconstructed_item)) in items.iter().zip(reconstructed.iter()).enumerate() {
2325 prop_assert_eq!(
2326 original,
2327 reconstructed_item,
2328 "Item at index {} differs: expected '{}', got '{}'",
2329 i,
2330 original,
2331 reconstructed_item
2332 );
2333 }
2334 }
2335 }
2336
/// Checks that `JitterStrategy::None` returns the input delay unchanged for several
/// delay/attempt pairs.
#[test]
fn test_jitter_strategy_none_returns_exact_delay() {
    let jitter = JitterStrategy::None;
    let cases: [(f64, u32); 4] = [(10.0, 0), (5.5, 3), (0.0, 0), (100.0, 99)];

    for &(delay, attempt) in &cases {
        assert_eq!(jitter.apply(delay, attempt), delay);
    }
}
2349
/// Checks that `JitterStrategy::Full` applied to a 10-second delay stays within
/// `[0, 10]` for attempts 0 through 19.
#[test]
fn test_jitter_strategy_full_bounds() {
    let jitter = JitterStrategy::Full;
    let allowed = 0.0..=10.0;

    for attempt in 0..20 {
        let jittered = jitter.apply(10.0, attempt);
        assert!(
            allowed.contains(&jittered),
            "Full jitter for attempt {} produced {}, expected [0, 10]",
            attempt,
            jittered
        );
    }
}
2363
/// Checks that `JitterStrategy::Half` applied to a 10-second delay stays within
/// `[5, 10]` for attempts 0 through 19.
#[test]
fn test_jitter_strategy_half_bounds() {
    let jitter = JitterStrategy::Half;
    let allowed = 5.0..=10.0;

    for attempt in 0..20 {
        let jittered = jitter.apply(10.0, attempt);
        assert!(
            allowed.contains(&jittered),
            "Half jitter for attempt {} produced {}, expected [5, 10]",
            attempt,
            jittered
        );
    }
}
2377
/// Checks that applying `Full` and `Half` jitter twice with the same inputs gives the
/// same result both times.
#[test]
fn test_jitter_strategy_deterministic() {
    for jitter in [JitterStrategy::Full, JitterStrategy::Half].iter() {
        let first = jitter.apply(10.0, 5);
        let second = jitter.apply(10.0, 5);
        assert_eq!(first, second);
    }
}
2391
/// Checks that every jitter variant maps a zero delay to zero.
#[test]
fn test_jitter_strategy_zero_delay() {
    let variants = [
        JitterStrategy::Full,
        JitterStrategy::Half,
        JitterStrategy::None,
    ];
    for jitter in variants.iter() {
        assert_eq!(jitter.apply(0.0, 0), 0.0);
    }
}
2399
/// Checks that the `Default` value of `JitterStrategy` is `None`.
#[test]
fn test_jitter_strategy_default_is_none() {
    let default_jitter = JitterStrategy::default();
    assert_eq!(default_jitter, JitterStrategy::None);
}
2404
/// Checks that an `ExponentialBackoff` with full jitter returns a delay of at least one
/// second for attempts 0..5 and no delay for attempt 5.
#[test]
fn test_exponential_backoff_with_full_jitter() {
    let builder = ExponentialBackoff::builder()
        .max_attempts(5)
        .base_delay(Duration::from_seconds(5))
        .max_delay(Duration::from_seconds(60))
        .jitter(JitterStrategy::Full);
    let strategy = builder.build();

    for attempt in 0..5 {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        if let Some(d) = delay {
            let secs = d.to_seconds();
            assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
        }
    }

    let exhausted = strategy.next_delay(5, "error");
    assert!(exhausted.is_none());
}
2427
/// Checks that an `ExponentialBackoff` with half jitter returns a delay of at least one
/// second for attempts 0..5.
#[test]
fn test_exponential_backoff_with_half_jitter() {
    let builder = ExponentialBackoff::builder()
        .max_attempts(5)
        .base_delay(Duration::from_seconds(10))
        .max_delay(Duration::from_seconds(60))
        .jitter(JitterStrategy::Half);
    let strategy = builder.build();

    for attempt in 0..5 {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        if let Some(d) = delay {
            let secs = d.to_seconds();
            assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
        }
    }
}
2444
/// Checks that `ExponentialBackoff::new` uses `JitterStrategy::None` and yields delays
/// of 1, 2 and 4 seconds for attempts 0, 1 and 2.
#[test]
fn test_exponential_backoff_no_jitter_unchanged() {
    let strategy = ExponentialBackoff::new(5, Duration::from_seconds(1));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    for &(attempt, expected_secs) in &[(0, 1), (1, 2), (2, 4)] {
        let actual = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual, Some(expected_secs));
    }
}
2454
/// Checks that a `FixedDelay` with full jitter returns a delay of at least one second
/// for attempts 0..3 and no delay for attempt 3.
#[test]
fn test_fixed_delay_with_jitter() {
    let base = FixedDelay::new(3, Duration::from_seconds(10));
    let strategy = base.with_jitter(JitterStrategy::Full);

    for attempt in 0..3 {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        if let Some(d) = delay {
            let secs = d.to_seconds();
            assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
        }
    }

    let exhausted = strategy.next_delay(3, "error");
    assert!(exhausted.is_none());
}
2468
/// Checks that `FixedDelay::new` uses `JitterStrategy::None` and yields a 5-second
/// delay for attempts 0 and 1.
#[test]
fn test_fixed_delay_no_jitter_unchanged() {
    let strategy = FixedDelay::new(3, Duration::from_seconds(5));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    for attempt in 0..2 {
        let actual = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual, Some(5));
    }
}
2476
/// Checks that a `LinearBackoff` with half jitter returns a delay of at least one second
/// for attempts 0..5 and no delay for attempt 5.
#[test]
fn test_linear_backoff_with_jitter() {
    let base = LinearBackoff::new(5, Duration::from_seconds(5));
    let strategy = base.with_jitter(JitterStrategy::Half);

    for attempt in 0..5 {
        let delay = strategy.next_delay(attempt, "error");
        assert!(delay.is_some());
        if let Some(d) = delay {
            let secs = d.to_seconds();
            assert!(secs >= 1, "Attempt {} delay {} < 1", attempt, secs);
        }
    }

    let exhausted = strategy.next_delay(5, "error");
    assert!(exhausted.is_none());
}
2490
/// Checks that `LinearBackoff::new` uses `JitterStrategy::None` and yields delays of
/// 2 and 4 seconds for attempts 0 and 1.
#[test]
fn test_linear_backoff_no_jitter_unchanged() {
    let strategy = LinearBackoff::new(5, Duration::from_seconds(2));
    assert_eq!(strategy.jitter, JitterStrategy::None);

    for &(attempt, expected_secs) in &[(0, 2), (1, 4)] {
        let actual = strategy.next_delay(attempt, "e").map(|d| d.to_seconds());
        assert_eq!(actual, Some(expected_secs));
    }
}
2498
/// Checks that, with a 1-second base delay and full jitter, the exponential, fixed and
/// linear strategies each return at least one second for attempts 0..3.
#[test]
fn test_jitter_minimum_floor_all_strategies() {
    let exp = ExponentialBackoff::builder()
        .max_attempts(3)
        .base_delay(Duration::from_seconds(1))
        .jitter(JitterStrategy::Full)
        .build();
    let fixed = FixedDelay::new(3, Duration::from_seconds(1)).with_jitter(JitterStrategy::Full);
    let linear =
        LinearBackoff::new(3, Duration::from_seconds(1)).with_jitter(JitterStrategy::Full);

    for attempt in 0..3 {
        let delay = exp.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "ExponentialBackoff attempt {} delay {} < 1",
            attempt,
            secs
        );
    }

    for attempt in 0..3 {
        let delay = fixed.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "FixedDelay attempt {} delay {} < 1",
            attempt,
            secs
        );
    }

    for attempt in 0..3 {
        let delay = linear.next_delay(attempt, "e").unwrap();
        let secs = delay.to_seconds();
        assert!(
            secs >= 1,
            "LinearBackoff attempt {} delay {} < 1",
            attempt,
            secs
        );
    }
}
2540
2541 fn jitter_strategy_strategy() -> impl Strategy<Value = JitterStrategy> {
2547 prop_oneof![
2548 Just(JitterStrategy::None),
2549 Just(JitterStrategy::Full),
2550 Just(JitterStrategy::Half),
2551 ]
2552 }
2553
2554 proptest! {
2555 #[test]
2559 fn prop_jitter_none_identity(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2560 let result = JitterStrategy::None.apply(delay, attempt);
2561 prop_assert!((result - delay).abs() < f64::EPSILON,
2562 "None jitter changed delay from {} to {}", delay, result);
2563 }
2564
2565 #[test]
2569 fn prop_jitter_full_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2570 let result = JitterStrategy::Full.apply(delay, attempt);
2571 prop_assert!(result >= 0.0, "Full jitter result {} < 0", result);
2572 prop_assert!(result <= delay + f64::EPSILON,
2573 "Full jitter result {} > delay {}", result, delay);
2574 }
2575
2576 #[test]
2580 fn prop_jitter_half_bounds(delay in 0.0f64..1000.0, attempt in 0u32..100) {
2581 let result = JitterStrategy::Half.apply(delay, attempt);
2582 prop_assert!(result >= delay / 2.0 - f64::EPSILON,
2583 "Half jitter result {} < delay/2 {}", result, delay / 2.0);
2584 prop_assert!(result <= delay + f64::EPSILON,
2585 "Half jitter result {} > delay {}", result, delay);
2586 }
2587
2588 #[test]
2592 fn prop_jitter_deterministic(
2593 jitter in jitter_strategy_strategy(),
2594 delay in 0.0f64..1000.0,
2595 attempt in 0u32..100
2596 ) {
2597 let r1 = jitter.apply(delay, attempt);
2598 let r2 = jitter.apply(delay, attempt);
2599 prop_assert!((r1 - r2).abs() < f64::EPSILON,
2600 "Jitter not deterministic: {} vs {}", r1, r2);
2601 }
2602
2603 #[test]
2607 fn prop_jitter_minimum_floor(
2608 jitter in jitter_strategy_strategy(),
2609 attempt in 0u32..10,
2610 base_delay_secs in 1u64..100
2611 ) {
2612 let exp = ExponentialBackoff::builder()
2614 .max_attempts(10)
2615 .base_delay(Duration::from_seconds(base_delay_secs))
2616 .jitter(jitter)
2617 .build();
2618 if let Some(d) = exp.next_delay(attempt, "e") {
2619 prop_assert!(d.to_seconds() >= 1,
2620 "ExponentialBackoff delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2621 }
2622
2623 let fixed = FixedDelay::new(10, Duration::from_seconds(base_delay_secs))
2625 .with_jitter(jitter);
2626 if let Some(d) = fixed.next_delay(attempt, "e") {
2627 prop_assert!(d.to_seconds() >= 1,
2628 "FixedDelay delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2629 }
2630
2631 let linear = LinearBackoff::new(10, Duration::from_seconds(base_delay_secs))
2633 .with_jitter(jitter);
2634 if let Some(d) = linear.next_delay(attempt, "e") {
2635 prop_assert!(d.to_seconds() >= 1,
2636 "LinearBackoff delay {} < 1 for attempt {}", d.to_seconds(), attempt);
2637 }
2638 }
2639 }
2640}
2641
2642#[cfg(test)]
2643mod retryable_error_filter_tests {
2644 use super::*;
2645
/// Checks that a default `RetryableErrorFilter` treats every message (and a
/// message/type pair) as retryable.
#[test]
fn test_empty_filter_retries_all() {
    let filter = RetryableErrorFilter::default();

    for &message in &["any error message", "", "timeout"] {
        assert!(filter.is_retryable(message));
    }
    assert!(filter.is_retryable_with_type("any error", "AnyType"));
}
2654
/// Checks that a `Contains("timeout")` pattern matches messages that include that
/// substring.
#[test]
fn test_contains_pattern_matches_substring() {
    let patterns = vec![ErrorPattern::Contains("timeout".to_string())];
    let filter = RetryableErrorFilter {
        patterns,
        error_types: vec![],
    };

    for &message in &["request timeout occurred", "timeout", "a timeout happened"] {
        assert!(filter.is_retryable(message));
    }
}
2665
/// Checks that a `Contains("timeout")` pattern rejects messages without that
/// substring, including the empty message.
#[test]
fn test_contains_pattern_no_match() {
    let patterns = vec![ErrorPattern::Contains("timeout".to_string())];
    let filter = RetryableErrorFilter {
        patterns,
        error_types: vec![],
    };

    for &message in &["connection refused", "invalid input", ""] {
        assert!(!filter.is_retryable(message));
    }
}
2676
/// Checks that a case-insensitive `connection.*refused` regex pattern matches the
/// three sample messages below.
#[test]
fn test_regex_pattern_matches() {
    let pattern = regex::Regex::new(r"(?i)connection.*refused").unwrap();
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Regex(pattern)],
        error_types: vec![],
    };

    let matching = [
        "Connection was refused",
        "connection refused",
        "CONNECTION actively REFUSED",
    ];
    for &message in &matching {
        assert!(filter.is_retryable(message));
    }
}
2689
/// Checks that the `connection.*refused` regex pattern rejects an unrelated message and
/// a message with the two words in reverse order.
#[test]
fn test_regex_pattern_no_match() {
    let pattern = regex::Regex::new(r"(?i)connection.*refused").unwrap();
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Regex(pattern)],
        error_types: vec![],
    };

    for &message in &["timeout error", "refused connection"] {
        assert!(!filter.is_retryable(message));
    }
}
2701
/// Checks that a filter with two patterns accepts a message matching either one and
/// rejects a message matching neither.
#[test]
fn test_or_logic_multiple_patterns() {
    let by_substring = ErrorPattern::Contains("timeout".to_string());
    let by_regex = ErrorPattern::Regex(regex::Regex::new(r"(?i)connection.*refused").unwrap());
    let filter = RetryableErrorFilter {
        patterns: vec![by_substring, by_regex],
        error_types: vec![],
    };

    let cases = [
        ("request timeout", true),
        ("Connection refused", true),
        ("invalid input", false),
    ];
    for &(message, expected) in &cases {
        assert_eq!(filter.is_retryable(message), expected);
    }
}
2718
/// Checks a filter that lists only an error type: the message-only check rejects, and
/// the typed check accepts only the listed type.
#[test]
fn test_error_type_matching() {
    let error_types = vec!["TransientError".to_string()];
    let filter = RetryableErrorFilter {
        patterns: vec![],
        error_types,
    };

    let without_type = filter.is_retryable("some error");
    let listed_type = filter.is_retryable_with_type("some error", "TransientError");
    let other_type = filter.is_retryable_with_type("some error", "PermanentError");

    assert!(!without_type);
    assert!(listed_type);
    assert!(!other_type);
}
2731
/// Checks that a filter with both a pattern and an error type accepts when either
/// matches and rejects only when neither does.
#[test]
fn test_or_logic_patterns_and_types() {
    let filter = RetryableErrorFilter {
        patterns: vec![ErrorPattern::Contains("timeout".to_string())],
        error_types: vec!["TransientError".to_string()],
    };

    // (message, error type, expected retryability)
    let cases = [
        ("request timeout", "PermanentError", true),
        ("invalid input", "TransientError", true),
        ("request timeout", "TransientError", true),
        ("invalid input", "PermanentError", false),
    ];
    for &(message, error_type, expected) in &cases {
        assert_eq!(filter.is_retryable_with_type(message, error_type), expected);
    }
}
2747
/// Checks that the `Debug` output of `ErrorPattern` names the variant (and, for
/// `Contains`, includes the stored text).
#[test]
fn test_error_pattern_debug() {
    let contains_dbg = format!("{:?}", ErrorPattern::Contains("test".to_string()));
    for &needle in &["Contains", "test"] {
        assert!(contains_dbg.contains(needle));
    }

    let regex_pattern = ErrorPattern::Regex(regex::Regex::new(r"\d+").unwrap());
    let regex_dbg = format!("{:?}", regex_pattern);
    assert!(regex_dbg.contains("Regex"));
}
2759
/// Checks that a cloned `RetryableErrorFilter` still matches by substring, by regex and
/// by error type.
#[test]
fn test_retryable_error_filter_clone() {
    let patterns = vec![
        ErrorPattern::Contains("timeout".to_string()),
        ErrorPattern::Regex(regex::Regex::new(r"err\d+").unwrap()),
    ];
    let original = RetryableErrorFilter {
        patterns,
        error_types: vec!["TransientError".to_string()],
    };

    let cloned = original.clone();

    for &message in &["timeout error", "err42"] {
        assert!(cloned.is_retryable(message));
    }
    assert!(cloned.is_retryable_with_type("x", "TransientError"));
}
2774
/// Checks that the wait strategy returns `WaitDecision::Done` when the polling
/// predicate returns `false` for the given state.
#[test]
fn test_wait_decision_done_when_predicate_false() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.5,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|state: &String| state != "COMPLETED"),
    };
    let strategy = create_wait_strategy(config);

    let completed = "COMPLETED".to_string();
    assert_eq!(strategy(&completed, 1), WaitDecision::Done);
}
2796
/// Checks that, with a 5-second initial delay, backoff rate 2.0 and no jitter, the
/// strategy returns `Continue` with delays of 5, 10 and 20 seconds for attempts 1, 2, 3.
#[test]
fn test_wait_decision_continue_with_backoff() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 2.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|state: &String| state != "DONE"),
    };
    let strategy = create_wait_strategy(config);

    // (attempt number, expected delay in seconds)
    let expectations = [(1usize, 5u64), (2, 10), (3, 20)];
    for &(attempt, expected_secs) in &expectations {
        let decision = strategy(&"PENDING".to_string(), attempt);
        assert_eq!(
            decision,
            WaitDecision::Continue {
                delay: Duration::from_seconds(expected_secs)
            }
        );
    }
}
2836
/// Checks that the delay returned for attempts 3 and 5 equals the 30-second
/// `max_delay` when the initial delay is 10 seconds and the backoff rate is 2.0.
#[test]
fn test_wait_strategy_delay_capped_at_max() {
    let config = WaitStrategyConfig {
        max_attempts: Some(20),
        initial_delay: Duration::from_seconds(10),
        max_delay: Duration::from_seconds(30),
        backoff_rate: 2.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    for &attempt in &[3usize, 5] {
        let decision = strategy(&0, attempt);
        assert_eq!(
            decision,
            WaitDecision::Continue {
                delay: Duration::from_seconds(30)
            }
        );
    }
}
2867
/// Expects the strategy to panic with the "exceeded maximum attempts" message when
/// called with attempt 3 and `max_attempts` set to 3.
#[test]
#[should_panic(expected = "waitForCondition exceeded maximum attempts")]
fn test_wait_strategy_max_attempts_panic() {
    let config = WaitStrategyConfig {
        max_attempts: Some(3),
        initial_delay: Duration::from_seconds(5),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.5,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    let state = 0;
    let _ = strategy(&state, 3);
}
2884
/// Checks that, with full jitter on a constant 10-second delay, the first attempt
/// returns `Continue` with a delay between 1 and 10 seconds inclusive.
#[test]
fn test_wait_strategy_jitter_application() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(10),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.0,
        jitter: JitterStrategy::Full,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    if let WaitDecision::Continue { delay } = strategy(&0, 1) {
        let secs = delay.to_seconds();
        assert!(
            (1..=10).contains(&secs),
            "Jittered delay {} should be in [1, 10]",
            secs
        );
    } else {
        panic!("Expected Continue, got Done");
    }
}
2911
/// Checks that, with full jitter on a constant 1-second delay, the first attempt
/// returns `Continue` with a delay of at least one second.
#[test]
fn test_wait_strategy_delay_minimum_floor() {
    let config = WaitStrategyConfig {
        max_attempts: Some(10),
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(300),
        backoff_rate: 1.0,
        jitter: JitterStrategy::Full,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    if let WaitDecision::Continue { delay } = strategy(&0, 1) {
        let secs = delay.to_seconds();
        assert!(secs >= 1, "Delay {} should be at least 1 second", secs);
    } else {
        panic!("Expected Continue, got Done");
    }
}
2937
/// Checks that, with `max_attempts` left as `None`, attempt 59 still returns
/// `Continue`.
#[test]
fn test_wait_strategy_default_max_attempts() {
    let config = WaitStrategyConfig {
        max_attempts: None,
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(10),
        backoff_rate: 1.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    let still_polling = matches!(strategy(&0, 59), WaitDecision::Continue { .. });
    assert!(still_polling);
}
2954
/// Expects the strategy to panic with the "exceeded maximum attempts" message at
/// attempt 60 when `max_attempts` is left as `None`.
#[test]
#[should_panic(expected = "waitForCondition exceeded maximum attempts")]
fn test_wait_strategy_default_max_attempts_panic() {
    let config = WaitStrategyConfig {
        max_attempts: None,
        initial_delay: Duration::from_seconds(1),
        max_delay: Duration::from_seconds(10),
        backoff_rate: 1.0,
        jitter: JitterStrategy::None,
        should_continue_polling: Box::new(|_: &i32| true),
    };
    let strategy = create_wait_strategy(config);

    let state = 0;
    let _ = strategy(&state, 60);
}
2971
/// Checks the `Debug` output and equality behaviour of the two `WaitDecision`
/// variants.
#[test]
fn test_wait_decision_enum_variants() {
    let cont = WaitDecision::Continue {
        delay: Duration::from_seconds(5),
    };
    let done = WaitDecision::Done;

    let cont_dbg = format!("{:?}", cont);
    let done_dbg = format!("{:?}", done);
    assert!(cont_dbg.contains("Continue"));
    assert!(done_dbg.contains("Done"));

    // Two `Continue` values with the same delay compare equal; `Continue` never equals `Done`.
    let five_seconds = || WaitDecision::Continue {
        delay: Duration::from_seconds(5),
    };
    assert_eq!(five_seconds(), five_seconds());
    assert_ne!(five_seconds(), WaitDecision::Done);
}
3000}