#![allow(clippy::module_name_repetitions)]
use std::time::Duration;
use rand::Rng;
use crate::ant_protocol::CLOSE_GROUP_SIZE;
pub const K_BUCKET_SIZE: usize = 20;
pub const QUORUM_THRESHOLD: usize = 4;
pub const PAID_LIST_CLOSE_GROUP_SIZE: usize = 20;
pub const NEIGHBOR_SYNC_SCOPE: usize = 20;
pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;
const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60;
const NEIGHBOR_SYNC_INTERVAL_MAX_SECS: u64 = 20 * 60;
pub const NEIGHBOR_SYNC_INTERVAL_MIN: Duration =
Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MIN_SECS);
pub const NEIGHBOR_SYNC_INTERVAL_MAX: Duration =
Duration::from_secs(NEIGHBOR_SYNC_INTERVAL_MAX_SECS);
const NEIGHBOR_SYNC_COOLDOWN_SECS: u64 = 60 * 60; pub const NEIGHBOR_SYNC_COOLDOWN: Duration = Duration::from_secs(NEIGHBOR_SYNC_COOLDOWN_SECS);
const SELF_LOOKUP_INTERVAL_MIN_SECS: u64 = 5 * 60;
const SELF_LOOKUP_INTERVAL_MAX_SECS: u64 = 10 * 60;
pub const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MIN_SECS);
pub const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(SELF_LOOKUP_INTERVAL_MAX_SECS);
pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3;
const AVAILABLE_PARALLELISM_FALLBACK: usize = 4;
#[allow(clippy::incompatible_msrv)] pub fn max_parallel_fetch() -> usize {
std::thread::available_parallelism()
.map_or(AVAILABLE_PARALLELISM_FALLBACK, std::num::NonZero::get)
}
const AUDIT_TICK_INTERVAL_MIN_SECS: u64 = 30 * 60;
const AUDIT_TICK_INTERVAL_MAX_SECS: u64 = 60 * 60;
pub const AUDIT_TICK_INTERVAL_MIN: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MIN_SECS);
pub const AUDIT_TICK_INTERVAL_MAX: Duration = Duration::from_secs(AUDIT_TICK_INTERVAL_MAX_SECS);
const AUDIT_RESPONSE_BASE_SECS: u64 = 6;
const AUDIT_RESPONSE_PER_CHUNK_MS: u64 = 10;
const BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS: u64 = 24 * 60 * 60; pub const BOOTSTRAP_CLAIM_GRACE_PERIOD: Duration =
Duration::from_secs(BOOTSTRAP_CLAIM_GRACE_PERIOD_SECS);
const PRUNE_HYSTERESIS_DURATION_SECS: u64 = 6 * 60 * 60; pub const PRUNE_HYSTERESIS_DURATION: Duration = Duration::from_secs(PRUNE_HYSTERESIS_DURATION_SECS);
pub const REPLICATION_PROTOCOL_ID: &str = "autonomi.ant.replication.v1";
const REPLICATION_MESSAGE_SIZE_MIB: usize = 10;
pub const MAX_REPLICATION_MESSAGE_SIZE: usize = REPLICATION_MESSAGE_SIZE_MIB * 1024 * 1024;
const VERIFICATION_REQUEST_TIMEOUT_SECS: u64 = 15;
pub const VERIFICATION_REQUEST_TIMEOUT: Duration =
Duration::from_secs(VERIFICATION_REQUEST_TIMEOUT_SECS);
const FETCH_REQUEST_TIMEOUT_SECS: u64 = 30;
pub const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_secs(FETCH_REQUEST_TIMEOUT_SECS);
const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60;
pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS);
pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 2.0;
const BOOTSTRAP_COMPLETE_TIMEOUT_SECS: u64 = 60;
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
pub close_group_size: usize,
pub quorum_threshold: usize,
pub paid_list_close_group_size: usize,
pub neighbor_sync_scope: usize,
pub neighbor_sync_peer_count: usize,
pub neighbor_sync_interval_min: Duration,
pub neighbor_sync_interval_max: Duration,
pub neighbor_sync_cooldown: Duration,
pub self_lookup_interval_min: Duration,
pub self_lookup_interval_max: Duration,
pub audit_tick_interval_min: Duration,
pub audit_tick_interval_max: Duration,
pub audit_response_base: Duration,
pub audit_response_per_chunk: Duration,
pub bootstrap_claim_grace_period: Duration,
pub prune_hysteresis_duration: Duration,
pub verification_request_timeout: Duration,
pub fetch_request_timeout: Duration,
pub bootstrap_complete_timeout_secs: u64,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
close_group_size: CLOSE_GROUP_SIZE,
quorum_threshold: QUORUM_THRESHOLD,
paid_list_close_group_size: PAID_LIST_CLOSE_GROUP_SIZE,
neighbor_sync_scope: NEIGHBOR_SYNC_SCOPE,
neighbor_sync_peer_count: NEIGHBOR_SYNC_PEER_COUNT,
neighbor_sync_interval_min: NEIGHBOR_SYNC_INTERVAL_MIN,
neighbor_sync_interval_max: NEIGHBOR_SYNC_INTERVAL_MAX,
neighbor_sync_cooldown: NEIGHBOR_SYNC_COOLDOWN,
self_lookup_interval_min: SELF_LOOKUP_INTERVAL_MIN,
self_lookup_interval_max: SELF_LOOKUP_INTERVAL_MAX,
audit_tick_interval_min: AUDIT_TICK_INTERVAL_MIN,
audit_tick_interval_max: AUDIT_TICK_INTERVAL_MAX,
audit_response_base: Duration::from_secs(AUDIT_RESPONSE_BASE_SECS),
audit_response_per_chunk: Duration::from_millis(AUDIT_RESPONSE_PER_CHUNK_MS),
bootstrap_claim_grace_period: BOOTSTRAP_CLAIM_GRACE_PERIOD,
prune_hysteresis_duration: PRUNE_HYSTERESIS_DURATION,
verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT,
fetch_request_timeout: FETCH_REQUEST_TIMEOUT,
bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS,
}
}
}
impl ReplicationConfig {
pub fn validate(&self) -> Result<(), String> {
if self.close_group_size == 0 {
return Err("close_group_size must be >= 1".to_string());
}
if self.quorum_threshold == 0 || self.quorum_threshold > self.close_group_size {
return Err(format!(
"quorum_threshold ({}) must satisfy 1 <= quorum_threshold <= close_group_size ({})",
self.quorum_threshold, self.close_group_size,
));
}
if self.paid_list_close_group_size == 0 {
return Err("paid_list_close_group_size must be >= 1".to_string());
}
if self.neighbor_sync_interval_min > self.neighbor_sync_interval_max {
return Err(format!(
"neighbor_sync_interval_min ({:?}) must be <= neighbor_sync_interval_max ({:?})",
self.neighbor_sync_interval_min, self.neighbor_sync_interval_max,
));
}
if self.audit_tick_interval_min > self.audit_tick_interval_max {
return Err(format!(
"audit_tick_interval_min ({:?}) must be <= audit_tick_interval_max ({:?})",
self.audit_tick_interval_min, self.audit_tick_interval_max,
));
}
if self.self_lookup_interval_min > self.self_lookup_interval_max {
return Err(format!(
"self_lookup_interval_min ({:?}) must be <= self_lookup_interval_max ({:?})",
self.self_lookup_interval_min, self.self_lookup_interval_max,
));
}
if self.neighbor_sync_peer_count == 0 {
return Err("neighbor_sync_peer_count must be >= 1".to_string());
}
if self.neighbor_sync_scope == 0 {
return Err("neighbor_sync_scope must be >= 1".to_string());
}
Ok(())
}
#[must_use]
pub fn quorum_needed(&self, quorum_targets_count: usize) -> usize {
if quorum_targets_count == 0 {
return 0;
}
let majority = quorum_targets_count / 2 + 1;
self.quorum_threshold.min(majority)
}
#[must_use]
pub fn confirm_needed(paid_group_size: usize) -> usize {
paid_group_size / 2 + 1
}
#[must_use]
pub fn random_neighbor_sync_interval(&self) -> Duration {
random_duration_in_range(
self.neighbor_sync_interval_min,
self.neighbor_sync_interval_max,
)
}
#[must_use]
pub fn audit_sample_count(total_keys: usize) -> usize {
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
let sqrt = (total_keys as f64).sqrt() as usize;
sqrt.max(1).min(total_keys)
}
#[must_use]
pub fn max_incoming_audit_keys(stored_chunks: usize) -> usize {
(2 * Self::audit_sample_count(stored_chunks)).max(1)
}
#[must_use]
pub fn audit_response_timeout(&self, chunk_count: usize) -> Duration {
let chunks = u32::try_from(chunk_count).unwrap_or(u32::MAX);
self.audit_response_base + self.audit_response_per_chunk * chunks
}
#[must_use]
pub fn random_audit_tick_interval(&self) -> Duration {
random_duration_in_range(self.audit_tick_interval_min, self.audit_tick_interval_max)
}
#[must_use]
pub fn random_self_lookup_interval(&self) -> Duration {
random_duration_in_range(self.self_lookup_interval_min, self.self_lookup_interval_max)
}
}
fn random_duration_in_range(min: Duration, max: Duration) -> Duration {
if min == max {
return min;
}
let to_u64_millis = |d: Duration| -> u64 { u64::try_from(d.as_millis()).unwrap_or(u64::MAX) };
let chosen = rand::thread_rng().gen_range(to_u64_millis(min)..=to_u64_millis(max));
Duration::from_millis(chosen)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn defaults_pass_validation() {
let config = ReplicationConfig::default();
assert!(config.validate().is_ok(), "default config must be valid");
}
#[test]
fn quorum_threshold_zero_rejected() {
let config = ReplicationConfig {
quorum_threshold: 0,
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn quorum_threshold_exceeds_close_group_rejected() {
let defaults = ReplicationConfig::default();
let config = ReplicationConfig {
quorum_threshold: defaults.close_group_size + 1,
..defaults
};
assert!(config.validate().is_err());
}
#[test]
fn close_group_size_zero_rejected() {
let config = ReplicationConfig {
close_group_size: 0,
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn paid_list_close_group_size_zero_rejected() {
let config = ReplicationConfig {
paid_list_close_group_size: 0,
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn neighbor_sync_interval_inverted_rejected() {
let config = ReplicationConfig {
neighbor_sync_interval_min: Duration::from_secs(100),
neighbor_sync_interval_max: Duration::from_secs(50),
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn audit_tick_interval_inverted_rejected() {
let config = ReplicationConfig {
audit_tick_interval_min: Duration::from_secs(100),
audit_tick_interval_max: Duration::from_secs(50),
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn self_lookup_interval_inverted_rejected() {
let config = ReplicationConfig {
self_lookup_interval_min: Duration::from_secs(100),
self_lookup_interval_max: Duration::from_secs(50),
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn neighbor_sync_peer_count_zero_rejected() {
let config = ReplicationConfig {
neighbor_sync_peer_count: 0,
..ReplicationConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn audit_sample_count_scales_with_sqrt() {
assert_eq!(ReplicationConfig::audit_sample_count(0), 0);
assert_eq!(ReplicationConfig::audit_sample_count(1), 1);
assert_eq!(ReplicationConfig::audit_sample_count(3), 1);
assert_eq!(ReplicationConfig::audit_sample_count(4), 2);
assert_eq!(ReplicationConfig::audit_sample_count(25), 5);
assert_eq!(ReplicationConfig::audit_sample_count(100), 10);
assert_eq!(ReplicationConfig::audit_sample_count(1_000), 31);
assert_eq!(ReplicationConfig::audit_sample_count(10_000), 100);
assert_eq!(ReplicationConfig::audit_sample_count(1_000_000), 1_000);
}
#[test]
fn max_incoming_audit_keys_scales_dynamically() {
assert_eq!(ReplicationConfig::max_incoming_audit_keys(0), 1);
assert_eq!(ReplicationConfig::max_incoming_audit_keys(1), 2);
assert_eq!(ReplicationConfig::max_incoming_audit_keys(100), 20);
assert_eq!(ReplicationConfig::max_incoming_audit_keys(1_000_000), 2_000);
assert_eq!(ReplicationConfig::max_incoming_audit_keys(5_000_000), 4_472);
}
#[test]
fn quorum_needed_uses_smaller_of_threshold_and_majority() {
let config = ReplicationConfig::default();
assert_eq!(config.quorum_needed(7), 4);
assert_eq!(config.quorum_needed(3), 2);
assert_eq!(config.quorum_needed(0), 0);
assert_eq!(config.quorum_needed(100), 4);
}
#[test]
fn confirm_needed_is_strict_majority() {
assert_eq!(ReplicationConfig::confirm_needed(1), 1);
assert_eq!(ReplicationConfig::confirm_needed(2), 2);
assert_eq!(ReplicationConfig::confirm_needed(3), 2);
assert_eq!(ReplicationConfig::confirm_needed(4), 3);
assert_eq!(ReplicationConfig::confirm_needed(20), 11);
}
#[test]
fn random_intervals_within_bounds() {
let config = ReplicationConfig::default();
let iterations = 50;
for _ in 0..iterations {
let ns = config.random_neighbor_sync_interval();
assert!(ns >= config.neighbor_sync_interval_min);
assert!(ns <= config.neighbor_sync_interval_max);
let at = config.random_audit_tick_interval();
assert!(at >= config.audit_tick_interval_min);
assert!(at <= config.audit_tick_interval_max);
let sl = config.random_self_lookup_interval();
assert!(sl >= config.self_lookup_interval_min);
assert!(sl <= config.self_lookup_interval_max);
}
}
#[test]
fn random_interval_equal_bounds_is_deterministic() {
let fixed = Duration::from_secs(42);
let config = ReplicationConfig {
neighbor_sync_interval_min: fixed,
neighbor_sync_interval_max: fixed,
..ReplicationConfig::default()
};
assert_eq!(config.random_neighbor_sync_interval(), fixed);
}
#[test]
fn scenario_18_invalid_config_rejected() {
let config = ReplicationConfig {
quorum_threshold: 10,
close_group_size: 7,
..ReplicationConfig::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("quorum_threshold"),
"error should mention quorum_threshold: {err}"
);
let config = ReplicationConfig {
close_group_size: 0,
..ReplicationConfig::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("close_group_size"),
"error should mention close_group_size: {err}"
);
let config = ReplicationConfig {
neighbor_sync_interval_min: Duration::from_secs(200),
neighbor_sync_interval_max: Duration::from_secs(100),
..ReplicationConfig::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("neighbor_sync_interval"),
"error should mention neighbor_sync_interval: {err}"
);
let config = ReplicationConfig {
self_lookup_interval_min: Duration::from_secs(999),
self_lookup_interval_max: Duration::from_secs(1),
..ReplicationConfig::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("self_lookup_interval"),
"error should mention self_lookup_interval: {err}"
);
let config = ReplicationConfig {
audit_tick_interval_min: Duration::from_secs(500),
audit_tick_interval_max: Duration::from_secs(10),
..ReplicationConfig::default()
};
let err = config.validate().unwrap_err();
assert!(
err.contains("audit_tick_interval"),
"error should mention audit_tick_interval: {err}"
);
}
#[test]
fn scenario_26_dynamic_paid_threshold_undersized() {
assert_eq!(ReplicationConfig::confirm_needed(8), 5, "floor(8/2)+1 = 5");
assert_eq!(
ReplicationConfig::confirm_needed(1),
1,
"single peer requires 1 confirmation"
);
assert_eq!(
ReplicationConfig::confirm_needed(2),
2,
"2 peers require 2 confirmations"
);
assert_eq!(
ReplicationConfig::confirm_needed(3),
2,
"3 peers require 2 confirmations"
);
assert_eq!(
ReplicationConfig::confirm_needed(0),
1,
"0 peers yields floor(0/2)+1 = 1 (degenerate case)"
);
}
#[test]
fn scenario_31_audit_cadence_within_jitter_bounds() {
let config = ReplicationConfig {
audit_tick_interval_min: Duration::from_secs(1800),
audit_tick_interval_max: Duration::from_secs(3600),
..ReplicationConfig::default()
};
let iterations = 100;
let mut saw_different = false;
let mut prev = Duration::ZERO;
for _ in 0..iterations {
let interval = config.random_audit_tick_interval();
assert!(
interval >= config.audit_tick_interval_min,
"interval {interval:?} below min {:?}",
config.audit_tick_interval_min,
);
assert!(
interval <= config.audit_tick_interval_max,
"interval {interval:?} above max {:?}",
config.audit_tick_interval_max,
);
if interval != prev && prev != Duration::ZERO {
saw_different = true;
}
prev = interval;
}
assert!(
saw_different,
"audit intervals should exhibit randomized jitter across samples"
);
}
}