1#![allow(clippy::module_name_repetitions)]
12
13use std::time::Duration;
14
15use rand::Rng;
16
17use crate::ant_protocol::CLOSE_GROUP_SIZE;
18
/// Bucket-size bound used by this module.
///
/// `ReplicationConfig::validate` rejects a `neighbor_sync_scope` larger than
/// this value.
// NOTE(review): presumably mirrors the routing table's k-bucket size — confirm
// against the DHT configuration.
pub const K_BUCKET_SIZE: usize = 20;

/// Extra slots that `storage_admission_width` adds on top of a close-group
/// size.
pub const STORAGE_ADMISSION_MARGIN: usize = 2;

/// Default for `ReplicationConfig::quorum_threshold`.
pub const QUORUM_THRESHOLD: usize = 4;

/// Default for `ReplicationConfig::paid_list_close_group_size`.
pub const PAID_LIST_CLOSE_GROUP_SIZE: usize = 20;

/// Default for `ReplicationConfig::neighbor_sync_scope`.
///
/// Equal to [`K_BUCKET_SIZE`], the largest value `validate` accepts.
pub const NEIGHBOR_SYNC_SCOPE: usize = 20;

/// Default for `ReplicationConfig::neighbor_sync_peer_count`.
pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;
49
50#[must_use]
53pub const fn storage_admission_width(close_group_size: usize) -> usize {
54 close_group_size.saturating_add(STORAGE_ADMISSION_MARGIN)
55}
56
/// Lower bound of the neighbor-sync interval, in seconds (10 minutes).
const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60;
/// Upper bound of the neighbor-sync interval, in seconds (20 minutes).
const NEIGHBOR_SYNC_INTERVAL_MAX_SECS: u64 = 20 * 60;

/// Default for `ReplicationConfig::neighbor_sync_interval_min`.
pub const NEIGHBOR_SYNC_INTERVAL_MIN: Duration =
    Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MIN_SECS);

/// Default for `ReplicationConfig::neighbor_sync_interval_max`.
pub const NEIGHBOR_SYNC_INTERVAL_MAX: Duration =
    Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MAX_SECS);

/// Neighbor-sync cooldown, in seconds (1 hour).
const NEIGHBOR_SYNC_COOLDOWN_SECS: u64 = 60 * 60;

/// Default for `ReplicationConfig::neighbor_sync_cooldown`.
pub const NEIGHBOR_SYNC_COOLDOWN: Duration = Duration::from_secs(NEIGHBOR_SYNC_COOLDOWN_SECS);

/// Minimum repair-hint age, in seconds (1 hour).
const REPAIR_HINT_MIN_AGE_SECS: u64 = 60 * 60;

/// Minimum repair-hint age as a [`Duration`] (1 hour).
// NOTE(review): not referenced elsewhere in this file — confirm the exact
// semantics with the code that consumes it.
pub const REPAIR_HINT_MIN_AGE: Duration = Duration::from_secs(REPAIR_HINT_MIN_AGE_SECS);

/// Lower bound of the self-lookup interval, in seconds (5 minutes).
const SELF_LOOKUP_INTERVAL_MIN_SECS: u64 = 5 * 60;
/// Upper bound of the self-lookup interval, in seconds (10 minutes).
const SELF_LOOKUP_INTERVAL_MAX_SECS: u64 = 10 * 60;

/// Default for `ReplicationConfig::self_lookup_interval_min`.
pub const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MIN_SECS);

/// Default for `ReplicationConfig::self_lookup_interval_max`.
pub const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MAX_SECS);
94
/// Cap of 3 on concurrent replication sends.
// NOTE(review): enforced by code outside this file — confirm where it is
// applied.
pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3;

/// Value [`max_parallel_fetch`] returns when the platform cannot report its
/// available parallelism.
const AVAILABLE_PARALLELISM_FALLBACK: usize = 4;

/// Returns the fetch parallelism for this host.
///
/// This is the value reported by [`std::thread::available_parallelism`], or
/// `AVAILABLE_PARALLELISM_FALLBACK` when that query fails.
// The lint allowance is carried over unchanged from the original definition.
// NOTE(review): it may no longer be required — confirm against the crate MSRV.
#[allow(clippy::incompatible_msrv)]
pub fn max_parallel_fetch() -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => AVAILABLE_PARALLELISM_FALLBACK,
    }
}
117
/// Lower bound of the audit tick interval, in seconds (10 minutes).
const AUDIT_TICK_INTERVAL_MIN_SECS: u64 = 10 * 60;
/// Upper bound of the audit tick interval, in seconds (20 minutes).
const AUDIT_TICK_INTERVAL_MAX_SECS: u64 = 20 * 60;

/// Default for `ReplicationConfig::audit_tick_interval_min`.
pub const AUDIT_TICK_INTERVAL_MIN: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MIN_SECS);

/// Default for `ReplicationConfig::audit_tick_interval_max`.
pub const AUDIT_TICK_INTERVAL_MAX: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MAX_SECS);

/// Seconds used for the default `ReplicationConfig::audit_response_base`.
const AUDIT_RESPONSE_BASE_SECS: u64 = 10;
/// Milliseconds used for the default
/// `ReplicationConfig::audit_response_per_key`.
const AUDIT_RESPONSE_PER_KEY_MS: u64 = 20;

/// Bootstrap-claim grace period, in seconds (24 hours).
const BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS: u64 = 24 * 60 * 60;

/// Default for `ReplicationConfig::bootstrap_claim_grace_period`.
pub const BOOTSTRAP_CLAIM_GRACE_PERIOD: Duration =
    Duration::from_secs(BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS);

/// Prune hysteresis duration, in seconds (3 days).
const PRUNE_HYSTERESIS_DURATION_SECS: u64 = 3 * 24 * 60 * 60;

/// Default for `ReplicationConfig::prune_hysteresis_duration`.
pub const PRUNE_HYSTERESIS_DURATION: Duration =
    Duration::from_secs(PRUNE_HYSTERESIS_DURATION_SECS);
144
/// Identifier string of the replication protocol (version 1).
pub const REPLICATION_PROTOCOL_ID: &str = "autonomi.ant.replication.v1";

/// Replication message size limit, in MiB.
const REPLICATION_MESSAGE_SIZE_MIB: usize = 10;
/// Replication message size limit, in bytes (10 MiB).
pub const MAX_REPLICATION_MESSAGE_SIZE: usize = REPLICATION_MESSAGE_SIZE_MIB * 1024 * 1024;

/// Verification request timeout, in seconds.
const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15;
/// Default for `ReplicationConfig::verification_request_timeout`.
pub const VERIFICATION_REQUEST_TIMEOUT: Duration =
    Duration::from_secs(VERIFICATION_REQUEST_TIMEOUT_SECS);

/// Fetch request timeout, in seconds.
const FETCH_REQUEST_TIMEOUT_SECS: u64 = 30;
/// Default for `ReplicationConfig::fetch_request_timeout`.
pub const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(FETCH_REQUEST_TIMEOUT_SECS);

/// Maximum pending-verification age, in seconds (30 minutes).
const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60;
/// Maximum pending-verification age as a [`Duration`] (30 minutes).
// NOTE(review): consumed outside this file — confirm what happens to entries
// older than this.
pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS);

/// Trust weight associated with an audit failure.
// NOTE(review): how the weight feeds the trust score is defined elsewhere —
// confirm.
pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0;

/// Budget of prune audit challenges per pass.
///
/// `ReplicationConfig::validate` rejects a `close_group_size` larger than this
/// value.
pub const MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS: usize = 64;

/// Default for `ReplicationConfig::bootstrap_complete_timeout_secs`, in
/// seconds.
const BOOTSTRAP_COMPLETE_TIMEOUT_SECS: u64 = 60;
178
/// Tunable parameters of the replication subsystem.
///
/// `ReplicationConfig::default()` fills every field from this module's
/// constants. All fields are public, so call `validate` after overriding any
/// of them.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Close-group size. Defaults to `CLOSE_GROUP_SIZE`; `validate` requires
    /// `1 <= close_group_size <= MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS`.
    pub close_group_size: usize,
    /// Upper bound on the value returned by `quorum_needed`. Defaults to
    /// `QUORUM_THRESHOLD`; `validate` requires
    /// `1 <= quorum_threshold <= close_group_size`.
    pub quorum_threshold: usize,
    /// Paid-list close-group size. Defaults to `PAID_LIST_CLOSE_GROUP_SIZE`;
    /// `validate` requires it to be at least 1.
    pub paid_list_close_group_size: usize,
    /// Neighbor-sync scope. Defaults to `NEIGHBOR_SYNC_SCOPE`; `validate`
    /// requires `1 <= neighbor_sync_scope <= K_BUCKET_SIZE`.
    pub neighbor_sync_scope: usize,
    /// Neighbor-sync peer count. Defaults to `NEIGHBOR_SYNC_PEER_COUNT`;
    /// `validate` requires it to be at least 1.
    pub neighbor_sync_peer_count: usize,
    /// Lower bound sampled by `random_neighbor_sync_interval`.
    pub neighbor_sync_interval_min: Duration,
    /// Upper bound sampled by `random_neighbor_sync_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub neighbor_sync_interval_max: Duration,
    /// Neighbor-sync cooldown. Defaults to `NEIGHBOR_SYNC_COOLDOWN`.
    pub neighbor_sync_cooldown: Duration,
    /// Lower bound sampled by `random_self_lookup_interval`.
    pub self_lookup_interval_min: Duration,
    /// Upper bound sampled by `random_self_lookup_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub self_lookup_interval_max: Duration,
    /// Lower bound sampled by `random_audit_tick_interval`.
    pub audit_tick_interval_min: Duration,
    /// Upper bound sampled by `random_audit_tick_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub audit_tick_interval_max: Duration,
    /// Fixed component of `audit_response_timeout`.
    pub audit_response_base: Duration,
    /// Per-key component of `audit_response_timeout`; multiplied by the
    /// number of challenged keys.
    pub audit_response_per_key: Duration,
    /// Bootstrap-claim grace period. Defaults to
    /// `BOOTSTRAP_CLAIM_GRACE_PERIOD`.
    pub bootstrap_claim_grace_period: Duration,
    /// Prune hysteresis duration. Defaults to `PRUNE_HYSTERESIS_DURATION`.
    pub prune_hysteresis_duration: Duration,
    /// Verification request timeout. Defaults to
    /// `VERIFICATION_REQUEST_TIMEOUT`.
    pub verification_request_timeout: Duration,
    /// Fetch request timeout. Defaults to `FETCH_REQUEST_TIMEOUT`.
    pub fetch_request_timeout: Duration,
    /// Bootstrap-complete timeout in whole seconds. Defaults to
    /// `BOOTSTRAP_COMPLETE_TIMEOUT_SECS`.
    pub bootstrap_complete_timeout_secs: u64,
}
228
229impl Default for ReplicationConfig {
230 fn default() -> Self {
231 Self {
232 close_group_size: CLOSE_GROUP_SIZE,
233 quorum_threshold: QUORUM_THRESHOLD,
234 paid_list_close_group_size: PAID_LIST_CLOSE_GROUP_SIZE,
235 neighbor_sync_scope: NEIGHBOR_SYNC_SCOPE,
236 neighbor_sync_peer_count: NEIGHBOR_SYNC_PEER_COUNT,
237 neighbor_sync_interval_min: NEIGHBOR_SYNC_INTERVAL_MIN,
238 neighbor_sync_interval_max: NEIGHBOR_SYNC_INTERVAL_MAX,
239 neighbor_sync_cooldown: NEIGHBOR_SYNC_COOLDOWN,
240 self_lookup_interval_min: SELF_LOOKUP_INTERVAL_MIN,
241 self_lookup_interval_max: SELF_LOOKUP_INTERVAL_MAX,
242 audit_tick_interval_min: AUDIT_TICK_INTERVAL_MIN,
243 audit_tick_interval_max: AUDIT_TICK_INTERVAL_MAX,
244 audit_response_base: Duration::from_secs(AUDIT_RESPONSE_BASE_SECS),
245 audit_response_per_key: Duration::from_millis(AUDIT_RESPONSE_PER_KEY_MS),
246 bootstrap_claim_grace_period: BOOTSTRAP_CLAIM_GRACE_PERIOD,
247 prune_hysteresis_duration: PRUNE_HYSTERESIS_DURATION,
248 verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT,
249 fetch_request_timeout: FETCH_REQUEST_TIMEOUT,
250 bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS,
251 }
252 }
253}
254
255impl ReplicationConfig {
256 pub fn validate(&self) -> Result<(), String> {
264 if self.close_group_size == 0 {
265 return Err("close_group_size must be >= 1".to_string());
266 }
267 if self.quorum_threshold == 0 || self.quorum_threshold > self.close_group_size {
268 return Err(format!(
269 "quorum_threshold ({}) must satisfy 1 <= quorum_threshold <= close_group_size ({})",
270 self.quorum_threshold, self.close_group_size,
271 ));
272 }
273 if self.close_group_size > MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS {
274 return Err(format!(
275 "close_group_size ({}) must be <= MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS ({})",
276 self.close_group_size, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
277 ));
278 }
279 if self.paid_list_close_group_size == 0 {
280 return Err("paid_list_close_group_size must be >= 1".to_string());
281 }
282 if self.neighbor_sync_interval_min > self.neighbor_sync_interval_max {
283 return Err(format!(
284 "neighbor_sync_interval_min ({:?}) must be <= neighbor_sync_interval_max ({:?})",
285 self.neighbor_sync_interval_min, self.neighbor_sync_interval_max,
286 ));
287 }
288 if self.audit_tick_interval_min > self.audit_tick_interval_max {
289 return Err(format!(
290 "audit_tick_interval_min ({:?}) must be <= audit_tick_interval_max ({:?})",
291 self.audit_tick_interval_min, self.audit_tick_interval_max,
292 ));
293 }
294 if self.self_lookup_interval_min > self.self_lookup_interval_max {
295 return Err(format!(
296 "self_lookup_interval_min ({:?}) must be <= self_lookup_interval_max ({:?})",
297 self.self_lookup_interval_min, self.self_lookup_interval_max,
298 ));
299 }
300 if self.neighbor_sync_peer_count == 0 {
301 return Err("neighbor_sync_peer_count must be >= 1".to_string());
302 }
303 if self.neighbor_sync_scope == 0 {
304 return Err("neighbor_sync_scope must be >= 1".to_string());
305 }
306 if self.neighbor_sync_scope > K_BUCKET_SIZE {
307 return Err(format!(
308 "neighbor_sync_scope ({}) must be <= K_BUCKET_SIZE ({})",
309 self.neighbor_sync_scope, K_BUCKET_SIZE,
310 ));
311 }
312 Ok(())
313 }
314
315 #[must_use]
320 pub fn quorum_needed(&self, quorum_targets_count: usize) -> usize {
321 if quorum_targets_count == 0 {
322 return 0;
323 }
324 let majority = quorum_targets_count / 2 + 1;
325 self.quorum_threshold.min(majority)
326 }
327
328 #[must_use]
333 pub fn confirm_needed(paid_group_size: usize) -> usize {
334 paid_group_size / 2 + 1
335 }
336
337 #[must_use]
340 pub fn random_neighbor_sync_interval(&self) -> Duration {
341 random_duration_in_range(
342 self.neighbor_sync_interval_min,
343 self.neighbor_sync_interval_max,
344 )
345 }
346
347 #[must_use]
352 pub fn audit_sample_count(total_keys: usize) -> usize {
353 #[allow(
354 clippy::cast_possible_truncation,
355 clippy::cast_sign_loss,
356 clippy::cast_precision_loss
357 )]
358 let sqrt = (total_keys as f64).sqrt() as usize;
359 sqrt.max(1).min(total_keys)
360 }
361
362 #[must_use]
368 pub fn max_incoming_audit_keys(stored_chunks: usize) -> usize {
369 (2 * Self::audit_sample_count(stored_chunks)).max(1)
371 }
372
373 #[must_use]
376 pub fn audit_response_timeout(&self, challenged_key_count: usize) -> Duration {
377 let keys = u32::try_from(challenged_key_count).unwrap_or(u32::MAX);
378 self.audit_response_base + self.audit_response_per_key * keys
379 }
380
381 #[must_use]
384 pub fn random_audit_tick_interval(&self) -> Duration {
385 random_duration_in_range(self.audit_tick_interval_min, self.audit_tick_interval_max)
386 }
387
388 #[must_use]
391 pub fn random_self_lookup_interval(&self) -> Duration {
392 random_duration_in_range(self.self_lookup_interval_min, self.self_lookup_interval_max)
393 }
394}
395
396fn random_duration_in_range(min: Duration, max: Duration) -> Duration {
401 if min == max {
402 return min;
403 }
404 let to_u64_millis = |d: Duration| -> u64 { u64::try_from(d.as_millis()).unwrap_or(u64::MAX) };
407 let chosen = rand::thread_rng().gen_range(to_u64_millis(min)..=to_u64_millis(max));
408 Duration::from_millis(chosen)
409}
410
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Asserts that `validate` rejects `config`; failures are reported at the
    /// calling test's line.
    #[track_caller]
    fn assert_rejected(config: &ReplicationConfig) {
        assert!(config.validate().is_err());
    }

    /// Returns the message `validate` produces for an invalid `config`.
    #[track_caller]
    fn rejection_message(config: &ReplicationConfig) -> String {
        config.validate().unwrap_err()
    }

    #[test]
    fn defaults_pass_validation() {
        let outcome = ReplicationConfig::default().validate();
        assert!(outcome.is_ok(), "default config must be valid");
    }

    #[test]
    fn default_prune_hysteresis_is_three_days() {
        let three_days = Duration::from_secs(3 * 24 * 60 * 60);
        assert_eq!(
            ReplicationConfig::default().prune_hysteresis_duration,
            three_days
        );
    }

    #[test]
    fn storage_admission_width_adds_margin() {
        const TEST_CLOSE_GROUP_SIZE: usize = 7;

        let widened = storage_admission_width(TEST_CLOSE_GROUP_SIZE);
        assert_eq!(widened, TEST_CLOSE_GROUP_SIZE + STORAGE_ADMISSION_MARGIN);

        // The addition saturates rather than wrapping.
        assert_eq!(storage_admission_width(usize::MAX), usize::MAX);
    }

    #[test]
    fn audit_failure_weight_is_five() {
        let drift = (AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs();
        assert!(drift <= f64::EPSILON);
    }

    #[test]
    fn quorum_threshold_zero_rejected() {
        assert_rejected(&ReplicationConfig {
            quorum_threshold: 0,
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn quorum_threshold_exceeds_close_group_rejected() {
        let base = ReplicationConfig::default();
        let one_too_many = base.close_group_size + 1;
        assert_rejected(&ReplicationConfig {
            quorum_threshold: one_too_many,
            ..base
        });
    }

    #[test]
    fn close_group_size_zero_rejected() {
        assert_rejected(&ReplicationConfig {
            close_group_size: 0,
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn close_group_size_exceeding_prune_audit_budget_rejected() {
        let err = rejection_message(&ReplicationConfig {
            close_group_size: MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS + 1,
            quorum_threshold: QUORUM_THRESHOLD,
            ..ReplicationConfig::default()
        });

        assert!(
            err.contains("MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS"),
            "error should mention prune audit budget: {err}"
        );
    }

    #[test]
    fn paid_list_close_group_size_zero_rejected() {
        assert_rejected(&ReplicationConfig {
            paid_list_close_group_size: 0,
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn neighbor_sync_interval_inverted_rejected() {
        assert_rejected(&ReplicationConfig {
            neighbor_sync_interval_min: Duration::from_secs(100),
            neighbor_sync_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn audit_tick_interval_inverted_rejected() {
        assert_rejected(&ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(100),
            audit_tick_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn self_lookup_interval_inverted_rejected() {
        assert_rejected(&ReplicationConfig {
            self_lookup_interval_min: Duration::from_secs(100),
            self_lookup_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn neighbor_sync_peer_count_zero_rejected() {
        assert_rejected(&ReplicationConfig {
            neighbor_sync_peer_count: 0,
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn neighbor_sync_scope_exceeding_k_bucket_size_rejected() {
        assert_rejected(&ReplicationConfig {
            neighbor_sync_scope: K_BUCKET_SIZE + 1,
            ..ReplicationConfig::default()
        });
    }

    #[test]
    fn audit_sample_count_scales_with_sqrt() {
        // (total keys, expected sample count)
        let cases = [
            (0, 0),
            (1, 1),
            (3, 1),
            (4, 2),
            (25, 5),
            (100, 10),
            (1_000, 31),
            (10_000, 100),
            (1_000_000, 1_000),
        ];
        for &(total_keys, expected) in &cases {
            assert_eq!(ReplicationConfig::audit_sample_count(total_keys), expected);
        }
    }

    #[test]
    fn max_incoming_audit_keys_scales_dynamically() {
        // (stored chunks, expected key limit)
        let cases = [
            (0, 1),
            (1, 2),
            (100, 20),
            (1_000_000, 2_000),
            (5_000_000, 4_472),
        ];
        for &(stored_chunks, expected) in &cases {
            assert_eq!(
                ReplicationConfig::max_incoming_audit_keys(stored_chunks),
                expected
            );
        }
    }

    #[test]
    fn quorum_needed_uses_smaller_of_threshold_and_majority() {
        let config = ReplicationConfig::default();

        // (target count, expected quorum)
        let cases = [(7, 4), (3, 2), (0, 0), (100, 4)];
        for &(targets, expected) in &cases {
            assert_eq!(config.quorum_needed(targets), expected);
        }
    }

    #[test]
    fn confirm_needed_is_strict_majority() {
        // (paid group size, expected confirmations)
        let cases = [(1, 1), (2, 2), (3, 2), (4, 3), (20, 11)];
        for &(group_size, expected) in &cases {
            assert_eq!(ReplicationConfig::confirm_needed(group_size), expected);
        }
    }

    #[test]
    fn random_intervals_within_bounds() {
        const ROUNDS: usize = 50;

        let config = ReplicationConfig::default();
        let neighbor_sync = config.neighbor_sync_interval_min..=config.neighbor_sync_interval_max;
        let audit_tick = config.audit_tick_interval_min..=config.audit_tick_interval_max;
        let self_lookup = config.self_lookup_interval_min..=config.self_lookup_interval_max;

        for _ in 0..ROUNDS {
            assert!(neighbor_sync.contains(&config.random_neighbor_sync_interval()));
            assert!(audit_tick.contains(&config.random_audit_tick_interval()));
            assert!(self_lookup.contains(&config.random_self_lookup_interval()));
        }
    }

    #[test]
    fn random_interval_equal_bounds_is_deterministic() {
        let pinned = Duration::from_secs(42);
        let config = ReplicationConfig {
            neighbor_sync_interval_min: pinned,
            neighbor_sync_interval_max: pinned,
            ..ReplicationConfig::default()
        };
        assert_eq!(config.random_neighbor_sync_interval(), pinned);
    }

    #[test]
    fn scenario_18_invalid_config_rejected() {
        // Quorum threshold larger than the close group.
        let err = rejection_message(&ReplicationConfig {
            quorum_threshold: 10,
            close_group_size: 7,
            ..ReplicationConfig::default()
        });
        assert!(
            err.contains("quorum_threshold"),
            "error should mention quorum_threshold: {err}"
        );

        // Empty close group.
        let err = rejection_message(&ReplicationConfig {
            close_group_size: 0,
            ..ReplicationConfig::default()
        });
        assert!(
            err.contains("close_group_size"),
            "error should mention close_group_size: {err}"
        );

        // Inverted neighbor-sync interval.
        let err = rejection_message(&ReplicationConfig {
            neighbor_sync_interval_min: Duration::from_secs(200),
            neighbor_sync_interval_max: Duration::from_secs(100),
            ..ReplicationConfig::default()
        });
        assert!(
            err.contains("neighbor_sync_interval"),
            "error should mention neighbor_sync_interval: {err}"
        );

        // Inverted self-lookup interval.
        let err = rejection_message(&ReplicationConfig {
            self_lookup_interval_min: Duration::from_secs(999),
            self_lookup_interval_max: Duration::from_secs(1),
            ..ReplicationConfig::default()
        });
        assert!(
            err.contains("self_lookup_interval"),
            "error should mention self_lookup_interval: {err}"
        );

        // Inverted audit tick interval.
        let err = rejection_message(&ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(500),
            audit_tick_interval_max: Duration::from_secs(10),
            ..ReplicationConfig::default()
        });
        assert!(
            err.contains("audit_tick_interval"),
            "error should mention audit_tick_interval: {err}"
        );
    }

    #[test]
    fn scenario_26_dynamic_paid_threshold_undersized() {
        let for_eight = ReplicationConfig::confirm_needed(8);
        assert_eq!(for_eight, 5, "floor(8/2)+1 = 5");

        let for_one = ReplicationConfig::confirm_needed(1);
        assert_eq!(for_one, 1, "single peer requires 1 confirmation");

        let for_two = ReplicationConfig::confirm_needed(2);
        assert_eq!(for_two, 2, "2 peers require 2 confirmations");

        let for_three = ReplicationConfig::confirm_needed(3);
        assert_eq!(for_three, 2, "3 peers require 2 confirmations");

        let for_none = ReplicationConfig::confirm_needed(0);
        assert_eq!(
            for_none, 1,
            "0 peers yields floor(0/2)+1 = 1 (degenerate case)"
        );
    }

    #[test]
    fn scenario_31_audit_cadence_within_jitter_bounds() {
        const SAMPLES: usize = 100;

        let config = ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(600),
            audit_tick_interval_max: Duration::from_secs(1200),
            ..ReplicationConfig::default()
        };

        // Track the previous sample so that any change between consecutive
        // draws counts as observed jitter.
        let mut previous: Option<Duration> = None;
        let mut jitter_observed = false;

        for _ in 0..SAMPLES {
            let interval = config.random_audit_tick_interval();
            assert!(
                interval >= config.audit_tick_interval_min,
                "interval {interval:?} below min {:?}",
                config.audit_tick_interval_min,
            );
            assert!(
                interval <= config.audit_tick_interval_max,
                "interval {interval:?} above max {:?}",
                config.audit_tick_interval_max,
            );
            jitter_observed |= matches!(previous, Some(earlier) if earlier != interval);
            previous = Some(interval);
        }

        assert!(
            jitter_observed,
            "audit intervals should exhibit randomized jitter across samples"
        );
    }
}