1#![allow(clippy::module_name_repetitions)]
12
13use std::time::Duration;
14
15use rand::Rng;
16
17use crate::ant_protocol::CLOSE_GROUP_SIZE;
18
19pub const K_BUCKET_SIZE: usize = 20;
25
26pub const QUORUM_THRESHOLD: usize = 4; pub const PAID_LIST_CLOSE_GROUP_SIZE: usize = 20;
34
35pub const NEIGHBOR_SYNC_SCOPE: usize = 20;
37
38pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;
41
42const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60;
45const NEIGHBOR_SYNC_INTERVAL_MAX_SECS: u64 = 20 * 60;
47
48pub const NEIGHBOR_SYNC_INTERVAL_MIN: Duration =
50 Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MIN_SECS);
51
52pub const NEIGHBOR_SYNC_INTERVAL_MAX: Duration =
54 Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MAX_SECS);
55
56const NEIGHBOR_SYNC_COOLDOWN_SECS: u64 = 60 * 60; pub const NEIGHBOR_SYNC_COOLDOWN: Duration = Duration::from_secs(NEIGHBOR_SYNC_COOLDOWN_SECS);
60
61const SELF_LOOKUP_INTERVAL_MIN_SECS: u64 = 5 * 60;
63const SELF_LOOKUP_INTERVAL_MAX_SECS: u64 = 10 * 60;
65
66pub const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MIN_SECS);
69
70pub const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MAX_SECS);
72
73pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3;
81
82const AVAILABLE_PARALLELISM_FALLBACK: usize = 4;
87
88#[allow(clippy::incompatible_msrv)] pub fn max_parallel_fetch() -> usize {
92 std::thread::available_parallelism()
93 .map_or(AVAILABLE_PARALLELISM_FALLBACK, std::num::NonZero::get)
94}
95
96const AUDIT_TICK_INTERVAL_MIN_SECS: u64 = 30 * 60;
98const AUDIT_TICK_INTERVAL_MAX_SECS: u64 = 60 * 60;
100
101pub const AUDIT_TICK_INTERVAL_MIN: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MIN_SECS);
103
104pub const AUDIT_TICK_INTERVAL_MAX: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MAX_SECS);
106
107const AUDIT_RESPONSE_BASE_SECS: u64 = 6;
109const AUDIT_RESPONSE_PER_CHUNK_MS: u64 = 10;
111
112const BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS: u64 = 24 * 60 * 60; pub const BOOTSTRAP_CLAIM_GRACE_PERIOD: Duration =
116 Duration::from_secs(BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS);
117
118const PRUNE_HYSTERESIS_DURATION_SECS: u64 = 6 * 60 * 60; pub const PRUNE_HYSTERESIS_DURATION: Duration = Duration::from_secs(PRUNE_HYSTERESIS_DURATION_SECS);
122
123pub const REPLICATION_PROTOCOL_ID: &str = "autonomi.ant.replication.v1";
125
126const REPLICATION_MESSAGE_SIZE_MIB: usize = 10;
128pub const MAX_REPLICATION_MESSAGE_SIZE: usize = REPLICATION_MESSAGE_SIZE_MIB * 1024 * 1024;
130
131const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15;
133pub const VERIFICATION_REQUEST_TIMEOUT: Duration =
135 Duration::from_secs(VERIFICATION_REQUEST_TIMEOUT_SECS);
136
137const FETCH_REQUEST_TIMEOUT_SECS: u64 = 30;
139pub const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(FETCH_REQUEST_TIMEOUT_SECS);
141
142const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60;
144pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS);
146
147pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 2.0;
149
150const BOOTSTRAP_COMPLETE_TIMEOUT_SECS: u64 = 60;
153
154#[derive(Debug, Clone)]
162pub struct ReplicationConfig {
163 pub close_group_size: usize,
165 pub quorum_threshold: usize,
167 pub paid_list_close_group_size: usize,
169 pub neighbor_sync_scope: usize,
171 pub neighbor_sync_peer_count: usize,
173 pub neighbor_sync_interval_min: Duration,
175 pub neighbor_sync_interval_max: Duration,
177 pub neighbor_sync_cooldown: Duration,
179 pub self_lookup_interval_min: Duration,
181 pub self_lookup_interval_max: Duration,
183 pub audit_tick_interval_min: Duration,
185 pub audit_tick_interval_max: Duration,
187 pub audit_response_base: Duration,
189 pub audit_response_per_chunk: Duration,
191 pub bootstrap_claim_grace_period: Duration,
193 pub prune_hysteresis_duration: Duration,
195 pub verification_request_timeout: Duration,
197 pub fetch_request_timeout: Duration,
199 pub bootstrap_complete_timeout_secs: u64,
202}
203
204impl Default for ReplicationConfig {
205 fn default() -> Self {
206 Self {
207 close_group_size: CLOSE_GROUP_SIZE,
208 quorum_threshold: QUORUM_THRESHOLD,
209 paid_list_close_group_size: PAID_LIST_CLOSE_GROUP_SIZE,
210 neighbor_sync_scope: NEIGHBOR_SYNC_SCOPE,
211 neighbor_sync_peer_count: NEIGHBOR_SYNC_PEER_COUNT,
212 neighbor_sync_interval_min: NEIGHBOR_SYNC_INTERVAL_MIN,
213 neighbor_sync_interval_max: NEIGHBOR_SYNC_INTERVAL_MAX,
214 neighbor_sync_cooldown: NEIGHBOR_SYNC_COOLDOWN,
215 self_lookup_interval_min: SELF_LOOKUP_INTERVAL_MIN,
216 self_lookup_interval_max: SELF_LOOKUP_INTERVAL_MAX,
217 audit_tick_interval_min: AUDIT_TICK_INTERVAL_MIN,
218 audit_tick_interval_max: AUDIT_TICK_INTERVAL_MAX,
219 audit_response_base: Duration::from_secs(AUDIT_RESPONSE_BASE_SECS),
220 audit_response_per_chunk: Duration::from_millis(AUDIT_RESPONSE_PER_CHUNK_MS),
221 bootstrap_claim_grace_period: BOOTSTRAP_CLAIM_GRACE_PERIOD,
222 prune_hysteresis_duration: PRUNE_HYSTERESIS_DURATION,
223 verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT,
224 fetch_request_timeout: FETCH_REQUEST_TIMEOUT,
225 bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS,
226 }
227 }
228}
229
230impl ReplicationConfig {
231 pub fn validate(&self) -> Result<(), String> {
239 if self.close_group_size == 0 {
240 return Err("close_group_size must be >= 1".to_string());
241 }
242 if self.quorum_threshold == 0 || self.quorum_threshold > self.close_group_size {
243 return Err(format!(
244 "quorum_threshold ({}) must satisfy 1 <= quorum_threshold <= close_group_size ({})",
245 self.quorum_threshold, self.close_group_size,
246 ));
247 }
248 if self.paid_list_close_group_size == 0 {
249 return Err("paid_list_close_group_size must be >= 1".to_string());
250 }
251 if self.neighbor_sync_interval_min > self.neighbor_sync_interval_max {
252 return Err(format!(
253 "neighbor_sync_interval_min ({:?}) must be <= neighbor_sync_interval_max ({:?})",
254 self.neighbor_sync_interval_min, self.neighbor_sync_interval_max,
255 ));
256 }
257 if self.audit_tick_interval_min > self.audit_tick_interval_max {
258 return Err(format!(
259 "audit_tick_interval_min ({:?}) must be <= audit_tick_interval_max ({:?})",
260 self.audit_tick_interval_min, self.audit_tick_interval_max,
261 ));
262 }
263 if self.self_lookup_interval_min > self.self_lookup_interval_max {
264 return Err(format!(
265 "self_lookup_interval_min ({:?}) must be <= self_lookup_interval_max ({:?})",
266 self.self_lookup_interval_min, self.self_lookup_interval_max,
267 ));
268 }
269 if self.neighbor_sync_peer_count == 0 {
270 return Err("neighbor_sync_peer_count must be >= 1".to_string());
271 }
272 if self.neighbor_sync_scope == 0 {
273 return Err("neighbor_sync_scope must be >= 1".to_string());
274 }
275 Ok(())
276 }
277
278 #[must_use]
283 pub fn quorum_needed(&self, quorum_targets_count: usize) -> usize {
284 if quorum_targets_count == 0 {
285 return 0;
286 }
287 let majority = quorum_targets_count / 2 + 1;
288 self.quorum_threshold.min(majority)
289 }
290
291 #[must_use]
296 pub fn confirm_needed(paid_group_size: usize) -> usize {
297 paid_group_size / 2 + 1
298 }
299
300 #[must_use]
303 pub fn random_neighbor_sync_interval(&self) -> Duration {
304 random_duration_in_range(
305 self.neighbor_sync_interval_min,
306 self.neighbor_sync_interval_max,
307 )
308 }
309
310 #[must_use]
315 pub fn audit_sample_count(total_keys: usize) -> usize {
316 #[allow(
317 clippy::cast_possible_truncation,
318 clippy::cast_sign_loss,
319 clippy::cast_precision_loss
320 )]
321 let sqrt = (total_keys as f64).sqrt() as usize;
322 sqrt.max(1).min(total_keys)
323 }
324
325 #[must_use]
331 pub fn max_incoming_audit_keys(stored_chunks: usize) -> usize {
332 (2 * Self::audit_sample_count(stored_chunks)).max(1)
334 }
335
336 #[must_use]
339 pub fn audit_response_timeout(&self, chunk_count: usize) -> Duration {
340 let chunks = u32::try_from(chunk_count).unwrap_or(u32::MAX);
341 self.audit_response_base + self.audit_response_per_chunk * chunks
342 }
343
344 #[must_use]
347 pub fn random_audit_tick_interval(&self) -> Duration {
348 random_duration_in_range(self.audit_tick_interval_min, self.audit_tick_interval_max)
349 }
350
351 #[must_use]
354 pub fn random_self_lookup_interval(&self) -> Duration {
355 random_duration_in_range(self.self_lookup_interval_min, self.self_lookup_interval_max)
356 }
357}
358
359fn random_duration_in_range(min: Duration, max: Duration) -> Duration {
364 if min == max {
365 return min;
366 }
367 let to_u64_millis = |d: Duration| -> u64 { u64::try_from(d.as_millis()).unwrap_or(u64::MAX) };
370 let chosen = rand::thread_rng().gen_range(to_u64_millis(min)..=to_u64_millis(max));
371 Duration::from_millis(chosen)
372}
373
374#[cfg(test)]
379#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
380mod tests {
381 use super::*;
382
383 #[test]
384 fn defaults_pass_validation() {
385 let config = ReplicationConfig::default();
386 assert!(config.validate().is_ok(), "default config must be valid");
387 }
388
389 #[test]
390 fn quorum_threshold_zero_rejected() {
391 let config = ReplicationConfig {
392 quorum_threshold: 0,
393 ..ReplicationConfig::default()
394 };
395 assert!(config.validate().is_err());
396 }
397
398 #[test]
399 fn quorum_threshold_exceeds_close_group_rejected() {
400 let defaults = ReplicationConfig::default();
401 let config = ReplicationConfig {
402 quorum_threshold: defaults.close_group_size + 1,
403 ..defaults
404 };
405 assert!(config.validate().is_err());
406 }
407
408 #[test]
409 fn close_group_size_zero_rejected() {
410 let config = ReplicationConfig {
411 close_group_size: 0,
412 ..ReplicationConfig::default()
413 };
414 assert!(config.validate().is_err());
415 }
416
417 #[test]
418 fn paid_list_close_group_size_zero_rejected() {
419 let config = ReplicationConfig {
420 paid_list_close_group_size: 0,
421 ..ReplicationConfig::default()
422 };
423 assert!(config.validate().is_err());
424 }
425
426 #[test]
427 fn neighbor_sync_interval_inverted_rejected() {
428 let config = ReplicationConfig {
429 neighbor_sync_interval_min: Duration::from_secs(100),
430 neighbor_sync_interval_max: Duration::from_secs(50),
431 ..ReplicationConfig::default()
432 };
433 assert!(config.validate().is_err());
434 }
435
436 #[test]
437 fn audit_tick_interval_inverted_rejected() {
438 let config = ReplicationConfig {
439 audit_tick_interval_min: Duration::from_secs(100),
440 audit_tick_interval_max: Duration::from_secs(50),
441 ..ReplicationConfig::default()
442 };
443 assert!(config.validate().is_err());
444 }
445
446 #[test]
447 fn self_lookup_interval_inverted_rejected() {
448 let config = ReplicationConfig {
449 self_lookup_interval_min: Duration::from_secs(100),
450 self_lookup_interval_max: Duration::from_secs(50),
451 ..ReplicationConfig::default()
452 };
453 assert!(config.validate().is_err());
454 }
455
456 #[test]
457 fn neighbor_sync_peer_count_zero_rejected() {
458 let config = ReplicationConfig {
459 neighbor_sync_peer_count: 0,
460 ..ReplicationConfig::default()
461 };
462 assert!(config.validate().is_err());
463 }
464
465 #[test]
466 fn audit_sample_count_scales_with_sqrt() {
467 assert_eq!(ReplicationConfig::audit_sample_count(0), 0);
469
470 assert_eq!(ReplicationConfig::audit_sample_count(1), 1);
472
473 assert_eq!(ReplicationConfig::audit_sample_count(3), 1);
475
476 assert_eq!(ReplicationConfig::audit_sample_count(4), 2);
478 assert_eq!(ReplicationConfig::audit_sample_count(25), 5);
479 assert_eq!(ReplicationConfig::audit_sample_count(100), 10);
480 assert_eq!(ReplicationConfig::audit_sample_count(1_000), 31);
481 assert_eq!(ReplicationConfig::audit_sample_count(10_000), 100);
482 assert_eq!(ReplicationConfig::audit_sample_count(1_000_000), 1_000);
483 }
484
485 #[test]
486 fn max_incoming_audit_keys_scales_dynamically() {
487 assert_eq!(ReplicationConfig::max_incoming_audit_keys(0), 1);
489
490 assert_eq!(ReplicationConfig::max_incoming_audit_keys(1), 2);
492
493 assert_eq!(ReplicationConfig::max_incoming_audit_keys(100), 20);
495
496 assert_eq!(ReplicationConfig::max_incoming_audit_keys(1_000_000), 2_000);
498
499 assert_eq!(ReplicationConfig::max_incoming_audit_keys(5_000_000), 4_472);
501 }
502
503 #[test]
504 fn quorum_needed_uses_smaller_of_threshold_and_majority() {
505 let config = ReplicationConfig::default();
506
507 assert_eq!(config.quorum_needed(7), 4);
509
510 assert_eq!(config.quorum_needed(3), 2);
512
513 assert_eq!(config.quorum_needed(0), 0);
515
516 assert_eq!(config.quorum_needed(100), 4);
518 }
519
520 #[test]
521 fn confirm_needed_is_strict_majority() {
522 assert_eq!(ReplicationConfig::confirm_needed(1), 1);
523 assert_eq!(ReplicationConfig::confirm_needed(2), 2);
524 assert_eq!(ReplicationConfig::confirm_needed(3), 2);
525 assert_eq!(ReplicationConfig::confirm_needed(4), 3);
526 assert_eq!(ReplicationConfig::confirm_needed(20), 11);
527 }
528
529 #[test]
530 fn random_intervals_within_bounds() {
531 let config = ReplicationConfig::default();
532
533 let iterations = 50;
535 for _ in 0..iterations {
536 let ns = config.random_neighbor_sync_interval();
537 assert!(ns >= config.neighbor_sync_interval_min);
538 assert!(ns <= config.neighbor_sync_interval_max);
539
540 let at = config.random_audit_tick_interval();
541 assert!(at >= config.audit_tick_interval_min);
542 assert!(at <= config.audit_tick_interval_max);
543
544 let sl = config.random_self_lookup_interval();
545 assert!(sl >= config.self_lookup_interval_min);
546 assert!(sl <= config.self_lookup_interval_max);
547 }
548 }
549
550 #[test]
551 fn random_interval_equal_bounds_is_deterministic() {
552 let fixed = Duration::from_secs(42);
553 let config = ReplicationConfig {
554 neighbor_sync_interval_min: fixed,
555 neighbor_sync_interval_max: fixed,
556 ..ReplicationConfig::default()
557 };
558 assert_eq!(config.random_neighbor_sync_interval(), fixed);
559 }
560
561 #[test]
567 fn scenario_18_invalid_config_rejected() {
568 let config = ReplicationConfig {
570 quorum_threshold: 10,
571 close_group_size: 7,
572 ..ReplicationConfig::default()
573 };
574 let err = config.validate().unwrap_err();
575 assert!(
576 err.contains("quorum_threshold"),
577 "error should mention quorum_threshold: {err}"
578 );
579
580 let config = ReplicationConfig {
582 close_group_size: 0,
583 ..ReplicationConfig::default()
584 };
585 let err = config.validate().unwrap_err();
586 assert!(
587 err.contains("close_group_size"),
588 "error should mention close_group_size: {err}"
589 );
590
591 let config = ReplicationConfig {
593 neighbor_sync_interval_min: Duration::from_secs(200),
594 neighbor_sync_interval_max: Duration::from_secs(100),
595 ..ReplicationConfig::default()
596 };
597 let err = config.validate().unwrap_err();
598 assert!(
599 err.contains("neighbor_sync_interval"),
600 "error should mention neighbor_sync_interval: {err}"
601 );
602
603 let config = ReplicationConfig {
605 self_lookup_interval_min: Duration::from_secs(999),
606 self_lookup_interval_max: Duration::from_secs(1),
607 ..ReplicationConfig::default()
608 };
609 let err = config.validate().unwrap_err();
610 assert!(
611 err.contains("self_lookup_interval"),
612 "error should mention self_lookup_interval: {err}"
613 );
614
615 let config = ReplicationConfig {
617 audit_tick_interval_min: Duration::from_secs(500),
618 audit_tick_interval_max: Duration::from_secs(10),
619 ..ReplicationConfig::default()
620 };
621 let err = config.validate().unwrap_err();
622 assert!(
623 err.contains("audit_tick_interval"),
624 "error should mention audit_tick_interval: {err}"
625 );
626 }
627
628 #[test]
631 fn scenario_26_dynamic_paid_threshold_undersized() {
632 assert_eq!(ReplicationConfig::confirm_needed(8), 5, "floor(8/2)+1 = 5");
633
634 assert_eq!(
636 ReplicationConfig::confirm_needed(1),
637 1,
638 "single peer requires 1 confirmation"
639 );
640 assert_eq!(
641 ReplicationConfig::confirm_needed(2),
642 2,
643 "2 peers require 2 confirmations"
644 );
645 assert_eq!(
646 ReplicationConfig::confirm_needed(3),
647 2,
648 "3 peers require 2 confirmations"
649 );
650 assert_eq!(
651 ReplicationConfig::confirm_needed(0),
652 1,
653 "0 peers yields floor(0/2)+1 = 1 (degenerate case)"
654 );
655 }
656
657 #[test]
661 fn scenario_31_audit_cadence_within_jitter_bounds() {
662 let config = ReplicationConfig {
663 audit_tick_interval_min: Duration::from_secs(1800),
664 audit_tick_interval_max: Duration::from_secs(3600),
665 ..ReplicationConfig::default()
666 };
667
668 let iterations = 100;
670 let mut saw_different = false;
671 let mut prev = Duration::ZERO;
672
673 for _ in 0..iterations {
674 let interval = config.random_audit_tick_interval();
675 assert!(
676 interval >= config.audit_tick_interval_min,
677 "interval {interval:?} below min {:?}",
678 config.audit_tick_interval_min,
679 );
680 assert!(
681 interval <= config.audit_tick_interval_max,
682 "interval {interval:?} above max {:?}",
683 config.audit_tick_interval_max,
684 );
685 if interval != prev && prev != Duration::ZERO {
686 saw_different = true;
687 }
688 prev = interval;
689 }
690
691 assert!(
694 saw_different,
695 "audit intervals should exhibit randomized jitter across samples"
696 );
697 }
698}