1#![allow(clippy::module_name_repetitions)]
12
13use std::time::Duration;
14
15use rand::Rng;
16
17use crate::ant_protocol::CLOSE_GROUP_SIZE;
18
/// Bucket capacity, in peers (20).
///
/// NOTE(review): presumably the Kademlia `k` parameter; nothing in this file
/// reads it, so confirm against the routing-table code.
pub const K_BUCKET_SIZE: usize = 20;

/// Default for [`ReplicationConfig::quorum_threshold`].
pub const QUORUM_THRESHOLD: usize = 4;

/// Default for [`ReplicationConfig::paid_list_close_group_size`].
pub const PAID_LIST_CLOSE_GROUP_SIZE: usize = 20;

/// Default for [`ReplicationConfig::neighbor_sync_scope`].
pub const NEIGHBOR_SYNC_SCOPE: usize = 20;

/// Default for [`ReplicationConfig::neighbor_sync_peer_count`].
pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;

/// Shortest neighbor-sync interval, in seconds (10 minutes).
const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60;

/// Longest neighbor-sync interval, in seconds (20 minutes).
const NEIGHBOR_SYNC_INTERVAL_MAX_SECS: u64 = 20 * 60;

/// Default for [`ReplicationConfig::neighbor_sync_interval_min`] (10 minutes).
pub const NEIGHBOR_SYNC_INTERVAL_MIN: Duration =
    Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MIN_SECS);

/// Default for [`ReplicationConfig::neighbor_sync_interval_max`] (20 minutes).
pub const NEIGHBOR_SYNC_INTERVAL_MAX: Duration =
    Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MAX_SECS);

/// Neighbor-sync cooldown, in seconds (1 hour).
const NEIGHBOR_SYNC_COOLDOWN_SECS: u64 = 60 * 60;

/// Default for [`ReplicationConfig::neighbor_sync_cooldown`] (1 hour).
pub const NEIGHBOR_SYNC_COOLDOWN: Duration = Duration::from_secs(NEIGHBOR_SYNC_COOLDOWN_SECS);

/// Shortest self-lookup interval, in seconds (5 minutes).
const SELF_LOOKUP_INTERVAL_MIN_SECS: u64 = 5 * 60;

/// Longest self-lookup interval, in seconds (10 minutes).
const SELF_LOOKUP_INTERVAL_MAX_SECS: u64 = 10 * 60;

/// Default for [`ReplicationConfig::self_lookup_interval_min`] (5 minutes).
pub const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MIN_SECS);

/// Default for [`ReplicationConfig::self_lookup_interval_max`] (10 minutes).
pub const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MAX_SECS);

/// Limit on concurrent replication sends (3).
///
/// NOTE(review): the limit is enforced outside this file; confirm the exact
/// scope (per node vs. per peer) at the call site.
pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3;
81
/// Parallelism assumed when the platform cannot report how much is available.
const AVAILABLE_PARALLELISM_FALLBACK: usize = 4;

/// Returns how many fetches may run in parallel.
///
/// The value is whatever `std::thread::available_parallelism()` reports, or
/// `AVAILABLE_PARALLELISM_FALLBACK` (4) when that query fails, so the result
/// is always at least 1.
// `NonZeroUsize::get` is used rather than the generic `NonZero::get`: it is
// the same function, but the generic `NonZero` name only exists on Rust 1.79+
// while the `NonZeroUsize` alias is available on much older toolchains.
// NOTE(review): the lint allowance below is kept from the original; it may no
// longer be needed — confirm by running clippy against the crate's MSRV.
#[allow(clippy::incompatible_msrv)]
pub fn max_parallel_fetch() -> usize {
    std::thread::available_parallelism()
        .map_or(AVAILABLE_PARALLELISM_FALLBACK, std::num::NonZeroUsize::get)
}
95
/// Shortest audit tick interval, in seconds (10 minutes).
const AUDIT_TICK_INTERVAL_MIN_SECS: u64 = 10 * 60;

/// Longest audit tick interval, in seconds (20 minutes).
const AUDIT_TICK_INTERVAL_MAX_SECS: u64 = 20 * 60;

/// Default for [`ReplicationConfig::audit_tick_interval_min`] (10 minutes).
pub const AUDIT_TICK_INTERVAL_MIN: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MIN_SECS);

/// Default for [`ReplicationConfig::audit_tick_interval_max`] (20 minutes).
pub const AUDIT_TICK_INTERVAL_MAX: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MAX_SECS);

/// Seconds in the default [`ReplicationConfig::audit_response_base`].
const AUDIT_RESPONSE_BASE_SECS: u64 = 10;

/// Milliseconds in the default [`ReplicationConfig::audit_response_per_key`].
const AUDIT_RESPONSE_PER_KEY_MS: u64 = 20;

/// Bootstrap claim grace period, in seconds (24 hours).
const BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS: u64 = 24 * 60 * 60;

/// Default for [`ReplicationConfig::bootstrap_claim_grace_period`] (24 hours).
pub const BOOTSTRAP_CLAIM_GRACE_PERIOD: Duration =
    Duration::from_secs(BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS);

/// Prune hysteresis duration, in seconds (3 days).
const PRUNE_HYSTERESIS_DURATION_SECS: u64 = 3 * 24 * 60 * 60;

/// Default for [`ReplicationConfig::prune_hysteresis_duration`] (3 days).
pub const PRUNE_HYSTERESIS_DURATION: Duration =
    Duration::from_secs(PRUNE_HYSTERESIS_DURATION_SECS);

/// Protocol identifier string for replication traffic.
pub const REPLICATION_PROTOCOL_ID: &str = "autonomi.ant.replication.v1";

/// Replication message size limit expressed in MiB.
const REPLICATION_MESSAGE_SIZE_MIB: usize = 10;

/// Replication message size limit in bytes (10 MiB).
pub const MAX_REPLICATION_MESSAGE_SIZE: usize = REPLICATION_MESSAGE_SIZE_MIB * 1024 * 1024;

/// Verification request timeout, in seconds.
const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15;

/// Default for [`ReplicationConfig::verification_request_timeout`] (15 seconds).
pub const VERIFICATION_REQUEST_TIMEOUT: Duration =
    Duration::from_secs(VERIFICATION_REQUEST_TIMEOUT_SECS);

/// Fetch request timeout, in seconds.
const FETCH_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Default for [`ReplicationConfig::fetch_request_timeout`] (30 seconds).
pub const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(FETCH_REQUEST_TIMEOUT_SECS);

/// Maximum age of a pending verification, in seconds (30 minutes).
const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60;

/// Maximum age of a pending verification (30 minutes).
///
/// NOTE(review): what happens to an entry past this age is decided outside
/// this file.
pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS);

/// Trust weight associated with an audit failure (5.0).
///
/// NOTE(review): the trust system that consumes this weight is not visible
/// in this file.
pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0;

/// Cap on prune audit challenges per pass (64).
///
/// [`ReplicationConfig::validate`] rejects any `close_group_size` above it.
pub const MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS: usize = 64;

/// Default for [`ReplicationConfig::bootstrap_complete_timeout_secs`].
const BOOTSTRAP_COMPLETE_TIMEOUT_SECS: u64 = 60;
156
/// Tunable parameters for replication.
///
/// `Default::default()` fills every field from this module's constants.
/// All fields are public, so a hand-built value can break the invariants the
/// helper methods expect; run `validate` on any configuration that is not the
/// default.
#[derive(Clone, Debug)]
pub struct ReplicationConfig {
    /// Close-group size. `validate` requires it to be at least 1 and at most
    /// `MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS`.
    pub close_group_size: usize,
    /// Upper bound on the quorum returned by `quorum_needed`. `validate`
    /// requires `1 <= quorum_threshold <= close_group_size`.
    pub quorum_threshold: usize,
    /// Paid-list close-group size. `validate` requires it to be at least 1.
    pub paid_list_close_group_size: usize,
    /// Neighbor-sync scope. `validate` requires it to be at least 1.
    pub neighbor_sync_scope: usize,
    /// Neighbor-sync peer count. `validate` requires it to be at least 1.
    pub neighbor_sync_peer_count: usize,
    /// Lower bound used by `random_neighbor_sync_interval`.
    pub neighbor_sync_interval_min: Duration,
    /// Upper bound used by `random_neighbor_sync_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub neighbor_sync_interval_max: Duration,
    /// Neighbor-sync cooldown. Not read by any method in this file.
    pub neighbor_sync_cooldown: Duration,
    /// Lower bound used by `random_self_lookup_interval`.
    pub self_lookup_interval_min: Duration,
    /// Upper bound used by `random_self_lookup_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub self_lookup_interval_max: Duration,
    /// Lower bound used by `random_audit_tick_interval`.
    pub audit_tick_interval_min: Duration,
    /// Upper bound used by `random_audit_tick_interval`; `validate`
    /// requires it to be no smaller than the lower bound.
    pub audit_tick_interval_max: Duration,
    /// Fixed part of the timeout computed by `audit_response_timeout`.
    pub audit_response_base: Duration,
    /// Per-challenged-key part of the timeout computed by
    /// `audit_response_timeout`.
    pub audit_response_per_key: Duration,
    /// Bootstrap claim grace period. Not read by any method in this file.
    pub bootstrap_claim_grace_period: Duration,
    /// Prune hysteresis duration. Not read by any method in this file.
    pub prune_hysteresis_duration: Duration,
    /// Verification request timeout. Not read by any method in this file.
    pub verification_request_timeout: Duration,
    /// Fetch request timeout. Not read by any method in this file.
    pub fetch_request_timeout: Duration,
    /// Bootstrap completion timeout, in seconds. Not read by any method in
    /// this file.
    pub bootstrap_complete_timeout_secs: u64,
}
206
207impl Default for ReplicationConfig {
208 fn default() -> Self {
209 Self {
210 close_group_size: CLOSE_GROUP_SIZE,
211 quorum_threshold: QUORUM_THRESHOLD,
212 paid_list_close_group_size: PAID_LIST_CLOSE_GROUP_SIZE,
213 neighbor_sync_scope: NEIGHBOR_SYNC_SCOPE,
214 neighbor_sync_peer_count: NEIGHBOR_SYNC_PEER_COUNT,
215 neighbor_sync_interval_min: NEIGHBOR_SYNC_INTERVAL_MIN,
216 neighbor_sync_interval_max: NEIGHBOR_SYNC_INTERVAL_MAX,
217 neighbor_sync_cooldown: NEIGHBOR_SYNC_COOLDOWN,
218 self_lookup_interval_min: SELF_LOOKUP_INTERVAL_MIN,
219 self_lookup_interval_max: SELF_LOOKUP_INTERVAL_MAX,
220 audit_tick_interval_min: AUDIT_TICK_INTERVAL_MIN,
221 audit_tick_interval_max: AUDIT_TICK_INTERVAL_MAX,
222 audit_response_base: Duration::from_secs(AUDIT_RESPONSE_BASE_SECS),
223 audit_response_per_key: Duration::from_millis(AUDIT_RESPONSE_PER_KEY_MS),
224 bootstrap_claim_grace_period: BOOTSTRAP_CLAIM_GRACE_PERIOD,
225 prune_hysteresis_duration: PRUNE_HYSTERESIS_DURATION,
226 verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT,
227 fetch_request_timeout: FETCH_REQUEST_TIMEOUT,
228 bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS,
229 }
230 }
231}
232
233impl ReplicationConfig {
234 pub fn validate(&self) -> Result<(), String> {
242 if self.close_group_size == 0 {
243 return Err("close_group_size must be >= 1".to_string());
244 }
245 if self.quorum_threshold == 0 || self.quorum_threshold > self.close_group_size {
246 return Err(format!(
247 "quorum_threshold ({}) must satisfy 1 <= quorum_threshold <= close_group_size ({})",
248 self.quorum_threshold, self.close_group_size,
249 ));
250 }
251 if self.close_group_size > MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS {
252 return Err(format!(
253 "close_group_size ({}) must be <= MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS ({})",
254 self.close_group_size, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
255 ));
256 }
257 if self.paid_list_close_group_size == 0 {
258 return Err("paid_list_close_group_size must be >= 1".to_string());
259 }
260 if self.neighbor_sync_interval_min > self.neighbor_sync_interval_max {
261 return Err(format!(
262 "neighbor_sync_interval_min ({:?}) must be <= neighbor_sync_interval_max ({:?})",
263 self.neighbor_sync_interval_min, self.neighbor_sync_interval_max,
264 ));
265 }
266 if self.audit_tick_interval_min > self.audit_tick_interval_max {
267 return Err(format!(
268 "audit_tick_interval_min ({:?}) must be <= audit_tick_interval_max ({:?})",
269 self.audit_tick_interval_min, self.audit_tick_interval_max,
270 ));
271 }
272 if self.self_lookup_interval_min > self.self_lookup_interval_max {
273 return Err(format!(
274 "self_lookup_interval_min ({:?}) must be <= self_lookup_interval_max ({:?})",
275 self.self_lookup_interval_min, self.self_lookup_interval_max,
276 ));
277 }
278 if self.neighbor_sync_peer_count == 0 {
279 return Err("neighbor_sync_peer_count must be >= 1".to_string());
280 }
281 if self.neighbor_sync_scope == 0 {
282 return Err("neighbor_sync_scope must be >= 1".to_string());
283 }
284 Ok(())
285 }
286
287 #[must_use]
292 pub fn quorum_needed(&self, quorum_targets_count: usize) -> usize {
293 if quorum_targets_count == 0 {
294 return 0;
295 }
296 let majority = quorum_targets_count / 2 + 1;
297 self.quorum_threshold.min(majority)
298 }
299
300 #[must_use]
305 pub fn confirm_needed(paid_group_size: usize) -> usize {
306 paid_group_size / 2 + 1
307 }
308
309 #[must_use]
312 pub fn random_neighbor_sync_interval(&self) -> Duration {
313 random_duration_in_range(
314 self.neighbor_sync_interval_min,
315 self.neighbor_sync_interval_max,
316 )
317 }
318
319 #[must_use]
324 pub fn audit_sample_count(total_keys: usize) -> usize {
325 #[allow(
326 clippy::cast_possible_truncation,
327 clippy::cast_sign_loss,
328 clippy::cast_precision_loss
329 )]
330 let sqrt = (total_keys as f64).sqrt() as usize;
331 sqrt.max(1).min(total_keys)
332 }
333
334 #[must_use]
340 pub fn max_incoming_audit_keys(stored_chunks: usize) -> usize {
341 (2 * Self::audit_sample_count(stored_chunks)).max(1)
343 }
344
345 #[must_use]
348 pub fn audit_response_timeout(&self, challenged_key_count: usize) -> Duration {
349 let keys = u32::try_from(challenged_key_count).unwrap_or(u32::MAX);
350 self.audit_response_base + self.audit_response_per_key * keys
351 }
352
353 #[must_use]
356 pub fn random_audit_tick_interval(&self) -> Duration {
357 random_duration_in_range(self.audit_tick_interval_min, self.audit_tick_interval_max)
358 }
359
360 #[must_use]
363 pub fn random_self_lookup_interval(&self) -> Duration {
364 random_duration_in_range(self.self_lookup_interval_min, self.self_lookup_interval_max)
365 }
366}
367
368fn random_duration_in_range(min: Duration, max: Duration) -> Duration {
373 if min == max {
374 return min;
375 }
376 let to_u64_millis = |d: Duration| -> u64 { u64::try_from(d.as_millis()).unwrap_or(u64::MAX) };
379 let chosen = rand::thread_rng().gen_range(to_u64_millis(min)..=to_u64_millis(max));
380 Duration::from_millis(chosen)
381}
382
/// Unit tests for the replication configuration: default values, every
/// rejection branch of `validate`, and the arithmetic / jitter helpers.
#[cfg(test)]
// Panicking helpers are fine here: a panic is simply a failed test.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// The shipped defaults must satisfy their own validation rules.
    #[test]
    fn defaults_pass_validation() {
        let config = ReplicationConfig::default();
        assert!(config.validate().is_ok(), "default config must be valid");
    }

    /// Pins the default prune hysteresis to exactly three days.
    #[test]
    fn default_prune_hysteresis_is_three_days() {
        let config = ReplicationConfig::default();
        assert_eq!(
            config.prune_hysteresis_duration,
            Duration::from_secs(3 * 24 * 60 * 60)
        );
    }

    /// Pins the audit failure trust weight to 5.0 (compared with a float
    /// tolerance).
    #[test]
    fn audit_failure_weight_is_five() {
        assert!((AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs() <= f64::EPSILON);
    }

    #[test]
    fn quorum_threshold_zero_rejected() {
        let config = ReplicationConfig {
            quorum_threshold: 0,
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn quorum_threshold_exceeds_close_group_rejected() {
        let defaults = ReplicationConfig::default();
        let config = ReplicationConfig {
            quorum_threshold: defaults.close_group_size + 1,
            ..defaults
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn close_group_size_zero_rejected() {
        let config = ReplicationConfig {
            close_group_size: 0,
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    /// `quorum_threshold` is kept valid so the failure can only come from
    /// the prune-audit budget check.
    #[test]
    fn close_group_size_exceeding_prune_audit_budget_rejected() {
        let config = ReplicationConfig {
            close_group_size: MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS + 1,
            quorum_threshold: QUORUM_THRESHOLD,
            ..ReplicationConfig::default()
        };

        let err = config.validate().unwrap_err();

        assert!(
            err.contains("MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS"),
            "error should mention prune audit budget: {err}"
        );
    }

    #[test]
    fn paid_list_close_group_size_zero_rejected() {
        let config = ReplicationConfig {
            paid_list_close_group_size: 0,
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn neighbor_sync_interval_inverted_rejected() {
        let config = ReplicationConfig {
            neighbor_sync_interval_min: Duration::from_secs(100),
            neighbor_sync_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn audit_tick_interval_inverted_rejected() {
        let config = ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(100),
            audit_tick_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn self_lookup_interval_inverted_rejected() {
        let config = ReplicationConfig {
            self_lookup_interval_min: Duration::from_secs(100),
            self_lookup_interval_max: Duration::from_secs(50),
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn neighbor_sync_peer_count_zero_rejected() {
        let config = ReplicationConfig {
            neighbor_sync_peer_count: 0,
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    /// Covers the last rejection branch of `validate`, which previously had
    /// no test of its own.
    #[test]
    fn neighbor_sync_scope_zero_rejected() {
        let config = ReplicationConfig {
            neighbor_sync_scope: 0,
            ..ReplicationConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn audit_sample_count_scales_with_sqrt() {
        // No keys: nothing to sample.
        assert_eq!(ReplicationConfig::audit_sample_count(0), 0);

        // One key: the single key is sampled.
        assert_eq!(ReplicationConfig::audit_sample_count(1), 1);

        // sqrt(3) is about 1.73 and is truncated to 1.
        assert_eq!(ReplicationConfig::audit_sample_count(3), 1);

        // Perfect squares and larger inputs.
        assert_eq!(ReplicationConfig::audit_sample_count(4), 2);
        assert_eq!(ReplicationConfig::audit_sample_count(25), 5);
        assert_eq!(ReplicationConfig::audit_sample_count(100), 10);
        assert_eq!(ReplicationConfig::audit_sample_count(1_000), 31);
        assert_eq!(ReplicationConfig::audit_sample_count(10_000), 100);
        assert_eq!(ReplicationConfig::audit_sample_count(1_000_000), 1_000);
    }

    #[test]
    fn max_incoming_audit_keys_scales_dynamically() {
        // An empty store still accepts one key (the floor of 1).
        assert_eq!(ReplicationConfig::max_incoming_audit_keys(0), 1);

        // 2 * sqrt(1) = 2.
        assert_eq!(ReplicationConfig::max_incoming_audit_keys(1), 2);

        // 2 * sqrt(100) = 20.
        assert_eq!(ReplicationConfig::max_incoming_audit_keys(100), 20);

        // 2 * sqrt(1_000_000) = 2_000.
        assert_eq!(ReplicationConfig::max_incoming_audit_keys(1_000_000), 2_000);

        // 2 * floor(sqrt(5_000_000)) = 2 * 2_236 = 4_472.
        assert_eq!(ReplicationConfig::max_incoming_audit_keys(5_000_000), 4_472);
    }

    #[test]
    fn quorum_needed_uses_smaller_of_threshold_and_majority() {
        let config = ReplicationConfig::default();

        // Majority of 7 is 4, equal to the threshold.
        assert_eq!(config.quorum_needed(7), 4);

        // Majority of 3 is 2, below the threshold.
        assert_eq!(config.quorum_needed(3), 2);

        // No targets: no quorum required.
        assert_eq!(config.quorum_needed(0), 0);

        // Majority of 100 is 51, so the threshold caps the result.
        assert_eq!(config.quorum_needed(100), 4);
    }

    #[test]
    fn confirm_needed_is_strict_majority() {
        assert_eq!(ReplicationConfig::confirm_needed(1), 1);
        assert_eq!(ReplicationConfig::confirm_needed(2), 2);
        assert_eq!(ReplicationConfig::confirm_needed(3), 2);
        assert_eq!(ReplicationConfig::confirm_needed(4), 3);
        assert_eq!(ReplicationConfig::confirm_needed(20), 11);
    }

    /// Samples each jittered interval repeatedly and checks it stays inside
    /// its configured bounds.
    #[test]
    fn random_intervals_within_bounds() {
        let config = ReplicationConfig::default();

        let iterations = 50;
        for _ in 0..iterations {
            let ns = config.random_neighbor_sync_interval();
            assert!(ns >= config.neighbor_sync_interval_min);
            assert!(ns <= config.neighbor_sync_interval_max);

            let at = config.random_audit_tick_interval();
            assert!(at >= config.audit_tick_interval_min);
            assert!(at <= config.audit_tick_interval_max);

            let sl = config.random_self_lookup_interval();
            assert!(sl >= config.self_lookup_interval_min);
            assert!(sl <= config.self_lookup_interval_max);
        }
    }

    /// With identical bounds there is nothing to randomize.
    #[test]
    fn random_interval_equal_bounds_is_deterministic() {
        let fixed = Duration::from_secs(42);
        let config = ReplicationConfig {
            neighbor_sync_interval_min: fixed,
            neighbor_sync_interval_max: fixed,
            ..ReplicationConfig::default()
        };
        assert_eq!(config.random_neighbor_sync_interval(), fixed);
    }

    /// Each invalid configuration must be rejected with an error naming the
    /// offending field.
    ///
    /// NOTE(review): the scenario number presumably refers to an external
    /// test plan that is not part of this file.
    #[test]
    fn scenario_18_invalid_config_rejected() {
        // Quorum threshold larger than the close group.
        let config = ReplicationConfig {
            quorum_threshold: 10,
            close_group_size: 7,
            ..ReplicationConfig::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.contains("quorum_threshold"),
            "error should mention quorum_threshold: {err}"
        );

        // Empty close group.
        let config = ReplicationConfig {
            close_group_size: 0,
            ..ReplicationConfig::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.contains("close_group_size"),
            "error should mention close_group_size: {err}"
        );

        // Inverted neighbor-sync interval.
        let config = ReplicationConfig {
            neighbor_sync_interval_min: Duration::from_secs(200),
            neighbor_sync_interval_max: Duration::from_secs(100),
            ..ReplicationConfig::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.contains("neighbor_sync_interval"),
            "error should mention neighbor_sync_interval: {err}"
        );

        // Inverted self-lookup interval.
        let config = ReplicationConfig {
            self_lookup_interval_min: Duration::from_secs(999),
            self_lookup_interval_max: Duration::from_secs(1),
            ..ReplicationConfig::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.contains("self_lookup_interval"),
            "error should mention self_lookup_interval: {err}"
        );

        // Inverted audit tick interval.
        let config = ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(500),
            audit_tick_interval_max: Duration::from_secs(10),
            ..ReplicationConfig::default()
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.contains("audit_tick_interval"),
            "error should mention audit_tick_interval: {err}"
        );
    }

    /// `confirm_needed` follows the actual group size, including groups
    /// smaller than the configured one and the degenerate empty group.
    ///
    /// NOTE(review): the scenario number presumably refers to an external
    /// test plan that is not part of this file.
    #[test]
    fn scenario_26_dynamic_paid_threshold_undersized() {
        assert_eq!(ReplicationConfig::confirm_needed(8), 5, "floor(8/2)+1 = 5");

        assert_eq!(
            ReplicationConfig::confirm_needed(1),
            1,
            "single peer requires 1 confirmation"
        );
        assert_eq!(
            ReplicationConfig::confirm_needed(2),
            2,
            "2 peers require 2 confirmations"
        );
        assert_eq!(
            ReplicationConfig::confirm_needed(3),
            2,
            "3 peers require 2 confirmations"
        );
        assert_eq!(
            ReplicationConfig::confirm_needed(0),
            1,
            "0 peers yields floor(0/2)+1 = 1 (degenerate case)"
        );
    }

    /// Audit intervals must stay inside the configured bounds and must not
    /// all be identical.
    ///
    /// NOTE(review): the scenario number presumably refers to an external
    /// test plan that is not part of this file.
    #[test]
    fn scenario_31_audit_cadence_within_jitter_bounds() {
        let config = ReplicationConfig {
            audit_tick_interval_min: Duration::from_secs(600),
            audit_tick_interval_max: Duration::from_secs(1200),
            ..ReplicationConfig::default()
        };

        let iterations = 100;
        // `prev` starts at zero, which is outside the sampled range, so the
        // first sample is never compared against it.
        let mut saw_different = false;
        let mut prev = Duration::ZERO;

        for _ in 0..iterations {
            let interval = config.random_audit_tick_interval();
            assert!(
                interval >= config.audit_tick_interval_min,
                "interval {interval:?} below min {:?}",
                config.audit_tick_interval_min,
            );
            assert!(
                interval <= config.audit_tick_interval_max,
                "interval {interval:?} above max {:?}",
                config.audit_tick_interval_max,
            );
            if interval != prev && prev != Duration::ZERO {
                saw_different = true;
            }
            prev = interval;
        }

        // 100 samples over a 600_001-value millisecond range being all equal
        // is vanishingly unlikely unless jitter is broken.
        assert!(
            saw_different,
            "audit intervals should exhibit randomized jitter across samples"
        );
    }
}