#![allow(clippy::result_large_err)]
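//! Lifecycle state machine for distributed regions: quorum-aware replica
//! tracking, explicit transition validation, and a bounded transition history.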
use crate::error::{Error, ErrorKind};
use crate::types::{Budget, RegionId, Time};
use std::collections::VecDeque;
use std::time::Duration;
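/// Maximum number of transitions retained per region; the oldest entry is evicted first.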
const MAX_TRANSITION_HISTORY: usize = 64;
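/// Lifecycle state of a distributed region. `Closed` is the only terminal
/// state; the legal edges are enumerated by [`Self::allowed_transitions`].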
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DistributedRegionState {
Initializing,
Active,
Degraded,
Recovering,
Closing,
Closed,
}
impl DistributedRegionState {
#[must_use]
pub const fn can_spawn(&self) -> bool {
matches!(self, Self::Active)
}
#[must_use]
pub const fn is_terminal(&self) -> bool {
matches!(self, Self::Closed)
}
#[must_use]
pub const fn is_unhealthy(&self) -> bool {
matches!(self, Self::Degraded | Self::Recovering)
}
#[must_use]
pub const fn can_read(&self) -> bool {
matches!(self, Self::Active | Self::Degraded | Self::Recovering)
}
#[must_use]
pub const fn can_write(&self) -> bool {
matches!(self, Self::Active)
}
#[must_use]
pub const fn is_closing(&self) -> bool {
matches!(self, Self::Closing)
}
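/// The complete set of states this state may legally transition to.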
#[must_use]
pub const fn allowed_transitions(&self) -> &'static [Self] {
match self {
Self::Initializing => &[Self::Active, Self::Degraded, Self::Closing],
Self::Active => &[Self::Degraded, Self::Closing],
Self::Degraded => &[Self::Recovering, Self::Closing],
Self::Recovering => &[Self::Active, Self::Closing],
Self::Closing => &[Self::Closed],
Self::Closed => &[],
}
}
#[must_use]
pub fn can_transition_to(&self, target: Self) -> bool {
self.allowed_transitions().contains(&target)
}
}
impl std::fmt::Display for DistributedRegionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Initializing => "initializing",
Self::Active => "active",
Self::Degraded => "degraded",
Self::Recovering => "recovering",
Self::Closing => "closing",
Self::Closed => "closed",
};
write!(f, "{s}")
}
}
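/// A recorded edge of the region state machine: what changed, why, and when.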
#[derive(Debug, Clone)]
pub struct StateTransition {
pub from: DistributedRegionState,
pub to: DistributedRegionState,
pub reason: TransitionReason,
pub timestamp: Time,
pub context: Option<String>,
}
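/// The cause attached to a recorded state transition.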
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransitionReason {
QuorumReached {
replicas: u32,
required: u32,
},
InitTimeout {
achieved: u32,
required: u32,
},
ReplicaLost {
replica_id: String,
remaining: u32,
},
QuorumLost {
remaining: u32,
required: u32,
},
RecoveryTriggered {
initiator: String,
},
RecoveryComplete {
symbols_used: u32,
duration_ms: u64,
},
RecoveryFailed {
reason: String,
},
LocalClose,
UserClose {
reason: Option<String>,
},
CloseComplete,
Cancelled {
reason: String,
},
}
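/// How many replicas must acknowledge an operation before it completes.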
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsistencyLevel {
One,
Quorum,
All,
Local,
}
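/// Tunables for a distributed region. Invariants, enforced by `validate`:
/// `replication_factor >= 1` and `min_quorum` in `1..=replication_factor`.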
#[derive(Debug, Clone)]
pub struct DistributedRegionConfig {
pub min_quorum: u32,
pub replication_factor: u32,
pub init_timeout: Duration,
pub recovery_timeout: Duration,
pub allow_degraded: bool,
pub read_consistency: ConsistencyLevel,
pub write_consistency: ConsistencyLevel,
pub replica_timeout: Duration,
}
impl Default for DistributedRegionConfig {
fn default() -> Self {
Self {
min_quorum: 2,
replication_factor: 3,
init_timeout: Duration::from_secs(30),
recovery_timeout: Duration::from_secs(60),
allow_degraded: true,
read_consistency: ConsistencyLevel::One,
write_consistency: ConsistencyLevel::Quorum,
replica_timeout: Duration::from_secs(5),
}
}
}
impl DistributedRegionConfig {
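/// Checks the replication/quorum invariants documented on the struct.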
pub(crate) fn validate(&self) -> Result<(), Error> {
if self.replication_factor == 0 {
return Err(Error::new(ErrorKind::ConfigError)
.with_message("distributed region config requires replication_factor >= 1"));
}
if self.min_quorum == 0 || self.min_quorum > self.replication_factor {
return Err(Error::new(ErrorKind::ConfigError).with_message(
"distributed region config requires min_quorum in 1..=replication_factor",
));
}
Ok(())
}
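/// Panics when [`Self::validate`] fails; used by constructors that cannot return `Err`.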
fn assert_valid(&self) {
self.validate()
.expect("distributed region config must satisfy replication/quorum invariants");
}
}
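/// Health of an individual replica as observed by this region.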
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicaStatus {
Healthy,
Suspect,
Unavailable,
Syncing,
}
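/// Book-keeping for a single replica of a distributed region.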
#[derive(Debug, Clone)]
pub struct ReplicaInfo {
pub id: String,
pub address: String,
pub status: ReplicaStatus,
pub last_heartbeat: Time,
pub symbol_count: u32,
}
impl ReplicaInfo {
#[must_use]
pub fn new(id: &str, address: &str) -> Self {
Self {
id: id.to_string(),
address: address.to_string(),
status: ReplicaStatus::Healthy,
last_heartbeat: Time::ZERO,
symbol_count: 0,
}
}
}
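/// A distributed region record: current lifecycle state, replica set,
/// configuration, and a bounded history of state transitions.
///
/// A minimal lifecycle sketch, mirroring the `happy_path_lifecycle` test
/// below (error handling elided; `?` assumes a fallible caller):
///
/// ```ignore
/// let mut region = DistributedRegionRecord::new(
///     RegionId::new_ephemeral(),
///     DistributedRegionConfig::default(),
///     None,
///     Budget::default(),
/// );
/// region.add_replica(ReplicaInfo::new("r1", "addr1"))?;
/// region.add_replica(ReplicaInfo::new("r2", "addr2"))?;
/// region.activate(Time::from_secs(1))?;
/// region.begin_close(TransitionReason::UserClose { reason: None }, Time::from_secs(10))?;
/// region.complete_close(Time::from_secs(11))?;
/// ```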
#[derive(Debug)]
pub struct DistributedRegionRecord {
pub id: RegionId,
pub state: DistributedRegionState,
pub config: DistributedRegionConfig,
pub replicas: Vec<ReplicaInfo>,
pub transitions: VecDeque<StateTransition>,
pub last_replicated: Option<Time>,
pub parent: Option<RegionId>,
pub budget: Budget,
}
impl DistributedRegionRecord {
#[must_use]
pub fn new(
id: RegionId,
config: DistributedRegionConfig,
parent: Option<RegionId>,
budget: Budget,
) -> Self {
config.assert_valid();
// Size the replica set for the configured replication factor instead of a
// hardcoded count.
let replica_capacity = config.replication_factor as usize;
Self {
id,
state: DistributedRegionState::Initializing,
config,
replicas: Vec::with_capacity(replica_capacity),
transitions: VecDeque::with_capacity(MAX_TRANSITION_HISTORY),
last_replicated: None,
parent,
budget,
}
}
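/// Moves the region to `Active` once at least `min_quorum` replicas are available.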
pub fn activate(&mut self, now: Time) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Active)?;
let healthy = self.healthy_replicas();
if healthy < self.config.min_quorum {
return Err(Error::quorum_not_reached(healthy, self.config.min_quorum));
}
let transition = self.record_transition(
DistributedRegionState::Active,
TransitionReason::QuorumReached {
replicas: healthy,
required: self.config.min_quorum,
},
now,
);
Ok(transition)
}
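/// Marks `replica_id` as `Unavailable`. Returns `Ok(transition)` only when
/// the loss forces a state change (to `Degraded` or `Closing`); if quorum is
/// still held the replica is marked anyway, but an `Internal` error signals
/// that no transition occurred.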
pub fn replica_lost(&mut self, replica_id: &str, now: Time) -> Result<StateTransition, Error> {
self.ensure_replica_mutation_allowed("mark replica lost")?;
let replica = self
.replicas
.iter_mut()
.find(|r| r.id == replica_id)
.ok_or_else(|| {
Error::new(ErrorKind::Internal)
.with_message(format!("replica {replica_id} not found"))
})?;
replica.status = ReplicaStatus::Unavailable;
if let Some(transition) = self.reconcile_replica_change(now) {
return Ok(transition);
}
let healthy = self.healthy_replicas();
Err(Error::new(ErrorKind::Internal).with_message(format!(
"replica {replica_id} lost but quorum maintained ({healthy} healthy)"
)))
}
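/// Begins recovery of a degraded region on behalf of `initiator`.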
pub fn trigger_recovery(
&mut self,
initiator: &str,
now: Time,
) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Recovering)?;
let transition = self.record_transition(
DistributedRegionState::Recovering,
TransitionReason::RecoveryTriggered {
initiator: initiator.to_string(),
},
now,
);
Ok(transition)
}
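/// Returns the region to `Active` once recovery finishes and quorum is
/// restored. The recorded duration is measured from the previous transition
/// (normally the `RecoveryTriggered` entry).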
pub fn complete_recovery(
&mut self,
symbols_used: u32,
now: Time,
) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Active)?;
let healthy = self.healthy_replicas();
if healthy < self.config.min_quorum {
return Err(Error::quorum_not_reached(healthy, self.config.min_quorum));
}
let duration_ms = self.transitions.back().map_or(0, |last| {
now.as_nanos().saturating_sub(last.timestamp.as_nanos()) / 1_000_000
});
let transition = self.record_transition(
DistributedRegionState::Active,
TransitionReason::RecoveryComplete {
symbols_used,
duration_ms,
},
now,
);
Ok(transition)
}
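/// Records a failed recovery and moves the region to `Closing`.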
pub fn fail_recovery(&mut self, reason: String, now: Time) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Closing)?;
let transition = self.record_transition(
DistributedRegionState::Closing,
TransitionReason::RecoveryFailed { reason },
now,
);
Ok(transition)
}
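/// Starts an orderly shutdown, recording the supplied `reason`.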
pub fn begin_close(
&mut self,
reason: TransitionReason,
now: Time,
) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Closing)?;
let transition = self.record_transition(DistributedRegionState::Closing, reason, now);
Ok(transition)
}
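/// Finishes shutdown, moving the region to the terminal `Closed` state.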
pub fn complete_close(&mut self, now: Time) -> Result<StateTransition, Error> {
self.validate_transition(DistributedRegionState::Closed)?;
let transition = self.record_transition(
DistributedRegionState::Closed,
TransitionReason::CloseComplete,
now,
);
Ok(transition)
}
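/// The number of replicas currently counted toward quorum.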
#[must_use]
pub fn current_quorum(&self) -> u32 {
self.healthy_replicas()
}
#[must_use]
pub fn has_quorum(&self) -> bool {
self.healthy_replicas() >= self.config.min_quorum
}
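/// Counts replicas whose status is `Healthy` or `Syncing`; both count toward quorum.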
#[must_use]
pub fn healthy_replicas(&self) -> u32 {
self.replicas
.iter()
.filter(|r| matches!(r.status, ReplicaStatus::Healthy | ReplicaStatus::Syncing))
.count() as u32
}
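/// Adds a replica; rejected if the region is closing/closed or the id already exists.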
pub fn add_replica(&mut self, info: ReplicaInfo) -> Result<(), Error> {
self.ensure_replica_mutation_allowed("add replica")?;
if self.replicas.iter().any(|r| r.id == info.id) {
return Err(Error::new(ErrorKind::Internal)
.with_message(format!("replica {} already exists", info.id)));
}
self.replicas.push(info);
Ok(())
}
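/// Removes a replica by id. Any transition this forces (e.g. quorum loss) is
/// recorded in the history but not returned to the caller.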
pub fn remove_replica(&mut self, replica_id: &str, now: Time) -> Result<ReplicaInfo, Error> {
self.ensure_replica_mutation_allowed("remove replica")?;
let pos = self
.replicas
.iter()
.position(|r| r.id == replica_id)
.ok_or_else(|| {
Error::new(ErrorKind::Internal)
.with_message(format!("replica {replica_id} not found"))
})?;
let removed = self.replicas.remove(pos);
let _ = self.reconcile_replica_change(now);
Ok(removed)
}
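/// Updates a replica's status, refreshing its heartbeat when it becomes
/// `Healthy`. As with [`Self::remove_replica`], any forced transition is
/// recorded but not returned.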
pub fn update_replica_status(
&mut self,
replica_id: &str,
status: ReplicaStatus,
now: Time,
) -> Result<(), Error> {
self.ensure_replica_mutation_allowed("update replica status")?;
let replica = self
.replicas
.iter_mut()
.find(|r| r.id == replica_id)
.ok_or_else(|| {
Error::new(ErrorKind::Internal)
.with_message(format!("replica {replica_id} not found"))
})?;
replica.status = status;
if status == ReplicaStatus::Healthy {
replica.last_heartbeat = now;
}
let _ = self.reconcile_replica_change(now);
Ok(())
}
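/// Errors with `InvalidStateTransition` unless `state -> target` is a legal edge.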
fn validate_transition(&self, target: DistributedRegionState) -> Result<(), Error> {
if !self.state.can_transition_to(target) {
return Err(
Error::new(ErrorKind::InvalidStateTransition).with_message(format!(
"cannot transition from {} to {}",
self.state, target
)),
);
}
Ok(())
}
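/// Rejects replica-set mutations once the region is closing or closed.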
fn ensure_replica_mutation_allowed(&self, operation: &str) -> Result<(), Error> {
if self.state.is_terminal() || self.state.is_closing() {
return Err(Error::new(ErrorKind::InvalidStateTransition)
.with_message(format!("cannot {operation} in {} region", self.state)));
}
Ok(())
}
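/// Re-evaluates quorum after a replica change: demotes `Active` to `Degraded`
/// (or straight to `Closing` when degraded mode is disabled or no replicas
/// remain), and closes `Degraded`/`Recovering` regions that lose their last
/// replica. Returns the transition if one was recorded.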
fn reconcile_replica_change(&mut self, now: Time) -> Option<StateTransition> {
let healthy = self.healthy_replicas();
let next_state = match self.state {
DistributedRegionState::Active if healthy < self.config.min_quorum => {
Some(if healthy == 0 || !self.config.allow_degraded {
DistributedRegionState::Closing
} else {
DistributedRegionState::Degraded
})
}
DistributedRegionState::Degraded | DistributedRegionState::Recovering
if healthy == 0 =>
{
Some(DistributedRegionState::Closing)
}
_ => None,
}?;
Some(self.record_transition(
next_state,
TransitionReason::QuorumLost {
remaining: healthy,
required: self.config.min_quorum,
},
now,
))
}
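/// Applies a transition, appends it to the history, and evicts the oldest
/// entry once the history exceeds `MAX_TRANSITION_HISTORY`.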
fn record_transition(
&mut self,
to: DistributedRegionState,
reason: TransitionReason,
timestamp: Time,
) -> StateTransition {
let from = self.state;
self.state = to;
let transition = StateTransition {
from,
to,
reason,
timestamp,
context: None,
};
self.transitions.push_back(transition.clone());
if self.transitions.len() > MAX_TRANSITION_HISTORY {
self.transitions.pop_front();
}
transition
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn initializing_predicates() {
let state = DistributedRegionState::Initializing;
assert!(!state.can_spawn());
assert!(!state.is_terminal());
assert!(!state.is_unhealthy());
assert!(!state.can_read());
assert!(!state.can_write());
}
#[test]
fn active_predicates() {
let state = DistributedRegionState::Active;
assert!(state.can_spawn());
assert!(!state.is_terminal());
assert!(!state.is_unhealthy());
assert!(state.can_read());
assert!(state.can_write());
}
#[test]
fn degraded_predicates() {
let state = DistributedRegionState::Degraded;
assert!(!state.can_spawn());
assert!(!state.is_terminal());
assert!(state.is_unhealthy());
assert!(state.can_read());
assert!(!state.can_write());
}
#[test]
fn recovering_predicates() {
let state = DistributedRegionState::Recovering;
assert!(!state.can_spawn());
assert!(!state.is_terminal());
assert!(state.is_unhealthy());
assert!(state.can_read());
assert!(!state.can_write());
}
#[test]
fn closed_is_terminal() {
let state = DistributedRegionState::Closed;
assert!(state.is_terminal());
assert!(!state.can_spawn());
assert!(!state.can_read());
assert!(!state.can_write());
}
#[test]
fn initializing_valid_transitions() {
let state = DistributedRegionState::Initializing;
assert!(state.can_transition_to(DistributedRegionState::Active));
assert!(state.can_transition_to(DistributedRegionState::Degraded));
assert!(state.can_transition_to(DistributedRegionState::Closing));
assert!(!state.can_transition_to(DistributedRegionState::Recovering));
assert!(!state.can_transition_to(DistributedRegionState::Closed));
}
#[test]
fn active_valid_transitions() {
let state = DistributedRegionState::Active;
assert!(state.can_transition_to(DistributedRegionState::Degraded));
assert!(state.can_transition_to(DistributedRegionState::Closing));
assert!(!state.can_transition_to(DistributedRegionState::Initializing));
assert!(!state.can_transition_to(DistributedRegionState::Recovering));
}
#[test]
fn degraded_valid_transitions() {
let state = DistributedRegionState::Degraded;
assert!(state.can_transition_to(DistributedRegionState::Recovering));
assert!(state.can_transition_to(DistributedRegionState::Closing));
assert!(!state.can_transition_to(DistributedRegionState::Active));
}
#[test]
fn recovering_valid_transitions() {
let state = DistributedRegionState::Recovering;
assert!(state.can_transition_to(DistributedRegionState::Active));
assert!(state.can_transition_to(DistributedRegionState::Closing));
assert!(!state.can_transition_to(DistributedRegionState::Degraded));
}
#[test]
fn closed_no_transitions() {
let state = DistributedRegionState::Closed;
assert!(state.allowed_transitions().is_empty());
assert!(!state.can_transition_to(DistributedRegionState::Initializing));
assert!(!state.can_transition_to(DistributedRegionState::Active));
}
#[test]
fn happy_path_lifecycle() {
let config = DistributedRegionConfig::default();
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
assert_eq!(region.state, DistributedRegionState::Initializing);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
region.add_replica(ReplicaInfo::new("r2", "addr2")).unwrap();
let transition = region.activate(Time::from_secs(1)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Active);
assert_eq!(region.state, DistributedRegionState::Active);
let _transition = region
.begin_close(
TransitionReason::UserClose { reason: None },
Time::from_secs(10),
)
.unwrap();
assert_eq!(region.state, DistributedRegionState::Closing);
let _transition = region.complete_close(Time::from_secs(11)).unwrap();
assert_eq!(region.state, DistributedRegionState::Closed);
}
#[test]
fn degraded_path() {
let mut region = create_active_region();
let transition = region.replica_lost("r2", Time::from_secs(5)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Degraded);
assert_eq!(region.state, DistributedRegionState::Degraded);
assert!(region.state.can_read());
assert!(!region.state.can_write());
}
#[test]
fn recovery_path() {
let mut region = create_degraded_region();
let transition = region
.trigger_recovery("operator", Time::from_secs(10))
.unwrap();
assert_eq!(transition.to, DistributedRegionState::Recovering);
assert_eq!(region.state, DistributedRegionState::Recovering);
region
.update_replica_status("r2", ReplicaStatus::Healthy, Time::from_secs(14))
.unwrap();
let transition = region.complete_recovery(42, Time::from_secs(15)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Active);
assert_eq!(region.state, DistributedRegionState::Active);
}
#[test]
fn complete_recovery_requires_quorum() {
let mut region = create_degraded_region();
region
.trigger_recovery("operator", Time::from_secs(10))
.unwrap();
let result = region.complete_recovery(42, Time::from_secs(15));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::QuorumNotReached);
assert_eq!(region.state, DistributedRegionState::Recovering);
}
#[test]
fn recovery_failure() {
let mut region = create_degraded_region();
region
.trigger_recovery("operator", Time::from_secs(10))
.unwrap();
let transition = region
.fail_recovery("insufficient symbols".to_string(), Time::from_secs(15))
.unwrap();
assert_eq!(transition.to, DistributedRegionState::Closing);
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn invalid_transition_error() {
let mut region = create_active_region();
let result = region.trigger_recovery("test", Time::from_secs(1));
assert!(result.is_err());
assert_eq!(
result.unwrap_err().kind(),
ErrorKind::InvalidStateTransition
);
}
#[test]
fn activate_without_quorum_error() {
let config = DistributedRegionConfig {
min_quorum: 2,
..Default::default()
};
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
let result = region.activate(Time::from_secs(1));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::QuorumNotReached);
}
#[test]
fn close_from_any_non_terminal_state() {
for state in [
DistributedRegionState::Initializing,
DistributedRegionState::Active,
DistributedRegionState::Degraded,
DistributedRegionState::Recovering,
] {
assert!(
state.can_transition_to(DistributedRegionState::Closing),
"should be able to close from {state}"
);
}
}
#[test]
fn duplicate_replica_error() {
let mut region = create_active_region();
let result = region.add_replica(ReplicaInfo::new("r1", "addr1"));
assert!(result.is_err());
}
#[test]
fn remove_unknown_replica_error() {
let mut region = create_active_region();
let result = region.remove_replica("nonexistent", Time::from_secs(7));
assert!(result.is_err());
}
#[test]
fn replica_lost_unknown_replica_error_does_not_mutate_state() {
let mut region = create_active_region();
let prev_state = region.state;
let prev_healthy = region.healthy_replicas();
let prev_transitions = region.transitions.len();
let result = region.replica_lost("nonexistent", Time::from_secs(9));
assert!(result.is_err());
assert_eq!(region.state, prev_state);
assert_eq!(region.healthy_replicas(), prev_healthy);
assert_eq!(region.transitions.len(), prev_transitions);
}
#[test]
fn quorum_calculation() {
let config = DistributedRegionConfig {
min_quorum: 2,
replication_factor: 3,
..Default::default()
};
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
assert_eq!(region.current_quorum(), 0);
assert!(!region.has_quorum());
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
assert_eq!(region.current_quorum(), 1);
assert!(!region.has_quorum());
region.add_replica(ReplicaInfo::new("r2", "addr2")).unwrap();
assert_eq!(region.current_quorum(), 2);
assert!(region.has_quorum());
}
#[test]
fn replica_status_update() {
let mut region = create_active_region();
region
.update_replica_status("r1", ReplicaStatus::Suspect, Time::from_secs(3))
.unwrap();
let r1 = region.replicas.iter().find(|r| r.id == "r1").unwrap();
assert_eq!(r1.status, ReplicaStatus::Suspect);
assert_eq!(region.state, DistributedRegionState::Degraded);
}
#[test]
fn quorum_loss_closes_when_degraded_mode_disabled() {
let config = DistributedRegionConfig {
min_quorum: 2,
replication_factor: 3,
allow_degraded: false,
..Default::default()
};
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
region.add_replica(ReplicaInfo::new("r2", "addr2")).unwrap();
region.add_replica(ReplicaInfo::new("r3", "addr3")).unwrap();
region.activate(Time::from_secs(0)).unwrap();
let first_loss = region.replica_lost("r1", Time::from_secs(5));
assert!(first_loss.is_err());
assert_eq!(region.state, DistributedRegionState::Active);
let transition = region.replica_lost("r2", Time::from_secs(6)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Closing);
assert_eq!(
transition.reason,
TransitionReason::QuorumLost {
remaining: 1,
required: 2,
}
);
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn active_region_closes_when_last_available_replica_is_lost() {
let config = DistributedRegionConfig {
min_quorum: 1,
replication_factor: 1,
allow_degraded: true,
..Default::default()
};
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
region.activate(Time::from_secs(0)).unwrap();
let transition = region.replica_lost("r1", Time::from_secs(1)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Closing);
assert_eq!(
transition.reason,
TransitionReason::QuorumLost {
remaining: 0,
required: 1,
}
);
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn degraded_region_closes_when_last_available_replica_is_lost() {
let mut region = create_degraded_region();
let transition = region.replica_lost("r1", Time::from_secs(6)).unwrap();
assert_eq!(transition.to, DistributedRegionState::Closing);
assert_eq!(
transition.reason,
TransitionReason::QuorumLost {
remaining: 0,
required: 2,
}
);
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn recovering_region_closes_when_last_available_replica_becomes_unavailable() {
let mut region = create_degraded_region();
region
.trigger_recovery("operator", Time::from_secs(10))
.unwrap();
region
.update_replica_status("r1", ReplicaStatus::Unavailable, Time::from_secs(11))
.unwrap();
let transition = region.transitions.back().expect("closing transition");
assert_eq!(transition.to, DistributedRegionState::Closing);
assert_eq!(
transition.reason,
TransitionReason::QuorumLost {
remaining: 0,
required: 2,
}
);
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn closing_region_rejects_replica_mutations() {
let mut region = create_active_region();
region
.begin_close(
TransitionReason::UserClose { reason: None },
Time::from_secs(10),
)
.unwrap();
let add_err = region
.add_replica(ReplicaInfo::new("r3", "addr3"))
.unwrap_err();
assert_eq!(add_err.kind(), ErrorKind::InvalidStateTransition);
let update_err = region
.update_replica_status("r1", ReplicaStatus::Suspect, Time::from_secs(12))
.unwrap_err();
assert_eq!(update_err.kind(), ErrorKind::InvalidStateTransition);
let remove_err = region
.remove_replica("r1", Time::from_secs(13))
.unwrap_err();
assert_eq!(remove_err.kind(), ErrorKind::InvalidStateTransition);
let lost_err = region.replica_lost("r1", Time::from_secs(14)).unwrap_err();
assert_eq!(lost_err.kind(), ErrorKind::InvalidStateTransition);
}
#[test]
fn remove_replica() {
let mut region = create_active_region();
let removed = region.remove_replica("r2", Time::from_secs(6)).unwrap();
assert_eq!(removed.id, "r2");
assert_eq!(region.replicas.len(), 1);
assert_eq!(region.state, DistributedRegionState::Degraded);
}
#[test]
fn remove_replica_closes_when_degraded_mode_disabled() {
let config = DistributedRegionConfig {
min_quorum: 2,
replication_factor: 3,
allow_degraded: false,
..Default::default()
};
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
region.add_replica(ReplicaInfo::new("r2", "addr2")).unwrap();
region.activate(Time::from_secs(0)).unwrap();
let removed = region.remove_replica("r2", Time::from_secs(6)).unwrap();
assert_eq!(removed.id, "r2");
assert_eq!(region.state, DistributedRegionState::Closing);
}
#[test]
fn closed_region_rejects_replica_mutations() {
let mut region = create_active_region();
region
.begin_close(
TransitionReason::UserClose { reason: None },
Time::from_secs(10),
)
.unwrap();
region.complete_close(Time::from_secs(11)).unwrap();
let add_err = region
.add_replica(ReplicaInfo::new("r3", "addr3"))
.unwrap_err();
assert_eq!(add_err.kind(), ErrorKind::InvalidStateTransition);
let update_err = region
.update_replica_status("r1", ReplicaStatus::Suspect, Time::from_secs(12))
.unwrap_err();
assert_eq!(update_err.kind(), ErrorKind::InvalidStateTransition);
let remove_err = region
.remove_replica("r1", Time::from_secs(13))
.unwrap_err();
assert_eq!(remove_err.kind(), ErrorKind::InvalidStateTransition);
let lost_err = region.replica_lost("r1", Time::from_secs(14)).unwrap_err();
assert_eq!(lost_err.kind(), ErrorKind::InvalidStateTransition);
}
#[test]
fn state_display() {
assert_eq!(
format!("{}", DistributedRegionState::Initializing),
"initializing"
);
assert_eq!(format!("{}", DistributedRegionState::Active), "active");
assert_eq!(format!("{}", DistributedRegionState::Degraded), "degraded");
assert_eq!(
format!("{}", DistributedRegionState::Recovering),
"recovering"
);
assert_eq!(format!("{}", DistributedRegionState::Closing), "closing");
assert_eq!(format!("{}", DistributedRegionState::Closed), "closed");
}
#[test]
fn transition_history_bounded() {
let mut region = create_active_region();
for _ in 0..MAX_TRANSITION_HISTORY + 10 {
region.state = DistributedRegionState::Initializing;
let _ = region.activate(Time::from_secs(1));
}
assert!(region.transitions.len() <= MAX_TRANSITION_HISTORY);
}
#[test]
fn config_default() {
let config = DistributedRegionConfig::default();
assert_eq!(config.min_quorum, 2);
assert_eq!(config.replication_factor, 3);
assert!(config.allow_degraded);
assert_eq!(config.read_consistency, ConsistencyLevel::One);
assert_eq!(config.write_consistency, ConsistencyLevel::Quorum);
}
#[test]
#[should_panic(expected = "replication_factor >= 1")]
fn distributed_region_new_rejects_zero_replication_factor() {
let config = DistributedRegionConfig {
min_quorum: 1,
replication_factor: 0,
..Default::default()
};
let _ = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
}
#[test]
#[should_panic(expected = "min_quorum in 1..=replication_factor")]
fn distributed_region_new_rejects_zero_quorum() {
let config = DistributedRegionConfig {
min_quorum: 0,
replication_factor: 1,
..Default::default()
};
let _ = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
}
#[test]
#[should_panic(expected = "min_quorum in 1..=replication_factor")]
fn distributed_region_new_rejects_quorum_above_replication_factor() {
let config = DistributedRegionConfig {
min_quorum: 3,
replication_factor: 2,
..Default::default()
};
let _ = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
}
fn create_active_region() -> DistributedRegionRecord {
let config = DistributedRegionConfig::default();
let mut region = DistributedRegionRecord::new(
RegionId::new_ephemeral(),
config,
None,
Budget::default(),
);
region.add_replica(ReplicaInfo::new("r1", "addr1")).unwrap();
region.add_replica(ReplicaInfo::new("r2", "addr2")).unwrap();
region.activate(Time::from_secs(0)).unwrap();
region
}
fn create_degraded_region() -> DistributedRegionRecord {
let mut region = create_active_region();
region.replica_lost("r2", Time::from_secs(5)).unwrap();
region
}
#[test]
fn distributed_region_state_debug_clone_copy_hash_eq() {
use std::collections::HashSet;
let s = DistributedRegionState::Active;
let dbg = format!("{s:?}");
assert!(dbg.contains("Active"), "{dbg}");
let copied: DistributedRegionState = s;
let cloned = s;
assert_eq!(copied, cloned);
assert_ne!(s, DistributedRegionState::Closed);
let mut set = HashSet::new();
set.insert(DistributedRegionState::Initializing);
set.insert(DistributedRegionState::Active);
set.insert(DistributedRegionState::Degraded);
assert_eq!(set.len(), 3);
}
#[test]
fn consistency_level_debug_clone_copy_eq() {
let c = ConsistencyLevel::Quorum;
let dbg = format!("{c:?}");
assert!(dbg.contains("Quorum"), "{dbg}");
let copied: ConsistencyLevel = c;
let cloned = c;
assert_eq!(copied, cloned);
assert_ne!(c, ConsistencyLevel::All);
}
#[test]
fn distributed_region_config_debug_clone_default() {
let c = DistributedRegionConfig::default();
let dbg = format!("{c:?}");
assert!(dbg.contains("DistributedRegionConfig"), "{dbg}");
assert_eq!(c.min_quorum, 2);
let cloned = c.clone();
assert_eq!(format!("{cloned:?}"), dbg);
}
#[test]
fn replica_status_debug_clone_copy_eq() {
let s = ReplicaStatus::Healthy;
let dbg = format!("{s:?}");
assert!(dbg.contains("Healthy"), "{dbg}");
let copied: ReplicaStatus = s;
let cloned = s;
assert_eq!(copied, cloned);
assert_ne!(s, ReplicaStatus::Unavailable);
}
}