use crate::adapter::net::behavior::placement::NodeId;
/// Smallest `factor` that `ReplicationConfig::validate` accepts (inclusive).
pub const REPLICATION_FACTOR_MIN: u8 = 1;
/// Largest `factor` that `ReplicationConfig::validate` accepts (inclusive).
///
/// Also used as the ceiling on the number of nodes in a
/// `PlacementStrategy::Pinned` set and as the fallback value of
/// `ReplicationConfig::effective_factor` when the pinned set length does not
/// fit in a `u8`.
pub const REPLICATION_FACTOR_MAX: u8 = 16;
/// `factor` assigned by `ReplicationConfig::new`.
pub const REPLICATION_FACTOR_DEFAULT: u8 = 3;
/// Smallest `heartbeat_ms`, in milliseconds, that validation accepts (inclusive).
pub const HEARTBEAT_MS_MIN: u64 = 100;
/// Largest `heartbeat_ms`, in milliseconds, that validation accepts (inclusive).
pub const HEARTBEAT_MS_MAX: u64 = 300_000;
/// `heartbeat_ms` assigned by `ReplicationConfig::new`.
pub const HEARTBEAT_MS_DEFAULT: u64 = 500;
/// `replication_budget_fraction` assigned by `ReplicationConfig::new`.
///
/// Validation requires that field to be finite and within `(0.0, 1.0]`.
pub const REPLICATION_BUDGET_FRACTION_DEFAULT: f32 = 0.5;
/// `background_fraction` assigned by `ReplicationConfig::new`.
///
/// Validation requires that field to be finite and within `[0.0, 1.0)`.
pub const BACKGROUND_FRACTION_DEFAULT: f32 = 0.3;
/// Selects how replica placement is decided for a `ReplicationConfig`.
///
/// Only the `Pinned` variant carries constraints that are enforced in this
/// file; the runtime meaning of the other variants is defined by the code
/// that consumes the configuration, which is not visible here.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum PlacementStrategy {
/// The default strategy. No strategy-specific validation is applied.
#[default]
Standard,
/// An explicit list of nodes.
///
/// `ReplicationConfig::validate` requires the list to be non-empty, to hold
/// at most `REPLICATION_FACTOR_MAX` entries, and to contain no duplicate
/// `NodeId`. `ReplicationConfig::effective_factor` reports the list length
/// instead of the configured `factor` for this variant.
Pinned(Vec<NodeId>),
/// NOTE(review): presumably requires replicas to be co-located; the exact
/// semantics are not defined in this file — confirm against the placement
/// code. No strategy-specific validation is applied here.
ColocationStrict,
}
/// Policy stored in `ReplicationConfig::on_under_capacity`.
///
/// NOTE(review): the names suggest this chooses what happens when there is
/// not enough capacity to hold a replica; the consumer of this value is not
/// in this file — confirm the exact trigger and effect there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UnderCapacity {
/// The default policy.
#[default]
Withdraw,
/// The alternative policy; must be selected explicitly.
EvictOldest,
}
/// Replication settings assembled through the `with_*` builder methods and
/// checked with `ReplicationConfig::validate`.
///
/// All fields are public and are stored unchecked; nothing is range-checked
/// until `validate` is called.
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicationConfig {
/// Requested replication factor. Validation accepts
/// `REPLICATION_FACTOR_MIN..=REPLICATION_FACTOR_MAX`.
pub factor: u8,
/// Placement strategy; `PlacementStrategy::Pinned` carries extra
/// validation rules and overrides `factor` in `effective_factor`.
pub placement: PlacementStrategy,
/// Heartbeat timing in milliseconds. Validation accepts
/// `HEARTBEAT_MS_MIN..=HEARTBEAT_MS_MAX`.
pub heartbeat_ms: u64,
/// Optional node to pin as leader. When `placement` is `Pinned`,
/// validation requires this node to be a member of the pinned set; for
/// other strategies it is not checked in this file.
pub leader_pinned: Option<NodeId>,
/// Under-capacity policy; not inspected by validation.
pub on_under_capacity: UnderCapacity,
/// Must be finite and within `(0.0, 1.0]` to pass validation.
/// NOTE(review): what total budget this is a fraction of is decided by the
/// consumer, not visible here.
pub replication_budget_fraction: f32,
/// Bandwidth class used by default; not inspected by validation.
pub default_bandwidth_class: super::bandwidth::BandwidthClass,
/// Must be finite and within `[0.0, 1.0)` to pass validation.
/// NOTE(review): presumably the share reserved for background-class
/// traffic — confirm against the bandwidth code.
pub background_fraction: f32,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self::new()
}
}
/// Construction, builder-style setters, and validation.
///
/// The `with_*` setters store their argument unchecked and return the updated
/// configuration by value; call [`ReplicationConfig::validate`] once the
/// configuration is fully assembled.
impl ReplicationConfig {
    /// Creates a configuration populated with the module-level defaults:
    /// `REPLICATION_FACTOR_DEFAULT`, the default [`PlacementStrategy`],
    /// `HEARTBEAT_MS_DEFAULT`, no pinned leader, the default
    /// [`UnderCapacity`] policy, `REPLICATION_BUDGET_FRACTION_DEFAULT`, the
    /// `Foreground` bandwidth class, and `BACKGROUND_FRACTION_DEFAULT`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            factor: REPLICATION_FACTOR_DEFAULT,
            placement: PlacementStrategy::default(),
            heartbeat_ms: HEARTBEAT_MS_DEFAULT,
            leader_pinned: None,
            on_under_capacity: UnderCapacity::default(),
            replication_budget_fraction: REPLICATION_BUDGET_FRACTION_DEFAULT,
            default_bandwidth_class: super::bandwidth::BandwidthClass::Foreground,
            background_fraction: BACKGROUND_FRACTION_DEFAULT,
        }
    }

    /// Replaces `factor` and returns the updated configuration.
    #[must_use]
    pub fn with_factor(mut self, factor: u8) -> Self {
        self.factor = factor;
        self
    }

    /// Replaces `placement` and returns the updated configuration.
    #[must_use]
    pub fn with_placement(mut self, placement: PlacementStrategy) -> Self {
        self.placement = placement;
        self
    }

    /// Replaces `heartbeat_ms` (milliseconds) and returns the updated configuration.
    #[must_use]
    pub fn with_heartbeat_ms(mut self, heartbeat_ms: u64) -> Self {
        self.heartbeat_ms = heartbeat_ms;
        self
    }

    /// Replaces `leader_pinned` (pass `None` to clear it) and returns the
    /// updated configuration.
    #[must_use]
    pub fn with_leader_pinned(mut self, leader: Option<NodeId>) -> Self {
        self.leader_pinned = leader;
        self
    }

    /// Replaces `on_under_capacity` and returns the updated configuration.
    #[must_use]
    pub fn with_on_under_capacity(mut self, on_under_capacity: UnderCapacity) -> Self {
        self.on_under_capacity = on_under_capacity;
        self
    }

    /// Replaces `replication_budget_fraction` and returns the updated configuration.
    #[must_use]
    pub fn with_replication_budget_fraction(mut self, fraction: f32) -> Self {
        self.replication_budget_fraction = fraction;
        self
    }

    /// Replaces `default_bandwidth_class` and returns the updated configuration.
    #[must_use]
    pub fn with_default_bandwidth_class(mut self, class: super::bandwidth::BandwidthClass) -> Self {
        self.default_bandwidth_class = class;
        self
    }

    /// Replaces `background_fraction` and returns the updated configuration.
    #[must_use]
    pub fn with_background_fraction(mut self, fraction: f32) -> Self {
        self.background_fraction = fraction;
        self
    }

    /// Returns the replication factor implied by the placement strategy.
    ///
    /// For [`PlacementStrategy::Pinned`] this is the number of pinned nodes
    /// and `factor` is ignored; for every other strategy it is `factor`.
    ///
    /// No validation is performed here: an empty pinned set yields `0`, and a
    /// pinned set whose length does not fit in a `u8` yields
    /// `REPLICATION_FACTOR_MAX`. Call [`ReplicationConfig::validate`] first if
    /// the result must be within the permitted range.
    #[must_use]
    pub fn effective_factor(&self) -> u8 {
        match &self.placement {
            PlacementStrategy::Pinned(nodes) => {
                u8::try_from(nodes.len()).unwrap_or(REPLICATION_FACTOR_MAX)
            }
            // Listed explicitly so that adding a strategy forces a decision here.
            PlacementStrategy::Standard | PlacementStrategy::ColocationStrict => self.factor,
        }
    }

    /// Checks every field against its permitted range.
    ///
    /// Checks run in a fixed order and the first failure is returned.
    ///
    /// # Errors
    ///
    /// In evaluation order:
    /// - `FactorBelowMin` / `FactorAboveMax` when `factor` is outside
    ///   `REPLICATION_FACTOR_MIN..=REPLICATION_FACTOR_MAX`. This applies even
    ///   when the placement is `Pinned`, although `effective_factor` ignores
    ///   `factor` in that case.
    /// - `HeartbeatTooLow` / `HeartbeatTooHigh` when `heartbeat_ms` is outside
    ///   `HEARTBEAT_MS_MIN..=HEARTBEAT_MS_MAX`.
    /// - `BudgetFractionOutOfRange` when `replication_budget_fraction` is
    ///   non-finite or outside `(0.0, 1.0]`.
    /// - `BackgroundFractionOutOfRange` when `background_fraction` is
    ///   non-finite or outside `[0.0, 1.0)`.
    /// - For `Pinned` placement only: `PinnedSetEmpty`, `PinnedSetTooLarge`,
    ///   `PinnedDuplicate`, then `LeaderPinnedOutsidePinnedSet`.
    pub fn validate(&self) -> Result<(), ReplicationConfigError> {
        if self.factor < REPLICATION_FACTOR_MIN {
            return Err(ReplicationConfigError::FactorBelowMin {
                got: self.factor,
                min: REPLICATION_FACTOR_MIN,
            });
        }
        if self.factor > REPLICATION_FACTOR_MAX {
            return Err(ReplicationConfigError::FactorAboveMax {
                got: self.factor,
                max: REPLICATION_FACTOR_MAX,
            });
        }

        if self.heartbeat_ms < HEARTBEAT_MS_MIN {
            return Err(ReplicationConfigError::HeartbeatTooLow {
                got: self.heartbeat_ms,
                min: HEARTBEAT_MS_MIN,
            });
        }
        if self.heartbeat_ms > HEARTBEAT_MS_MAX {
            return Err(ReplicationConfigError::HeartbeatTooHigh {
                got: self.heartbeat_ms,
                max: HEARTBEAT_MS_MAX,
            });
        }

        // Budget fraction: open at 0.0, closed at 1.0. The explicit finiteness
        // test rejects NaN, which would otherwise slip past both comparisons.
        let budget = self.replication_budget_fraction;
        if !budget.is_finite() || budget <= 0.0 || budget > 1.0 {
            return Err(ReplicationConfigError::BudgetFractionOutOfRange { got: budget });
        }

        // Background fraction: closed at 0.0, open at 1.0.
        let background = self.background_fraction;
        if !background.is_finite() || background < 0.0 || background >= 1.0 {
            return Err(ReplicationConfigError::BackgroundFractionOutOfRange { got: background });
        }

        if let PlacementStrategy::Pinned(nodes) = &self.placement {
            let ceiling = usize::from(REPLICATION_FACTOR_MAX);
            if nodes.is_empty() {
                return Err(ReplicationConfigError::PinnedSetEmpty);
            }
            if nodes.len() > ceiling {
                return Err(ReplicationConfigError::PinnedSetTooLarge {
                    got: nodes.len(),
                    max: ceiling,
                });
            }

            // Duplicates are found on a sorted copy (at most `ceiling` entries
            // at this point), so the smallest duplicated id is the one reported.
            let mut sorted = nodes.clone();
            sorted.sort_unstable();
            if let Some(pair) = sorted.windows(2).find(|pair| pair[0] == pair[1]) {
                return Err(ReplicationConfigError::PinnedDuplicate { node_id: pair[0] });
            }

            // A pinned leader is only constrained when the replica set itself
            // is pinned; it must then be one of the pinned nodes.
            if let Some(leader) = self.leader_pinned {
                if !nodes.contains(&leader) {
                    return Err(ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader });
                }
            }
        }

        Ok(())
    }
}
/// Reasons `ReplicationConfig::validate` can reject a configuration.
///
/// Each variant carries the offending value (`got`, `node_id`, or `leader`)
/// and, where a bound applies, the bound that was violated.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ReplicationConfigError {
/// `factor` is smaller than `REPLICATION_FACTOR_MIN`.
#[error("replication factor {got} below minimum {min}")]
FactorBelowMin {
got: u8,
min: u8,
},
/// `factor` is larger than `REPLICATION_FACTOR_MAX`.
#[error("replication factor {got} above maximum {max}")]
FactorAboveMax {
got: u8,
max: u8,
},
/// `heartbeat_ms` is smaller than `HEARTBEAT_MS_MIN`.
#[error("heartbeat_ms {got} below minimum {min} ms")]
HeartbeatTooLow {
got: u64,
min: u64,
},
/// `heartbeat_ms` is larger than `HEARTBEAT_MS_MAX`.
#[error("heartbeat_ms {got} above maximum {max} ms")]
HeartbeatTooHigh {
got: u64,
max: u64,
},
/// `replication_budget_fraction` is NaN, infinite, or outside `(0.0, 1.0]`.
#[error("replication_budget_fraction {got} outside (0.0, 1.0] or non-finite")]
BudgetFractionOutOfRange {
got: f32,
},
/// `background_fraction` is NaN, infinite, or outside `[0.0, 1.0)`.
#[error("background_fraction {got} outside [0.0, 1.0) or non-finite")]
BackgroundFractionOutOfRange {
got: f32,
},
/// The `PlacementStrategy::Pinned` node list is empty.
#[error("PlacementStrategy::Pinned must list at least one NodeId")]
PinnedSetEmpty,
/// The `PlacementStrategy::Pinned` node list has more than
/// `REPLICATION_FACTOR_MAX` entries.
#[error("PlacementStrategy::Pinned has {got} nodes; ceiling is {max}")]
PinnedSetTooLarge {
got: usize,
max: usize,
},
/// The `PlacementStrategy::Pinned` node list names the same node more
/// than once; `node_id` is one such repeated node.
#[error("PlacementStrategy::Pinned contains duplicate NodeId {node_id:#x}")]
PinnedDuplicate {
node_id: NodeId,
},
/// `leader_pinned` is set but is not a member of the
/// `PlacementStrategy::Pinned` node list.
#[error("leader_pinned {leader:#x} is not in the PlacementStrategy::Pinned set")]
LeaderPinnedOutsidePinnedSet {
leader: NodeId,
},
}
#[cfg(test)]
mod tests {
    //! Unit tests for `ReplicationConfig`: defaults, builder plumbing, every
    //! validation bound (both sides), and `effective_factor`.

    use super::*;

    /// `new()` must produce the documented defaults and pass validation.
    #[test]
    fn default_config_validates() {
        let cfg = ReplicationConfig::new();
        assert_eq!(cfg.factor, REPLICATION_FACTOR_DEFAULT);
        assert_eq!(cfg.heartbeat_ms, HEARTBEAT_MS_DEFAULT);
        assert_eq!(cfg.placement, PlacementStrategy::Standard);
        assert_eq!(cfg.on_under_capacity, UnderCapacity::Withdraw);
        assert!(cfg.leader_pinned.is_none());
        assert!((cfg.replication_budget_fraction - 0.5).abs() < f32::EPSILON);
        assert!((cfg.background_fraction - BACKGROUND_FRACTION_DEFAULT).abs() < f32::EPSILON);
        cfg.validate().expect("defaults must validate");
    }

    /// The `Default` impl and `new()` must stay interchangeable.
    #[test]
    fn default_trait_matches_new() {
        assert_eq!(ReplicationConfig::default(), ReplicationConfig::new());
    }

    /// Every builder method must land its argument in the matching field.
    #[test]
    fn builder_chain_threads_through() {
        let cfg = ReplicationConfig::new()
            .with_factor(5)
            .with_heartbeat_ms(250)
            .with_placement(PlacementStrategy::ColocationStrict)
            .with_on_under_capacity(UnderCapacity::EvictOldest)
            .with_leader_pinned(Some(0xDEAD_BEEF))
            .with_replication_budget_fraction(0.75)
            .with_background_fraction(0.1);
        assert_eq!(cfg.factor, 5);
        assert_eq!(cfg.heartbeat_ms, 250);
        assert_eq!(cfg.placement, PlacementStrategy::ColocationStrict);
        assert_eq!(cfg.on_under_capacity, UnderCapacity::EvictOldest);
        assert_eq!(cfg.leader_pinned, Some(0xDEAD_BEEF));
        assert!((cfg.replication_budget_fraction - 0.75).abs() < f32::EPSILON);
        assert!((cfg.background_fraction - 0.1).abs() < f32::EPSILON);
        cfg.validate().expect("built config must validate");
    }

    #[test]
    fn factor_below_min_rejected() {
        let cfg = ReplicationConfig::new().with_factor(0);
        let err = cfg.validate().expect_err("factor=0 must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::FactorBelowMin { got: 0, min: 1 }
        ));
    }

    #[test]
    fn factor_above_max_rejected() {
        let cfg = ReplicationConfig::new().with_factor(REPLICATION_FACTOR_MAX + 1);
        let err = cfg.validate().expect_err("factor>max must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::FactorAboveMax { got: 17, max: 16 }
        ));
    }

    #[test]
    fn factor_boundary_min_and_max_validate() {
        ReplicationConfig::new()
            .with_factor(REPLICATION_FACTOR_MIN)
            .validate()
            .expect("factor=min must validate");
        ReplicationConfig::new()
            .with_factor(REPLICATION_FACTOR_MAX)
            .validate()
            .expect("factor=max must validate");
    }

    #[test]
    fn heartbeat_below_min_rejected() {
        let cfg = ReplicationConfig::new().with_heartbeat_ms(50);
        let err = cfg.validate().expect_err("heartbeat=50ms must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::HeartbeatTooLow { got: 50, min: 100 }
        ));
    }

    #[test]
    fn heartbeat_above_max_rejected() {
        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX + 1);
        let err = cfg.validate().expect_err("heartbeat above max must fail");
        match err {
            ReplicationConfigError::HeartbeatTooHigh { got, max } => {
                assert_eq!(got, HEARTBEAT_MS_MAX + 1);
                assert_eq!(max, HEARTBEAT_MS_MAX);
            }
            other => panic!("expected HeartbeatTooHigh, got {other:?}"),
        }
    }

    #[test]
    fn heartbeat_at_min_validates() {
        ReplicationConfig::new()
            .with_heartbeat_ms(HEARTBEAT_MS_MIN)
            .validate()
            .expect("heartbeat=min must validate");
    }

    #[test]
    fn heartbeat_at_max_accepted() {
        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX);
        cfg.validate()
            .expect("heartbeat at the ceiling is permitted (inclusive)");
    }

    /// Budget fraction lives in `(0.0, 1.0]`; non-finite values are rejected.
    #[test]
    fn budget_fraction_out_of_range_rejected() {
        for bad in [0.0, -0.5, 1.5, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
            let cfg = ReplicationConfig::new().with_replication_budget_fraction(bad);
            // Matching on the whole result keeps the failure message lazy.
            match cfg.validate() {
                Err(ReplicationConfigError::BudgetFractionOutOfRange { .. }) => {}
                Err(err) => panic!("budget={bad} produced wrong error: {err:?}"),
                Ok(()) => panic!("budget={bad} must fail but didn't"),
            }
        }
        ReplicationConfig::new()
            .with_replication_budget_fraction(1.0)
            .validate()
            .expect("budget=1.0 is the inclusive upper bound");
    }

    /// Background fraction lives in `[0.0, 1.0)`; non-finite values are rejected.
    #[test]
    fn background_fraction_out_of_range_rejected() {
        for bad in [-0.1, 1.0, 1.5, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
            let cfg = ReplicationConfig::new().with_background_fraction(bad);
            match cfg.validate() {
                Err(ReplicationConfigError::BackgroundFractionOutOfRange { .. }) => {}
                Err(err) => panic!("background={bad} produced wrong error: {err:?}"),
                Ok(()) => panic!("background={bad} must fail but didn't"),
            }
        }
    }

    /// The lower bound `0.0` is inclusive and values just under `1.0` pass.
    #[test]
    fn background_fraction_bounds_accepted() {
        for ok in [0.0_f32, BACKGROUND_FRACTION_DEFAULT, 0.999] {
            ReplicationConfig::new()
                .with_background_fraction(ok)
                .validate()
                .unwrap_or_else(|err| panic!("background={ok} must validate: {err:?}"));
        }
    }

    #[test]
    fn pinned_empty_rejected() {
        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(vec![]));
        let err = cfg.validate().expect_err("empty pinned set must fail");
        assert_eq!(err, ReplicationConfigError::PinnedSetEmpty);
    }

    #[test]
    fn pinned_too_large_rejected() {
        // One more node than the ceiling allows.
        let nodes = (0..=u64::from(REPLICATION_FACTOR_MAX)).collect();
        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(nodes));
        let err = cfg.validate().expect_err("oversized pinned set must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::PinnedSetTooLarge { got: 17, max: 16 }
        ));
    }

    #[test]
    fn pinned_duplicate_rejected() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xAA]));
        let err = cfg.validate().expect_err("duplicate NodeId must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::PinnedDuplicate { node_id: 0xAA }
        ));
    }

    #[test]
    fn pinned_leader_outside_set_rejected() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
            .with_leader_pinned(Some(0xDD));
        let err = cfg.validate().expect_err("leader outside set must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader: 0xDD }
        ));
    }

    #[test]
    fn pinned_leader_inside_set_validates() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
            .with_leader_pinned(Some(0xBB));
        cfg.validate().expect("leader in set must validate");
    }

    /// The leader-membership rule only applies to pinned placement.
    #[test]
    fn pinned_leader_with_standard_placement_validates() {
        let cfg = ReplicationConfig::new().with_leader_pinned(Some(0x1234_5678));
        cfg.validate()
            .expect("standard + leader_pinned must validate");
    }

    #[test]
    fn effective_factor_honors_pinned_length() {
        // `factor` deliberately differs from the pinned length to show it is ignored.
        let cfg = ReplicationConfig::new()
            .with_factor(7)
            .with_placement(PlacementStrategy::Pinned(vec![1, 2, 3, 4]));
        assert_eq!(cfg.effective_factor(), 4);
    }

    #[test]
    fn effective_factor_falls_back_to_factor_for_non_pinned() {
        let cfg = ReplicationConfig::new()
            .with_factor(7)
            .with_placement(PlacementStrategy::Standard);
        assert_eq!(cfg.effective_factor(), 7);
        let cfg = ReplicationConfig::new()
            .with_factor(5)
            .with_placement(PlacementStrategy::ColocationStrict);
        assert_eq!(cfg.effective_factor(), 5);
    }
}