#![allow(clippy::result_large_err)]
use std::time::Duration;
use super::snapshot::{BudgetSnapshot, RegionSnapshot, TaskSnapshot, TaskState};
use crate::error::{Error, ErrorKind};
use crate::record::distributed_region::{
ConsistencyLevel, DistributedRegionConfig, DistributedRegionRecord, DistributedRegionState,
ReplicaInfo, StateTransition, TransitionReason,
};
use crate::record::region::{RegionRecord, RegionState};
use crate::types::budget::Budget;
use crate::types::cancel::CancelReason;
use crate::types::{RegionId, TaskId, Time};
/// How a region participates in replication.
///
/// Selected at bridge construction time; `RegionBridge::with_mode` validates
/// the replica count via `assert_valid`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RegionMode {
    /// Single-node region with no replication (the default).
    #[default]
    Local,
    /// Fully replicated region backed by a `DistributedRegionRecord`.
    Distributed {
        /// Number of replicas; must be at least 1.
        replication_factor: u32,
        /// Write-consistency level used to derive the minimum quorum.
        consistency: ConsistencyLevel,
    },
    /// Replicated mode that tolerates replica lag up to `max_lag`.
    /// NOTE(review): `with_mode` currently creates no distributed record for
    /// `Hybrid` — confirm whether that is intentional.
    Hybrid {
        /// Number of replicas; must be at least 1.
        replication_factor: u32,
        /// Maximum tolerated replication lag.
        max_lag: Duration,
    },
}
impl RegionMode {
    /// The non-replicated mode; identical to `RegionMode::default()`.
    #[must_use]
    pub const fn local() -> Self {
        Self::Local
    }

    /// Fully replicated mode with quorum write consistency.
    ///
    /// # Panics
    /// Panics if `replication_factor` is zero.
    #[must_use]
    pub fn distributed(replication_factor: u32) -> Self {
        assert!(
            replication_factor > 0,
            "distributed region mode requires at least one replica"
        );
        Self::Distributed {
            consistency: ConsistencyLevel::Quorum,
            replication_factor,
        }
    }

    /// Hybrid mode with a default maximum replication lag of five seconds.
    ///
    /// # Panics
    /// Panics if `replication_factor` is zero.
    #[must_use]
    pub fn hybrid(replication_factor: u32) -> Self {
        assert!(
            replication_factor > 0,
            "hybrid region mode requires at least one replica"
        );
        Self::Hybrid {
            max_lag: Duration::from_secs(5),
            replication_factor,
        }
    }

    /// Re-checks the replica-count invariant for modes built from struct
    /// literals rather than the checked constructors.
    fn assert_valid(self) {
        if let Self::Distributed {
            replication_factor, ..
        } = self
        {
            assert!(
                replication_factor > 0,
                "distributed region mode requires at least one replica"
            );
        }
        if let Self::Hybrid {
            replication_factor, ..
        } = self
        {
            assert!(
                replication_factor > 0,
                "hybrid region mode requires at least one replica"
            );
        }
    }

    /// Minimum number of acknowledging replicas for the given consistency level.
    const fn min_quorum(replication_factor: u32, consistency: ConsistencyLevel) -> u32 {
        match consistency {
            ConsistencyLevel::All => replication_factor,
            ConsistencyLevel::Quorum => (replication_factor / 2) + 1,
            ConsistencyLevel::One | ConsistencyLevel::Local => 1,
        }
    }

    /// True for every mode except `Local`.
    #[must_use]
    pub const fn is_replicated(&self) -> bool {
        matches!(self, Self::Distributed { .. } | Self::Hybrid { .. })
    }

    /// True only for the fully distributed mode; hybrid does not count.
    #[must_use]
    pub const fn is_distributed(&self) -> bool {
        matches!(self, Self::Distributed { .. })
    }

    /// Number of copies this mode maintains; `Local` counts as one.
    #[must_use]
    pub const fn replication_factor(&self) -> u32 {
        match *self {
            Self::Local => 1,
            Self::Distributed {
                replication_factor, ..
            }
            | Self::Hybrid {
                replication_factor, ..
            } => replication_factor,
        }
    }
}
/// When local changes are propagated to the distributed record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Propagate changes as they happen.
    Synchronous,
    /// Propagate changes in the background.
    Asynchronous,
    /// Synchronize only on writes.
    /// NOTE(review): no code in this file branches on `SyncMode` yet — the
    /// exact semantics of each variant should be confirmed against callers.
    WriteSync,
}
/// Policy for reconciling disagreement between the local and distributed views.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Distributed state overrides local state (the `BridgeConfig` default).
    DistributedWins,
    /// Local state overrides distributed state.
    LocalWins,
    /// The side with the higher sequence number wins.
    HighestSequence,
    /// Treat any conflict as an error.
    Error,
}
/// Tunables for a `RegionBridge`; see `Default` for the baseline values.
#[derive(Debug, Clone)]
pub struct BridgeConfig {
    // Whether upgrade_to_distributed is permitted.
    pub allow_upgrade: bool,
    // Time limit for a sync pass (not enforced anywhere in this file yet).
    pub sync_timeout: Duration,
    // When local changes are pushed to the distributed record.
    pub sync_mode: SyncMode,
    // How local/distributed disagreements are reconciled.
    pub conflict_resolution: ConflictResolution,
}
impl Default for BridgeConfig {
    /// Baseline configuration: upgrades permitted, five-second sync timeout,
    /// synchronous propagation, distributed side wins conflicts.
    fn default() -> Self {
        let sync_timeout = Duration::from_secs(5);
        Self {
            sync_mode: SyncMode::Synchronous,
            conflict_resolution: ConflictResolution::DistributedWins,
            allow_upgrade: true,
            sync_timeout,
        }
    }
}
/// Bookkeeping for local-to-distributed synchronization progress.
#[derive(Debug, Clone, Default)]
pub struct SyncState {
    // Highest snapshot sequence confirmed synced (or applied from a peer).
    pub last_synced_sequence: u64,
    // True when local changes have not yet been synced.
    pub sync_pending: bool,
    // Count of local mutations since the last successful sync (saturating).
    pub pending_ops: u32,
    // Timestamp of the last successful sync or applied snapshot.
    pub last_sync_time: Option<Time>,
    // Last sync failure, if any. NOTE(review): never written in this file.
    pub last_sync_error: Option<String>,
}
/// The combined view of a region's local and distributed state machines,
/// computed by `EffectiveState::compute`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EffectiveState {
    /// Fully open; new work may be spawned.
    Open,
    /// Open locally but the distributed record reports degraded replication.
    Degraded,
    /// Open locally while the distributed record is recovering.
    Recovering,
    /// Some phase of teardown (closing, draining, or finalizing).
    Closing,
    /// Fully closed on both sides.
    Closed,
    /// Local and distributed states disagree in a way no valid pairing allows.
    Inconsistent {
        local: RegionState,
        distributed: DistributedRegionState,
    },
}
impl EffectiveState {
    /// Combines the local lifecycle state with the optional distributed state
    /// into one effective view. Pairings the two state machines should never
    /// produce together collapse into `Inconsistent`.
    #[must_use]
    pub fn compute(local: RegionState, distributed: Option<DistributedRegionState>) -> Self {
        // Without a distributed record, the local state alone decides.
        let dist = match distributed {
            None => return Self::from_local(local),
            Some(d) => d,
        };
        match (local, dist) {
            (
                RegionState::Open,
                DistributedRegionState::Active | DistributedRegionState::Initializing,
            ) => Self::Open,
            (RegionState::Open, DistributedRegionState::Degraded) => Self::Degraded,
            (RegionState::Open, DistributedRegionState::Recovering) => Self::Recovering,
            (
                RegionState::Closing | RegionState::Draining | RegionState::Finalizing,
                DistributedRegionState::Closing,
            ) => Self::Closing,
            (RegionState::Closed, DistributedRegionState::Closed) => Self::Closed,
            (other_local, other_dist) => Self::Inconsistent {
                local: other_local,
                distributed: other_dist,
            },
        }
    }

    /// Projection used when no distributed record exists: all three teardown
    /// phases map onto `Closing`.
    fn from_local(local: RegionState) -> Self {
        match local {
            RegionState::Open => Self::Open,
            RegionState::Closed => Self::Closed,
            RegionState::Closing | RegionState::Draining | RegionState::Finalizing => Self::Closing,
        }
    }

    /// New work is admitted only while fully `Open`.
    #[must_use]
    pub const fn can_spawn(&self) -> bool {
        matches!(self, Self::Open)
    }

    /// True when the local and distributed views disagree.
    #[must_use]
    pub const fn is_inconsistent(&self) -> bool {
        matches!(self, Self::Inconsistent { .. })
    }

    /// True for any state that needs a recovery pass before the region is
    /// fully usable again.
    #[must_use]
    pub const fn needs_recovery(&self) -> bool {
        matches!(self, Self::Degraded | Self::Recovering)
            || matches!(self, Self::Inconsistent { .. })
    }
}
/// Conversion from a local representation to its distributed wire form.
pub trait LocalToDistributed {
    /// The distributed counterpart type.
    type Distributed;
    /// Converts `self` into its distributed representation.
    fn to_distributed(&self) -> Self::Distributed;
}
/// Conversion from a distributed wire form back to the local representation.
pub trait DistributedToLocal {
    /// The local counterpart type.
    type Local;
    /// Converts `self` into its local representation.
    fn to_local(&self) -> Self::Local;
    /// True when `to_local` followed by the reverse conversion would lose
    /// no information for this value.
    fn is_lossless(&self) -> bool;
}
impl LocalToDistributed for RegionState {
    type Distributed = DistributedRegionState;

    /// Maps the local lifecycle onto the distributed state machine.
    /// All three teardown phases (`Closing`/`Draining`/`Finalizing`)
    /// collapse into `DistributedRegionState::Closing`.
    fn to_distributed(&self) -> DistributedRegionState {
        if matches!(self, Self::Open) {
            DistributedRegionState::Active
        } else if matches!(self, Self::Closed) {
            DistributedRegionState::Closed
        } else {
            // Closing, Draining, or Finalizing.
            DistributedRegionState::Closing
        }
    }
}
impl DistributedToLocal for DistributedRegionState {
    type Local = RegionState;

    /// Maps the distributed state machine back onto the local lifecycle.
    /// Every non-terminal replication state is locally just `Open`.
    fn to_local(&self) -> RegionState {
        match self {
            Self::Closing => RegionState::Closing,
            Self::Closed => RegionState::Closed,
            Self::Initializing | Self::Active | Self::Degraded | Self::Recovering => {
                RegionState::Open
            }
        }
    }

    /// Lossless exactly when the state is not one of the replication-health
    /// states that all flatten to `Open` locally.
    fn is_lossless(&self) -> bool {
        !matches!(self, Self::Initializing | Self::Degraded | Self::Recovering)
    }
}
impl LocalToDistributed for Budget {
    type Distributed = BudgetSnapshot;

    /// Captures the budget in snapshot form. A zero poll quota is encoded as
    /// `None`, which conflates "exhausted" with "absent" — this conversion is
    /// therefore lossy (see `BudgetSnapshot::is_lossless`).
    fn to_distributed(&self) -> BudgetSnapshot {
        let polls = self.poll_quota;
        BudgetSnapshot {
            deadline_nanos: self.deadline.map(Time::as_nanos),
            polls_remaining: (polls > 0).then_some(polls),
            cost_remaining: self.cost_quota,
        }
    }
}
impl DistributedToLocal for BudgetSnapshot {
    type Local = Budget;

    /// Rebuilds a `Budget`, keeping `Budget::default()` values wherever the
    /// snapshot carries no data.
    fn to_local(&self) -> Budget {
        let mut budget = Budget::default();
        if let Some(nanos) = self.deadline_nanos {
            budget.deadline = Some(Time::from_nanos(nanos));
        }
        if let Some(polls) = self.polls_remaining {
            budget.poll_quota = polls;
        }
        if let Some(cost) = self.cost_remaining {
            budget.cost_quota = Some(cost);
        }
        budget
    }

    /// Never lossless: absent snapshot fields and defaulted budget fields
    /// cannot be told apart after a round trip.
    fn is_lossless(&self) -> bool {
        false
    }
}
/// Outcome of `RegionBridge::begin_close` / `complete_close`.
#[derive(Debug)]
pub struct CloseResult {
    // True if the local record's state actually changed.
    pub local_changed: bool,
    // The distributed transition performed, if a distributed record exists
    // and was not already closing/closed.
    pub distributed_transition: Option<StateTransition>,
    // The combined state after the operation.
    pub effective_state: EffectiveState,
}
/// Outcome of `RegionBridge::upgrade_to_distributed`.
#[derive(Debug)]
pub struct UpgradeResult {
    // Mode before the upgrade (always `Local` on success).
    pub previous_mode: RegionMode,
    // Mode after the upgrade (always `Distributed { .. }` on success).
    pub new_mode: RegionMode,
    // Sequence of the snapshot taken at the moment of upgrade.
    pub snapshot_sequence: u64,
}
/// Outcome of `RegionBridge::sync`.
#[derive(Debug)]
pub enum SyncResult {
    /// Nothing to sync: local-only mode, no pending changes, or no
    /// distributed record.
    NotNeeded,
    /// Sync completed at the given snapshot sequence.
    Synced {
        sequence: u64,
    },
    /// Sync started but not yet confirmed.
    /// NOTE(review): never constructed in this file — presumably reserved for
    /// an asynchronous sync path; confirm against callers.
    Pending {
        sequence: u64,
    },
}
/// Couples a local `RegionRecord` with an optional `DistributedRegionRecord`
/// and keeps the two views consistent through lifecycle operations.
#[derive(Debug)]
pub struct RegionBridge {
    // Authoritative local record; always present.
    local: RegionRecord,
    // Distributed record; present only in distributed mode.
    distributed: Option<DistributedRegionRecord>,
    // Replication mode chosen at construction (or after upgrade).
    mode: RegionMode,
    // Synchronization progress bookkeeping.
    pub sync_state: SyncState,
    // Bridge tunables (upgrade policy, sync timeout/mode, conflict policy).
    pub config: BridgeConfig,
    // Monotonically increasing snapshot sequence counter.
    sequence: u64,
}
impl RegionBridge {
    /// Records that local state has diverged from the replicated view:
    /// flags a sync as pending and counts the buffered mutation
    /// (saturating, so the counter never wraps).
    fn mark_sync_pending(&mut self) {
        self.sync_state.sync_pending = true;
        self.sync_state.pending_ops = self.sync_state.pending_ops.saturating_add(1);
    }

    /// Creates a purely local (non-replicated) bridge with default config.
    #[must_use]
    pub fn new_local(id: RegionId, parent: Option<RegionId>, budget: Budget) -> Self {
        Self {
            local: RegionRecord::new(id, parent, budget),
            distributed: None,
            mode: RegionMode::Local,
            sync_state: SyncState::default(),
            config: BridgeConfig::default(),
            sequence: 0,
        }
    }

    /// Creates a bridge with both a local record and a distributed record,
    /// taking the replication factor and consistency from `config`.
    #[must_use]
    pub fn new_distributed(
        id: RegionId,
        parent: Option<RegionId>,
        budget: Budget,
        config: DistributedRegionConfig,
    ) -> Self {
        let replication_factor = config.replication_factor;
        let consistency = config.write_consistency;
        let distributed = DistributedRegionRecord::new(id, config, parent, budget);
        Self {
            local: RegionRecord::new(id, parent, budget),
            distributed: Some(distributed),
            mode: RegionMode::Distributed {
                replication_factor,
                consistency,
            },
            sync_state: SyncState::default(),
            config: BridgeConfig::default(),
            sequence: 0,
        }
    }

    /// Creates a bridge for an arbitrary mode.
    ///
    /// For `Distributed`, a `DistributedRegionConfig` is derived with the
    /// minimum quorum computed from the consistency level. `Hybrid` currently
    /// gets no distributed record — NOTE(review): confirm this is intended,
    /// since `sync` treats `Hybrid` as replicated but will find no record.
    ///
    /// # Panics
    /// Panics if `mode` has a zero replication factor (via `assert_valid`).
    #[must_use]
    pub fn with_mode(
        id: RegionId,
        parent: Option<RegionId>,
        budget: Budget,
        mode: RegionMode,
    ) -> Self {
        mode.assert_valid();
        match mode {
            RegionMode::Local | RegionMode::Hybrid { .. } => Self {
                local: RegionRecord::new(id, parent, budget),
                distributed: None,
                mode,
                sync_state: SyncState::default(),
                config: BridgeConfig::default(),
                sequence: 0,
            },
            RegionMode::Distributed {
                replication_factor,
                consistency,
            } => {
                let config = DistributedRegionConfig {
                    min_quorum: RegionMode::min_quorum(replication_factor, consistency),
                    replication_factor,
                    write_consistency: consistency,
                    ..Default::default()
                };
                Self::new_distributed(id, parent, budget, config)
            }
        }
    }

    /// The region's identifier (shared by both records).
    #[must_use]
    pub fn id(&self) -> RegionId {
        self.local.id
    }

    /// The current replication mode.
    #[must_use]
    pub fn mode(&self) -> RegionMode {
        self.mode
    }

    /// The local record's lifecycle state.
    #[must_use]
    pub fn local_state(&self) -> RegionState {
        self.local.state()
    }

    /// The distributed record's state, if one exists.
    #[must_use]
    pub fn distributed_state(&self) -> Option<DistributedRegionState> {
        self.distributed.as_ref().map(|d| d.state)
    }

    /// The combined local + distributed view (see `EffectiveState::compute`).
    #[must_use]
    pub fn effective_state(&self) -> EffectiveState {
        EffectiveState::compute(self.local_state(), self.distributed_state())
    }

    /// True when the effective state admits new tasks/children.
    #[must_use]
    pub fn can_spawn(&self) -> bool {
        self.effective_state().can_spawn()
    }

    /// True when the local record still tracks tasks or children.
    #[must_use]
    pub fn has_live_work(&self) -> bool {
        self.local.has_live_work()
    }

    /// Borrow the local record.
    #[must_use]
    pub fn local(&self) -> &RegionRecord {
        &self.local
    }

    /// Borrow the distributed record, if present.
    #[must_use]
    pub fn distributed(&self) -> Option<&DistributedRegionRecord> {
        self.distributed.as_ref()
    }

    /// Starts closing both records.
    ///
    /// A cancel `reason` becomes a `TransitionReason::Cancelled` for the
    /// distributed side; otherwise the transition is `LocalClose`. The
    /// distributed transition is skipped if that record is already closing
    /// or closed. Marks a sync pending if anything changed.
    ///
    /// # Errors
    /// Propagates errors from the distributed record's `begin_close`.
    pub fn begin_close(
        &mut self,
        reason: Option<CancelReason>,
        now: Time,
    ) -> Result<CloseResult, Error> {
        let transition_reason = reason.as_ref().map_or(TransitionReason::LocalClose, |r| {
            TransitionReason::Cancelled {
                reason: r.kind.as_str().to_owned(),
            }
        });
        let local_changed = self.local.begin_close(reason);
        let distributed_transition = if let Some(ref mut dist) = self.distributed {
            match dist.state {
                // Already tearing down: nothing to transition.
                DistributedRegionState::Closing | DistributedRegionState::Closed => None,
                _ => Some(dist.begin_close(transition_reason, now)?),
            }
        } else {
            None
        };
        if local_changed || distributed_transition.is_some() {
            self.mark_sync_pending();
        }
        Ok(CloseResult {
            local_changed,
            distributed_transition,
            effective_state: self.effective_state(),
        })
    }

    /// Advances the local record to draining; returns whether it changed.
    ///
    /// # Errors
    /// Currently never fails; `Result` is kept for interface symmetry with
    /// the distributed lifecycle methods.
    pub fn begin_drain(&mut self) -> Result<bool, Error> {
        let changed = self.local.begin_drain();
        if changed {
            self.mark_sync_pending();
        }
        Ok(changed)
    }

    /// Advances the local record to finalizing; returns whether it changed.
    ///
    /// # Errors
    /// Currently never fails; `Result` is kept for interface symmetry with
    /// the distributed lifecycle methods.
    pub fn begin_finalize(&mut self) -> Result<bool, Error> {
        let changed = self.local.begin_finalize();
        if changed {
            self.mark_sync_pending();
        }
        Ok(changed)
    }

    /// Completes the close on both records (distributed side skipped if
    /// already closed). Marks a sync pending if anything changed.
    ///
    /// # Errors
    /// Propagates errors from the distributed record's `complete_close`.
    pub fn complete_close(&mut self, now: Time) -> Result<CloseResult, Error> {
        let local_changed = self.local.complete_close();
        let distributed_transition = if let Some(ref mut dist) = self.distributed {
            match dist.state {
                DistributedRegionState::Closed => None,
                _ => Some(dist.complete_close(now)?),
            }
        } else {
            None
        };
        if local_changed || distributed_transition.is_some() {
            self.mark_sync_pending();
        }
        Ok(CloseResult {
            local_changed,
            distributed_transition,
            effective_state: self.effective_state(),
        })
    }

    /// Registers a child region.
    ///
    /// # Errors
    /// `RegionClosed` when the effective state no longer admits work;
    /// `AdmissionDenied` when the local record rejects the child.
    pub fn add_child(&mut self, child: RegionId) -> Result<(), Error> {
        if !self.can_spawn() {
            return Err(
                Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
            );
        }
        // Compare sizes before/after so a duplicate add does not mark a sync.
        let before = self.local.child_ids().len();
        self.local
            .add_child(child)
            .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
        if self.local.child_ids().len() > before {
            self.mark_sync_pending();
        }
        Ok(())
    }

    /// Unregisters a child region; a no-op removal does not mark a sync.
    pub fn remove_child(&mut self, child: RegionId) {
        let before = self.local.child_ids().len();
        self.local.remove_child(child);
        if self.local.child_ids().len() < before {
            self.mark_sync_pending();
        }
    }

    /// Registers a task.
    ///
    /// # Errors
    /// `RegionClosed` when the effective state no longer admits work;
    /// `AdmissionDenied` when the local record rejects the task.
    pub fn add_task(&mut self, task: TaskId) -> Result<(), Error> {
        if !self.can_spawn() {
            return Err(
                Error::new(ErrorKind::RegionClosed).with_message("region not accepting new work")
            );
        }
        // Compare sizes before/after so a duplicate add does not mark a sync.
        let before = self.local.task_ids().len();
        self.local
            .add_task(task)
            .map_err(|e| Error::new(ErrorKind::AdmissionDenied).with_message(format!("{e:?}")))?;
        if self.local.task_ids().len() > before {
            self.mark_sync_pending();
        }
        Ok(())
    }

    /// Unregisters a task; a no-op removal does not mark a sync.
    pub fn remove_task(&mut self, task: TaskId) {
        let before = self.local.task_ids().len();
        self.local.remove_task(task);
        if self.local.task_ids().len() < before {
            self.mark_sync_pending();
        }
    }

    /// Performs a sync pass if one is needed.
    ///
    /// Returns `NotNeeded` unless the mode is replicated, changes are
    /// pending, AND a distributed record exists. Otherwise takes a snapshot
    /// and records it as synced.
    ///
    /// NOTE(review): no replica transport happens here — the snapshot is
    /// generated and the sync state cleared locally. Presumably the actual
    /// replication is a caller's responsibility; confirm.
    ///
    /// # Errors
    /// Currently never fails.
    pub fn sync(&mut self, now: Time) -> Result<SyncResult, Error> {
        if !self.mode.is_replicated() || !self.sync_state.sync_pending || self.distributed.is_none()
        {
            return Ok(SyncResult::NotNeeded);
        }
        let snapshot = self.create_snapshot(now);
        let seq = snapshot.sequence;
        let timestamp = snapshot.timestamp;
        self.sync_state.last_synced_sequence = seq;
        self.sync_state.last_sync_time = Some(timestamp);
        self.sync_state.sync_pending = false;
        self.sync_state.pending_ops = 0;
        Ok(SyncResult::Synced { sequence: seq })
    }

    /// Takes a point-in-time snapshot of the local record, incrementing the
    /// monotonic sequence counter.
    ///
    /// Every task is snapshotted as `TaskState::Running` with priority 0 —
    /// per-task state is not tracked at this level.
    ///
    /// # Panics
    /// Panics if the sequence counter would overflow `u64`.
    #[must_use]
    pub fn create_snapshot(&mut self, now: Time) -> RegionSnapshot {
        self.sequence = self
            .sequence
            .checked_add(1)
            .expect("distributed bridge snapshot sequence counter exhausted");
        let tasks: Vec<TaskSnapshot> = self
            .local
            .task_ids()
            .into_iter()
            .map(|id| TaskSnapshot {
                task_id: id,
                state: TaskState::Running,
                priority: 0,
            })
            .collect();
        RegionSnapshot {
            region_id: self.local.id,
            state: self.local.state(),
            timestamp: now,
            sequence: self.sequence,
            tasks,
            children: self.local.child_ids(),
            // Clamp rather than fail if the count somehow exceeds u32.
            finalizer_count: u32::try_from(self.local.finalizer_count()).unwrap_or(u32::MAX),
            budget: self.local.budget().to_distributed(),
            cancel_reason: self
                .local
                .cancel_reason()
                .map(|r| r.kind.as_str().to_owned()),
            parent: self.local.parent,
            metadata: vec![],
        }
    }

    /// Applies a snapshot received from a peer onto the local record.
    ///
    /// Stale snapshots (sequence not newer than both the local counter and
    /// the last synced sequence) are silently ignored with `Ok(())`.
    /// On success the local sequence counter and sync state are advanced to
    /// the snapshot's values.
    ///
    /// # Errors
    /// `ObjectMismatch` when the snapshot's region id differs from this
    /// bridge's.
    pub fn apply_snapshot(&mut self, snapshot: &RegionSnapshot) -> Result<(), Error> {
        if snapshot.region_id != self.local.id {
            return Err(Error::new(ErrorKind::ObjectMismatch)
                .with_message("snapshot region ID does not match bridge"));
        }
        let current_sequence = self.sequence.max(self.sync_state.last_synced_sequence);
        if snapshot.sequence <= current_sequence {
            // Stale or duplicate snapshot: ignore.
            return Ok(());
        }
        let budget = Budget {
            deadline: snapshot.budget.deadline_nanos.map(Time::from_nanos),
            poll_quota: snapshot.budget.polls_remaining.unwrap_or(0),
            cost_quota: snapshot.budget.cost_remaining,
            // NOTE(review): priority is not carried in the snapshot; 128 is
            // presumably the neutral mid-point default — confirm.
            priority: 128,
        };
        // Reverse the string encoding done in create_snapshot; unknown
        // strings fall back to CancelKind::User.
        let cancel_reason = snapshot.cancel_reason.as_ref().map(|reason_str| {
            let kind = match reason_str.as_str() {
                "Timeout" => crate::types::cancel::CancelKind::Timeout,
                "Deadline" => crate::types::cancel::CancelKind::Deadline,
                "PollQuota" => crate::types::cancel::CancelKind::PollQuota,
                "CostBudget" => crate::types::cancel::CancelKind::CostBudget,
                "FailFast" => crate::types::cancel::CancelKind::FailFast,
                "RaceLost" => crate::types::cancel::CancelKind::RaceLost,
                "ParentCancelled" => crate::types::cancel::CancelKind::ParentCancelled,
                "ResourceUnavailable" => crate::types::cancel::CancelKind::ResourceUnavailable,
                "Shutdown" => crate::types::cancel::CancelKind::Shutdown,
                "LinkedExit" => crate::types::cancel::CancelKind::LinkedExit,
                _ => crate::types::cancel::CancelKind::User,
            };
            crate::types::cancel::CancelReason::with_origin(
                kind,
                snapshot.region_id,
                snapshot.timestamp,
            )
        });
        let tasks: Vec<TaskId> = snapshot.tasks.iter().map(|t| t.task_id).collect();
        self.local.apply_distributed_snapshot(
            snapshot.state,
            budget,
            snapshot.children.clone(),
            tasks,
            cancel_reason,
        );
        self.sequence = self.sequence.max(snapshot.sequence);
        self.sync_state.last_synced_sequence = snapshot.sequence;
        self.sync_state.last_sync_time = Some(snapshot.timestamp);
        self.sync_state.sync_pending = false;
        self.sync_state.pending_ops = 0;
        Ok(())
    }

    /// Upgrades a local bridge to distributed mode in place.
    ///
    /// Takes a snapshot first (its sequence is returned), then installs a
    /// new distributed record built from the local record's id, parent, and
    /// budget. `_replicas` is currently unused — NOTE(review): presumably
    /// reserved for seeding replica membership; confirm.
    ///
    /// # Errors
    /// `InvalidStateTransition` when upgrades are disabled by config, the
    /// bridge is already replicated, or the local region is not `Open`;
    /// also propagates `config.validate()` failures.
    pub fn upgrade_to_distributed(
        &mut self,
        now: Time,
        config: DistributedRegionConfig,
        _replicas: &[ReplicaInfo],
    ) -> Result<UpgradeResult, Error> {
        if !self.config.allow_upgrade {
            return Err(Error::new(ErrorKind::InvalidStateTransition)
                .with_message("mode upgrade not allowed"));
        }
        if self.mode.is_replicated() {
            return Err(Error::new(ErrorKind::InvalidStateTransition)
                .with_message("already in distributed mode"));
        }
        if self.local.state() != RegionState::Open {
            return Err(Error::new(ErrorKind::InvalidStateTransition)
                .with_message("can only upgrade open regions"));
        }
        config.validate()?;
        let snapshot = self.create_snapshot(now);
        let snapshot_sequence = snapshot.sequence;
        let replication_factor = config.replication_factor;
        let consistency = config.write_consistency;
        let distributed = DistributedRegionRecord::new(
            self.local.id,
            config,
            self.local.parent,
            self.local.budget(),
        );
        let previous_mode = self.mode;
        self.distributed = Some(distributed);
        self.mode = RegionMode::Distributed {
            replication_factor,
            consistency,
        };
        Ok(UpgradeResult {
            previous_mode,
            new_mode: self.mode,
            snapshot_sequence,
        })
    }
}
#[cfg(test)]
#[allow(clippy::similar_names)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn mode_local() {
let mode = RegionMode::local();
assert!(!mode.is_replicated());
assert!(!mode.is_distributed());
assert_eq!(mode.replication_factor(), 1);
}
#[test]
fn mode_distributed() {
let mode = RegionMode::distributed(3);
assert!(mode.is_replicated());
assert!(mode.is_distributed());
assert_eq!(mode.replication_factor(), 3);
}
#[test]
fn mode_hybrid() {
let mode = RegionMode::hybrid(2);
assert!(mode.is_replicated());
assert!(!mode.is_distributed());
assert_eq!(mode.replication_factor(), 2);
}
#[test]
#[should_panic(expected = "distributed region mode requires at least one replica")]
fn mode_distributed_rejects_zero_replication() {
let _ = RegionMode::distributed(0);
}
#[test]
#[should_panic(expected = "hybrid region mode requires at least one replica")]
fn mode_hybrid_rejects_zero_replication() {
let _ = RegionMode::hybrid(0);
}
#[test]
fn mode_default_is_local() {
assert_eq!(RegionMode::default(), RegionMode::Local);
}
#[test]
fn effective_state_local_open() {
let state = EffectiveState::compute(RegionState::Open, None);
assert_eq!(state, EffectiveState::Open);
assert!(state.can_spawn());
assert!(!state.needs_recovery());
}
#[test]
fn effective_state_local_closing() {
let state = EffectiveState::compute(RegionState::Closing, None);
assert_eq!(state, EffectiveState::Closing);
assert!(!state.can_spawn());
}
#[test]
fn effective_state_local_closed() {
let state = EffectiveState::compute(RegionState::Closed, None);
assert_eq!(state, EffectiveState::Closed);
}
#[test]
fn effective_state_distributed_active() {
let state =
EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Active));
assert_eq!(state, EffectiveState::Open);
assert!(state.can_spawn());
}
#[test]
fn effective_state_distributed_initializing() {
let state = EffectiveState::compute(
RegionState::Open,
Some(DistributedRegionState::Initializing),
);
assert_eq!(state, EffectiveState::Open);
}
#[test]
fn effective_state_degraded() {
let state =
EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Degraded));
assert_eq!(state, EffectiveState::Degraded);
assert!(!state.can_spawn());
assert!(state.needs_recovery());
}
#[test]
fn effective_state_recovering() {
let state =
EffectiveState::compute(RegionState::Open, Some(DistributedRegionState::Recovering));
assert_eq!(state, EffectiveState::Recovering);
assert!(state.needs_recovery());
}
#[test]
fn effective_state_inconsistent() {
let state =
EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Active));
assert!(state.is_inconsistent());
assert!(state.needs_recovery());
}
#[test]
fn effective_state_closing_distributed() {
let state =
EffectiveState::compute(RegionState::Closing, Some(DistributedRegionState::Closing));
assert_eq!(state, EffectiveState::Closing);
}
#[test]
fn effective_state_closed_distributed() {
let state =
EffectiveState::compute(RegionState::Closed, Some(DistributedRegionState::Closed));
assert_eq!(state, EffectiveState::Closed);
}
#[test]
fn local_state_to_distributed() {
assert_eq!(
RegionState::Open.to_distributed(),
DistributedRegionState::Active
);
assert_eq!(
RegionState::Closing.to_distributed(),
DistributedRegionState::Closing
);
assert_eq!(
RegionState::Draining.to_distributed(),
DistributedRegionState::Closing
);
assert_eq!(
RegionState::Finalizing.to_distributed(),
DistributedRegionState::Closing
);
assert_eq!(
RegionState::Closed.to_distributed(),
DistributedRegionState::Closed
);
}
#[test]
fn distributed_state_to_local() {
assert_eq!(DistributedRegionState::Active.to_local(), RegionState::Open);
assert_eq!(
DistributedRegionState::Initializing.to_local(),
RegionState::Open
);
assert_eq!(
DistributedRegionState::Degraded.to_local(),
RegionState::Open
);
assert_eq!(
DistributedRegionState::Recovering.to_local(),
RegionState::Open
);
assert_eq!(
DistributedRegionState::Closing.to_local(),
RegionState::Closing
);
assert_eq!(
DistributedRegionState::Closed.to_local(),
RegionState::Closed
);
}
#[test]
fn is_lossless_conversion() {
assert!(DistributedRegionState::Active.is_lossless());
assert!(DistributedRegionState::Closing.is_lossless());
assert!(DistributedRegionState::Closed.is_lossless());
assert!(!DistributedRegionState::Degraded.is_lossless());
assert!(!DistributedRegionState::Recovering.is_lossless());
assert!(!DistributedRegionState::Initializing.is_lossless());
}
#[test]
fn budget_to_distributed() {
let budget = Budget::new().with_poll_quota(100).with_cost_quota(500);
let snapshot = budget.to_distributed();
assert_eq!(snapshot.polls_remaining, Some(100));
assert_eq!(snapshot.cost_remaining, Some(500));
}
#[test]
fn bridge_new_local() {
let bridge = RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default());
assert_eq!(bridge.mode(), RegionMode::Local);
assert!(bridge.distributed().is_none());
assert!(bridge.can_spawn());
assert_eq!(bridge.local_state(), RegionState::Open);
}
#[test]
fn bridge_new_distributed() {
let bridge = RegionBridge::new_distributed(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
DistributedRegionConfig::default(),
);
assert!(bridge.mode().is_distributed());
assert!(bridge.distributed().is_some());
}
#[test]
fn bridge_with_mode_local() {
let bridge = RegionBridge::with_mode(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
RegionMode::Local,
);
assert_eq!(bridge.mode(), RegionMode::Local);
}
#[test]
fn bridge_with_mode_distributed() {
let bridge = RegionBridge::with_mode(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
RegionMode::distributed(3),
);
assert!(bridge.mode().is_distributed());
assert!(bridge.distributed().is_some());
}
#[test]
fn bridge_with_mode_distributed_single_replica_derives_valid_quorum() {
let bridge = RegionBridge::with_mode(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
RegionMode::distributed(1),
);
let distributed = bridge.distributed().expect("distributed mode");
assert_eq!(distributed.config.replication_factor, 1);
assert_eq!(distributed.config.min_quorum, 1);
assert_eq!(
distributed.config.write_consistency,
ConsistencyLevel::Quorum
);
}
#[test]
#[should_panic(expected = "distributed region mode requires at least one replica")]
fn bridge_with_mode_distributed_rejects_zero_replication_literal() {
let _ = RegionBridge::with_mode(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
RegionMode::Distributed {
replication_factor: 0,
consistency: ConsistencyLevel::Quorum,
},
);
}
#[test]
fn bridge_begin_close_local() {
let mut bridge = create_local_bridge();
let result = bridge.begin_close(None, Time::from_secs(0)).unwrap();
assert!(result.local_changed);
assert!(result.distributed_transition.is_none());
assert_eq!(result.effective_state, EffectiveState::Closing);
}
#[test]
fn bridge_begin_close_distributed() {
let mut bridge = create_distributed_bridge();
if let Some(ref mut dist) = bridge.distributed {
let _ = dist.activate(Time::from_secs(0));
}
let result = bridge.begin_close(None, Time::from_secs(1)).unwrap();
assert!(result.local_changed);
assert!(result.distributed_transition.is_some());
assert_eq!(result.effective_state, EffectiveState::Closing);
}
#[test]
fn bridge_full_lifecycle() {
let mut bridge = create_local_bridge();
bridge.begin_close(None, Time::from_secs(0)).unwrap();
assert!(!bridge.can_spawn());
bridge.begin_drain().unwrap();
bridge.begin_finalize().unwrap();
bridge.complete_close(Time::from_secs(1)).unwrap();
assert_eq!(bridge.effective_state(), EffectiveState::Closed);
}
#[test]
fn bridge_cannot_spawn_when_closed() {
let mut bridge = create_local_bridge();
bridge.begin_close(None, Time::from_secs(0)).unwrap();
let result = bridge.add_task(TaskId::new_for_test(1, 0));
assert!(result.is_err());
}
#[test]
fn bridge_add_remove_task() {
let mut bridge = create_local_bridge();
let task_id = TaskId::new_for_test(1, 0);
bridge.add_task(task_id).unwrap();
assert!(bridge.has_live_work());
assert!(bridge.sync_state.sync_pending);
bridge.remove_task(task_id);
assert!(!bridge.has_live_work());
}
#[test]
fn bridge_add_remove_child() {
let mut bridge = create_local_bridge();
let child_id = RegionId::new_for_test(2, 0);
bridge.add_child(child_id).unwrap();
assert!(bridge.has_live_work());
bridge.remove_child(child_id);
assert!(!bridge.has_live_work());
}
#[test]
fn sync_not_needed_local() {
let mut bridge = create_local_bridge();
let result = bridge.sync(Time::from_secs(1)).unwrap();
assert!(matches!(result, SyncResult::NotNeeded));
}
#[test]
fn sync_after_changes() {
let mut bridge = create_distributed_bridge();
bridge.sync_state.sync_pending = true;
let sync_time = Time::from_secs(10);
let result = bridge.sync(sync_time).unwrap();
assert!(matches!(result, SyncResult::Synced { .. }));
assert!(!bridge.sync_state.sync_pending);
assert_eq!(bridge.sync_state.last_sync_time, Some(sync_time));
}
#[test]
fn create_snapshot_increments_sequence() {
let mut bridge = create_local_bridge();
let snap1 = bridge.create_snapshot(Time::from_secs(10));
let snap2 = bridge.create_snapshot(Time::from_secs(11));
assert_eq!(snap1.sequence, 1);
assert_eq!(snap2.sequence, 2);
assert_eq!(snap1.region_id, bridge.id());
assert_eq!(snap1.timestamp, Time::from_secs(10));
assert_eq!(snap2.timestamp, Time::from_secs(11));
}
#[test]
fn snapshot_includes_tasks() {
let mut bridge = create_local_bridge();
bridge.add_task(TaskId::new_for_test(1, 0)).unwrap();
bridge.add_task(TaskId::new_for_test(2, 0)).unwrap();
let snap = bridge.create_snapshot(Time::from_secs(20));
assert_eq!(snap.tasks.len(), 2);
}
#[test]
fn apply_snapshot_updates_sync_state() {
let mut bridge = create_local_bridge();
bridge.sync_state.sync_pending = true;
bridge.sync_state.pending_ops = 7;
let snap = RegionSnapshot {
region_id: bridge.id(),
state: RegionState::Open,
timestamp: Time::from_secs(100),
sequence: 42,
tasks: vec![],
children: vec![],
finalizer_count: 0,
budget: BudgetSnapshot {
deadline_nanos: None,
polls_remaining: None,
cost_remaining: None,
},
cancel_reason: None,
parent: None,
metadata: vec![],
};
bridge.apply_snapshot(&snap).unwrap();
assert_eq!(bridge.sync_state.last_synced_sequence, 42);
assert_eq!(bridge.sync_state.last_sync_time, Some(Time::from_secs(100)));
assert!(!bridge.sync_state.sync_pending);
assert_eq!(bridge.sync_state.pending_ops, 0);
}
#[test]
fn apply_snapshot_advances_local_sequence_counter() {
let mut bridge = create_local_bridge();
let snap = RegionSnapshot {
region_id: bridge.id(),
state: RegionState::Open,
timestamp: Time::from_secs(100),
sequence: 42,
tasks: vec![],
children: vec![],
finalizer_count: 0,
budget: BudgetSnapshot {
deadline_nanos: None,
polls_remaining: None,
cost_remaining: None,
},
cancel_reason: None,
parent: None,
metadata: vec![],
};
bridge.apply_snapshot(&snap).unwrap();
let next = bridge.create_snapshot(Time::from_secs(101));
assert_eq!(next.sequence, 43);
}
#[test]
fn apply_snapshot_mismatch() {
let mut bridge = create_local_bridge();
let snap = RegionSnapshot {
region_id: RegionId::new_for_test(999, 0),
state: RegionState::Open,
timestamp: Time::ZERO,
sequence: 1,
tasks: vec![],
children: vec![],
finalizer_count: 0,
budget: BudgetSnapshot {
deadline_nanos: None,
polls_remaining: None,
cost_remaining: None,
},
cancel_reason: None,
parent: None,
metadata: vec![],
};
let result = bridge.apply_snapshot(&snap);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), ErrorKind::ObjectMismatch);
}
#[test]
fn upgrade_local_to_distributed() {
let mut bridge = create_local_bridge();
let config = DistributedRegionConfig {
replication_factor: 3,
..Default::default()
};
let replicas = create_test_replicas(3);
let result = bridge
.upgrade_to_distributed(Time::from_secs(30), config, &replicas)
.unwrap();
assert_eq!(result.previous_mode, RegionMode::Local);
assert!(result.new_mode.is_distributed());
assert!(bridge.distributed().is_some());
}
#[test]
fn upgrade_not_allowed() {
let mut bridge = create_local_bridge();
bridge.config.allow_upgrade = false;
let result = bridge.upgrade_to_distributed(
Time::from_secs(31),
DistributedRegionConfig::default(),
&create_test_replicas(3),
);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().kind(),
ErrorKind::InvalidStateTransition
);
}
#[test]
fn upgrade_already_distributed() {
let mut bridge = create_distributed_bridge();
let result = bridge.upgrade_to_distributed(
Time::from_secs(32),
DistributedRegionConfig::default(),
&create_test_replicas(3),
);
assert!(result.is_err());
}
#[test]
fn upgrade_only_from_open() {
let mut bridge = create_local_bridge();
bridge.begin_close(None, Time::from_secs(0)).unwrap();
let result = bridge.upgrade_to_distributed(
Time::from_secs(33),
DistributedRegionConfig::default(),
&create_test_replicas(3),
);
assert!(result.is_err());
}
fn create_local_bridge() -> RegionBridge {
RegionBridge::new_local(RegionId::new_for_test(1, 0), None, Budget::default())
}
fn create_distributed_bridge() -> RegionBridge {
RegionBridge::new_distributed(
RegionId::new_for_test(1, 0),
None,
Budget::default(),
DistributedRegionConfig::default(),
)
}
fn create_test_replicas(count: usize) -> Vec<ReplicaInfo> {
(0..count)
.map(|i| ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}")))
.collect()
}
fn scrub_region_snapshot_for_snapshot_test(snapshot: &RegionSnapshot) -> serde_json::Value {
json!({
"region_id": "[region_id]",
"state": format!("{:?}", snapshot.state),
"timestamp_nanos": "[timestamp_nanos]",
"sequence": snapshot.sequence,
"tasks": snapshot.tasks.iter().map(|task| {
json!({
"task_id": "[task_id]",
"state": format!("{:?}", task.state),
"priority": task.priority,
})
}).collect::<Vec<_>>(),
"children": snapshot.children.iter().map(|_| "[child_region_id]").collect::<Vec<_>>(),
"finalizer_count": snapshot.finalizer_count,
"budget": {
"deadline_nanos": snapshot
.budget
.deadline_nanos
.map(|_| "[deadline_nanos]"),
"polls_remaining": snapshot.budget.polls_remaining,
"cost_remaining": snapshot.budget.cost_remaining,
},
"cancel_reason": snapshot.cancel_reason,
"parent": snapshot.parent.map(|_| "[parent_region_id]"),
"metadata": snapshot.metadata,
})
}
fn scrub_bridge_sequence_advancement_step(
applied_sequence: u64,
bridge: &RegionBridge,
) -> serde_json::Value {
json!({
"applied_sequence": applied_sequence,
"bridge_sequence": bridge.sequence,
"last_synced_sequence": bridge.sync_state.last_synced_sequence,
"last_sync_time_nanos": bridge.sync_state.last_sync_time.map(|_| "[timestamp_nanos]"),
"sync_pending": bridge.sync_state.sync_pending,
"pending_ops": bridge.sync_state.pending_ops,
"local_state": format!("{:?}", bridge.local.state()),
"task_count": bridge.local.task_ids().len(),
"child_count": bridge.local.child_ids().len(),
"cancel_reason": bridge
.local
.cancel_reason()
.map(|reason| reason.kind.as_str().to_owned()),
})
}
fn run_bridge_sequence_advancement_scenario(
snapshots: &[&RegionSnapshot],
) -> Vec<serde_json::Value> {
let mut bridge = create_local_bridge();
let mut steps = Vec::with_capacity(snapshots.len());
for &snapshot in snapshots {
bridge.apply_snapshot(snapshot).unwrap();
steps.push(scrub_bridge_sequence_advancement_step(
snapshot.sequence,
&bridge,
));
}
steps
}
fn strip_applied_sequence(step: &serde_json::Value) -> serde_json::Value {
let mut object = step
.as_object()
.expect("bridge sequence snapshot step should be an object")
.clone();
object.remove("applied_sequence");
serde_json::Value::Object(object)
}
/// Upgrading to distributed mode must not disturb tasks already running.
#[test]
fn upgrade_while_tasks_running() {
    let mut bridge = create_local_bridge();
    for raw in [1, 2] {
        bridge.add_task(TaskId::new_for_test(raw, 0)).unwrap();
    }
    assert!(bridge.has_live_work());
    let config = DistributedRegionConfig {
        replication_factor: 3,
        ..Default::default()
    };
    let outcome = bridge
        .upgrade_to_distributed(Time::from_secs(34), config, &create_test_replicas(3))
        .unwrap();
    assert!(outcome.new_mode.is_distributed());
    assert!(outcome.snapshot_sequence > 0);
    // The running tasks survive the upgrade.
    assert!(bridge.has_live_work());
}
/// Sequence numbers must strictly increase across rapid task churn.
#[test]
fn snapshot_monotonic_under_rapid_changes() {
    let mut bridge = create_local_bridge();
    let mut last_seen = 0;
    for step in 0u32..20 {
        let task = TaskId::new_for_test(step, 0);
        bridge.add_task(task).unwrap();
        let snapshot = bridge.create_snapshot(Time::from_secs(u64::from(step) + 1));
        assert!(
            snapshot.sequence > last_seen,
            "sequence must be monotonically increasing"
        );
        last_seen = snapshot.sequence;
        bridge.remove_task(task);
    }
}
/// A second begin_close on an already-closing local bridge is a no-op.
#[test]
fn double_close_local() {
    let mut bridge = create_local_bridge();
    let first = bridge.begin_close(None, Time::from_secs(0)).unwrap();
    assert!(first.local_changed);
    let second = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(!second.local_changed);
    assert_eq!(second.effective_state, EffectiveState::Closing);
}
/// Repeated begin_close on a distributed bridge emits exactly one
/// distributed transition; the repeat is a pure no-op.
#[test]
fn double_close_distributed() {
    let mut bridge = create_distributed_bridge();
    let first = bridge.begin_close(None, Time::from_secs(0)).unwrap();
    assert!(first.local_changed);
    assert!(first.distributed_transition.is_some());
    assert_eq!(first.effective_state, EffectiveState::Closing);
    let second = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(!second.local_changed);
    assert!(second.distributed_transition.is_none());
    assert_eq!(second.effective_state, EffectiveState::Closing);
}
/// complete_close is idempotent for a local bridge.
#[test]
fn double_complete_close_local() {
    let mut bridge = create_local_bridge();
    bridge.begin_close(None, Time::from_secs(0)).unwrap();
    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();
    let first = bridge.complete_close(Time::from_secs(1)).unwrap();
    assert!(first.local_changed);
    assert_eq!(first.effective_state, EffectiveState::Closed);
    // Second completion reports no change.
    let second = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert!(!second.local_changed);
}
/// complete_close is idempotent for a distributed bridge: the distributed
/// transition fires exactly once and the state stays Closed.
#[test]
fn double_complete_close_distributed() {
    let mut bridge = create_distributed_bridge();
    bridge.begin_close(None, Time::from_secs(0)).unwrap();
    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();
    let first = bridge.complete_close(Time::from_secs(1)).unwrap();
    assert!(first.local_changed);
    assert!(first.distributed_transition.is_some());
    assert_eq!(first.effective_state, EffectiveState::Closed);
    let second = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert!(!second.local_changed);
    assert!(second.distributed_transition.is_none());
    assert_eq!(second.effective_state, EffectiveState::Closed);
}
/// Closing with an explicit cancel reason still transitions to Closing.
#[test]
fn close_with_cancel_reason() {
    let mut bridge = create_local_bridge();
    let outcome = bridge
        .begin_close(Some(CancelReason::timeout()), Time::from_secs(0))
        .unwrap();
    assert!(outcome.local_changed);
    assert_eq!(outcome.effective_state, EffectiveState::Closing);
}
/// Once closing has begun, new child regions are refused.
#[test]
fn add_child_after_close_rejected() {
    let mut bridge = create_local_bridge();
    bridge.begin_close(None, Time::from_secs(0)).unwrap();
    assert!(bridge.add_child(RegionId::new_for_test(2, 0)).is_err());
}
/// With nothing pending, sync should short-circuit to NotNeeded.
#[test]
fn sync_not_needed_when_no_changes() {
    let mut bridge = create_distributed_bridge();
    assert!(!bridge.sync_state.sync_pending);
    match bridge.sync(Time::from_secs(40)).unwrap() {
        SyncResult::NotNeeded => {}
        _ => panic!("sync with no changes must report NotNeeded"),
    }
}
/// A sync with pending work clears the counters and records the sync time.
#[test]
fn sync_clears_pending_ops() {
    let mut bridge = create_distributed_bridge();
    bridge.sync_state.sync_pending = true;
    bridge.sync_state.pending_ops = 5;
    let now = Time::from_secs(41);
    assert!(matches!(bridge.sync(now).unwrap(), SyncResult::Synced { .. }));
    assert_eq!(bridge.sync_state.pending_ops, 0);
    assert!(!bridge.sync_state.sync_pending);
    assert_eq!(bridge.sync_state.last_sync_time, Some(now));
}
/// Duplicate adds and removals of unknown ids must not inflate pending_ops;
/// only the four genuine mutations (task add/remove, child add/remove) count.
#[test]
fn pending_ops_counts_only_real_mutations() {
    let mut bridge = create_distributed_bridge();
    let task = TaskId::new_for_test(1, 0);
    bridge.add_task(task).unwrap();
    bridge.add_task(task).unwrap(); // duplicate add: not a real mutation
    bridge.remove_task(TaskId::new_for_test(999, 0)); // unknown id: no-op
    bridge.remove_task(task);
    bridge.add_child(RegionId::new_for_test(2, 0)).unwrap();
    bridge.add_child(RegionId::new_for_test(2, 0)).unwrap(); // duplicate add
    bridge.remove_child(RegionId::new_for_test(777, 0)); // unknown id: no-op
    bridge.remove_child(RegionId::new_for_test(2, 0));
    assert!(bridge.sync_state.sync_pending);
    assert_eq!(bridge.sync_state.pending_ops, 4);
}
/// Close transitions on an activated distributed bridge mark sync pending.
#[test]
fn close_transitions_mark_sync_pending() {
    let mut bridge = create_distributed_bridge();
    if let Some(dist) = bridge.distributed.as_mut() {
        let _ = dist.activate(Time::from_secs(0));
    }
    // Activation alone leaves nothing to sync.
    assert!(!bridge.sync_state.sync_pending);
    assert_eq!(bridge.sync_state.pending_ops, 0);
    bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(bridge.sync_state.sync_pending);
    assert!(bridge.sync_state.pending_ops >= 1);
}
/// The upgrade takes one more snapshot, so its sequence is prior count + 1.
#[test]
fn upgrade_snapshot_sequence_matches() {
    let mut bridge = create_local_bridge();
    for secs in [50, 51] {
        let _ = bridge.create_snapshot(Time::from_secs(secs));
    }
    assert_eq!(bridge.sequence, 2);
    let config = DistributedRegionConfig {
        replication_factor: 3,
        ..Default::default()
    };
    let upgraded = bridge
        .upgrade_to_distributed(Time::from_secs(52), config, &create_test_replicas(3))
        .unwrap();
    assert_eq!(upgraded.snapshot_sequence, 3);
}
/// Hybrid mode is replicated but carries no distributed record.
#[test]
fn bridge_with_mode_hybrid() {
    let bridge = RegionBridge::with_mode(
        RegionId::new_for_test(1, 0),
        None,
        Budget::default(),
        RegionMode::hybrid(2),
    );
    let mode = bridge.mode();
    assert!(mode.is_replicated());
    assert!(!mode.is_distributed());
    assert!(bridge.distributed().is_none());
}
/// A literal zero-replica Hybrid mode (bypassing the checked constructor)
/// must still be caught and panic during bridge construction.
#[test]
#[should_panic(expected = "hybrid region mode requires at least one replica")]
fn bridge_with_mode_hybrid_rejects_zero_replication_literal() {
    let invalid = RegionMode::Hybrid {
        replication_factor: 0,
        max_lag: Duration::from_secs(1),
    };
    let _ = RegionBridge::with_mode(
        RegionId::new_for_test(1, 0),
        None,
        Budget::default(),
        invalid,
    );
}
/// A zero-replica config must be rejected with a ConfigError, never a panic,
/// and the bridge must remain untouched in Local mode.
#[test]
fn upgrade_to_distributed_rejects_zero_replication_without_panicking() {
    let mut bridge = create_local_bridge();
    let config = DistributedRegionConfig {
        min_quorum: 1,
        replication_factor: 0,
        ..Default::default()
    };
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        bridge.upgrade_to_distributed(Time::from_secs(53), config, &[])
    }))
    .expect("invalid config should return Err rather than panic");
    let err = outcome.expect_err("zero-replica config must be rejected");
    assert_eq!(err.kind(), ErrorKind::ConfigError);
    let message = err.to_string();
    assert!(message.contains("replication_factor >= 1"), "{message}");
    // The failed upgrade must leave the bridge in its original mode.
    assert_eq!(bridge.mode(), RegionMode::Local);
    assert!(bridge.distributed().is_none());
}
/// min_quorum above replication_factor must be rejected with a ConfigError,
/// never a panic, leaving the bridge unchanged in Local mode.
#[test]
fn upgrade_to_distributed_rejects_invalid_quorum_without_panicking() {
    let mut bridge = create_local_bridge();
    let config = DistributedRegionConfig {
        min_quorum: 3,
        replication_factor: 2,
        ..Default::default()
    };
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        bridge.upgrade_to_distributed(Time::from_secs(54), config, &[])
    }))
    .expect("invalid config should return Err rather than panic");
    let err = outcome.expect_err("out-of-range quorum must be rejected");
    assert_eq!(err.kind(), ErrorKind::ConfigError);
    let message = err.to_string();
    assert!(message.contains("min_quorum in 1..=replication_factor"), "{message}");
    // The failed upgrade must leave the bridge in its original mode.
    assert_eq!(bridge.mode(), RegionMode::Local);
    assert!(bridge.distributed().is_none());
}
/// Local Draining combined with distributed Closing resolves to Closing.
#[test]
fn effective_state_draining_with_distributed_closing() {
    assert_eq!(
        EffectiveState::compute(RegionState::Draining, Some(DistributedRegionState::Closing)),
        EffectiveState::Closing
    );
}
/// Local Finalizing combined with distributed Closing resolves to Closing.
#[test]
fn effective_state_finalizing_with_distributed_closing() {
    assert_eq!(
        EffectiveState::compute(
            RegionState::Finalizing,
            Some(DistributedRegionState::Closing),
        ),
        EffectiveState::Closing
    );
}
/// Pins the documented BridgeConfig defaults.
#[test]
fn bridge_config_defaults() {
    let defaults = BridgeConfig::default();
    assert!(defaults.allow_upgrade, "upgrades should be allowed by default");
    assert_eq!(defaults.sync_mode, SyncMode::Synchronous);
    assert_eq!(defaults.sync_timeout, Duration::from_secs(5));
    assert_eq!(defaults.conflict_resolution, ConflictResolution::DistributedWins);
}
/// A default SyncState starts fully idle with no recorded history.
#[test]
fn sync_state_default() {
    let fresh = SyncState::default();
    assert_eq!(fresh.last_synced_sequence, 0);
    assert_eq!(fresh.pending_ops, 0);
    assert!(!fresh.sync_pending);
    assert!(fresh.last_sync_time.is_none() && fresh.last_sync_error.is_none());
}
/// Child regions registered on the bridge appear in snapshots.
#[test]
fn snapshot_includes_children() {
    let mut bridge = create_local_bridge();
    for raw in [2, 3] {
        bridge.add_child(RegionId::new_for_test(raw, 0)).unwrap();
    }
    let snapshot = bridge.create_snapshot(Time::from_secs(60));
    assert_eq!(snapshot.children.len(), 2);
}
/// Insta snapshot test: a fully-populated region snapshot (budget limits,
/// tasks, children, parent, cancel reason) must serialize with all ids and
/// wall-clock values scrubbed to stable placeholders.
#[test]
fn region_snapshot_json_snapshot_scrubs_ids_and_wall_clock() {
    // Budget exercises all three limit fields so they all appear in the JSON.
    let budget = Budget::new()
        .with_deadline(Time::from_secs(90))
        .with_poll_quota(12)
        .with_cost_quota(34);
    let mut bridge = RegionBridge::new_local(
        RegionId::new_for_test(7, 0),
        Some(RegionId::new_for_test(4, 1)),
        budget,
    );
    bridge.add_task(TaskId::new_for_test(3, 0)).unwrap();
    bridge.add_task(TaskId::new_for_test(5, 2)).unwrap();
    bridge.add_child(RegionId::new_for_test(8, 0)).unwrap();
    bridge.add_child(RegionId::new_for_test(9, 1)).unwrap();
    // Closing with a reason populates cancel_reason in the snapshot.
    bridge
        .begin_close(Some(CancelReason::timeout()), Time::from_secs(55))
        .unwrap();
    let snapshot = bridge.create_snapshot(Time::from_secs(56));
    insta::assert_json_snapshot!(
        "region_snapshot_scrubbed",
        scrub_region_snapshot_for_snapshot_test(&snapshot)
    );
}
/// Delivers three snapshots in normal, reordered, and duplicated order and
/// checks that snapshot application is convergent (same final state) and
/// idempotent (stale/duplicate sequences are ignored), then pins the scrubbed
/// step traces with insta.
#[test]
fn bridge_sequence_advancement_scrubbed() {
    // Build three snapshots with strictly increasing sequences from one source.
    let mut source = create_local_bridge();
    source.add_task(TaskId::new_for_test(11, 0)).unwrap();
    let snap1 = source.create_snapshot(Time::from_secs(10));
    source.add_child(RegionId::new_for_test(2, 0)).unwrap();
    source.add_task(TaskId::new_for_test(12, 0)).unwrap();
    let snap2 = source.create_snapshot(Time::from_secs(11));
    source
        .begin_close(Some(CancelReason::timeout()), Time::from_secs(12))
        .unwrap();
    source.remove_task(TaskId::new_for_test(11, 0));
    let snap3 = source.create_snapshot(Time::from_secs(13));
    // Replay under three delivery orders onto fresh bridges.
    let normal = run_bridge_sequence_advancement_scenario(&[&snap1, &snap2, &snap3]);
    let reordered = run_bridge_sequence_advancement_scenario(&[&snap2, &snap1, &snap3]);
    let duplicate =
        run_bridge_sequence_advancement_scenario(&[&snap1, &snap1, &snap2, &snap2, &snap3]);
    assert_eq!(
        normal.last(),
        reordered.last(),
        "reordered delivery should converge to the same final state"
    );
    assert_eq!(
        normal.last(),
        duplicate.last(),
        "duplicate delivery should converge to the same final state"
    );
    // Idempotence checks compare adjacent steps with applied_sequence removed,
    // since that field necessarily differs between deliveries.
    assert_eq!(
        strip_applied_sequence(&reordered[0]),
        strip_applied_sequence(&reordered[1]),
        "older snapshots must be ignored after a newer sequence lands"
    );
    assert_eq!(
        strip_applied_sequence(&duplicate[0]),
        strip_applied_sequence(&duplicate[1]),
        "duplicate sequence 1 delivery must be idempotent"
    );
    assert_eq!(
        strip_applied_sequence(&duplicate[2]),
        strip_applied_sequence(&duplicate[3]),
        "duplicate sequence 2 delivery must be idempotent"
    );
    insta::assert_json_snapshot!(
        "bridge_sequence_advancement_scrubbed",
        json!({
            "normal": normal,
            "reordered": reordered,
            "duplicate": duplicate,
        })
    );
}
/// Smoke-tests RegionMode's derives: Default, Debug, Copy/Clone, and Eq.
#[test]
fn region_mode_debug_clone_copy_default_eq() {
    let default_mode = RegionMode::default();
    assert_eq!(default_mode, RegionMode::Local);
    let rendered = format!("{default_mode:?}");
    assert!(rendered.contains("Local"), "{rendered}");
    let dist = RegionMode::distributed(3);
    // Copy and Clone must agree.
    let copied: RegionMode = dist;
    let cloned = dist;
    assert_eq!(copied, cloned);
    assert_ne!(dist, RegionMode::Local);
}
/// Smoke-tests SyncMode's derives: Debug, Copy/Clone, and Eq.
#[test]
fn sync_mode_debug_clone_copy_eq() {
    let mode = SyncMode::Synchronous;
    let rendered = format!("{mode:?}");
    assert!(rendered.contains("Synchronous"), "{rendered}");
    let copied: SyncMode = mode;
    let cloned = mode;
    assert_eq!(copied, cloned);
    assert_ne!(mode, SyncMode::Asynchronous);
}
/// Smoke-tests ConflictResolution's derives: Debug, Copy/Clone, and Eq.
#[test]
fn conflict_resolution_debug_clone_copy_eq() {
    let resolution = ConflictResolution::DistributedWins;
    let rendered = format!("{resolution:?}");
    assert!(rendered.contains("DistributedWins"), "{rendered}");
    let copied: ConflictResolution = resolution;
    let cloned = resolution;
    assert_eq!(copied, cloned);
}
/// Smoke-tests BridgeConfig's derives: Debug, Clone, and Default.
#[test]
fn bridge_config_debug_clone_default() {
    let config = BridgeConfig::default();
    let rendered = format!("{config:?}");
    assert!(rendered.contains("BridgeConfig"), "{rendered}");
    assert!(config.allow_upgrade);
    // A clone renders identically to the original.
    let cloned = config;
    assert_eq!(format!("{cloned:?}"), rendered);
}
/// Smoke-tests EffectiveState's derives: Debug, Copy/Clone, and Eq.
#[test]
fn effective_state_debug_clone_copy_eq() {
    let state = EffectiveState::Open;
    let rendered = format!("{state:?}");
    assert!(rendered.contains("Open"), "{rendered}");
    let copied: EffectiveState = state;
    let cloned = state;
    assert_eq!(copied, cloned);
    assert_ne!(state, EffectiveState::Closed);
}
/// Smoke-tests SyncState's derives: Debug, Clone, and Default.
#[test]
fn sync_state_debug_clone_default() {
    let state = SyncState::default();
    let rendered = format!("{state:?}");
    assert!(rendered.contains("SyncState"), "{rendered}");
    assert_eq!(state.pending_ops, 0);
    // A clone renders identically to the original.
    let cloned = state;
    assert_eq!(format!("{cloned:?}"), rendered);
}
/// Full close lifecycle on an activated distributed bridge:
/// close -> drain -> finalize -> complete ends in Closed.
#[test]
fn distributed_close_full_lifecycle() {
    let mut bridge = create_distributed_bridge();
    if let Some(dist) = bridge.distributed.as_mut() {
        let _ = dist.activate(Time::from_secs(0));
    }
    let close = bridge.begin_close(None, Time::from_secs(1)).unwrap();
    assert!(close.local_changed);
    assert!(close.distributed_transition.is_some());
    bridge.begin_drain().unwrap();
    bridge.begin_finalize().unwrap();
    let done = bridge.complete_close(Time::from_secs(2)).unwrap();
    assert_eq!(done.effective_state, EffectiveState::Closed);
}
/// Enumerates every (local, distributed) state pair that the bridge considers
/// contradictory and checks that EffectiveState::compute reports each as
/// Inconsistent, preserving both constituent states for diagnostics.
#[test]
fn effective_state_inconsistent_pairs_are_exhaustive() {
    // Table of contradictory pairs: a local close-side state paired with a
    // distributed state that has not caught up (or vice versa for Open).
    let inconsistent_pairs: &[(RegionState, DistributedRegionState)] = &[
        (RegionState::Closed, DistributedRegionState::Active),
        (RegionState::Closed, DistributedRegionState::Initializing),
        (RegionState::Closed, DistributedRegionState::Degraded),
        (RegionState::Closed, DistributedRegionState::Recovering),
        (RegionState::Closed, DistributedRegionState::Closing),
        (RegionState::Closing, DistributedRegionState::Active),
        (RegionState::Closing, DistributedRegionState::Initializing),
        (RegionState::Closing, DistributedRegionState::Degraded),
        (RegionState::Closing, DistributedRegionState::Recovering),
        (RegionState::Closing, DistributedRegionState::Closed),
        (RegionState::Draining, DistributedRegionState::Active),
        (RegionState::Draining, DistributedRegionState::Initializing),
        (RegionState::Draining, DistributedRegionState::Degraded),
        (RegionState::Draining, DistributedRegionState::Recovering),
        (RegionState::Draining, DistributedRegionState::Closed),
        (RegionState::Finalizing, DistributedRegionState::Active),
        (
            RegionState::Finalizing,
            DistributedRegionState::Initializing,
        ),
        (RegionState::Finalizing, DistributedRegionState::Degraded),
        (RegionState::Finalizing, DistributedRegionState::Recovering),
        (RegionState::Finalizing, DistributedRegionState::Closed),
        (RegionState::Open, DistributedRegionState::Closing),
        (RegionState::Open, DistributedRegionState::Closed),
    ];
    for (local, distributed) in inconsistent_pairs {
        let state = EffectiveState::compute(*local, Some(*distributed));
        assert!(
            state.is_inconsistent(),
            "({local:?}, {distributed:?}) should be Inconsistent, got {state:?}"
        );
        // The Inconsistent variant must carry both inputs unmodified.
        if let EffectiveState::Inconsistent {
            local: l,
            distributed: d,
        } = state
        {
            assert_eq!(l, *local, "local state not preserved");
            assert_eq!(d, *distributed, "distributed state not preserved");
        }
    }
}
/// A hybrid-mode bridge that never gained a distributed record has nothing
/// to sync.
#[test]
fn hybrid_mode_sync_not_needed_without_distributed_record() {
    let mut bridge = RegionBridge::with_mode(
        RegionId::new_for_test(1, 0),
        None,
        Budget::default(),
        RegionMode::hybrid(3),
    );
    assert!(bridge.mode().is_replicated());
    match bridge.sync(Time::from_secs(70)).unwrap() {
        SyncResult::NotNeeded => {}
        _ => panic!("hybrid mode without distributed record must report NotNeeded"),
    }
}
/// Even with sync flags forced on, a hybrid bridge without a distributed
/// record reports NotNeeded — there is nowhere to sync to.
#[test]
fn hybrid_mode_sync_not_needed_with_pending_ops() {
    let mut bridge = RegionBridge::with_mode(
        RegionId::new_for_test(1, 0),
        None,
        Budget::default(),
        RegionMode::hybrid(3),
    );
    bridge.sync_state.sync_pending = true;
    bridge.sync_state.pending_ops = 3;
    match bridge.sync(Time::from_secs(71)).unwrap() {
        SyncResult::NotNeeded => {}
        _ => panic!(
            "hybrid mode without distributed record must report NotNeeded even with pending ops"
        ),
    }
}
}