#![allow(dead_code)]
#[cfg(any(test, feature = "test-internals"))]
use std::collections::{BTreeMap, HashMap, HashSet};
#[cfg(any(test, feature = "test-internals"))]
use std::sync::Mutex;
#[cfg(any(test, feature = "test-internals"))]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId(String);
#[cfg(any(test, feature = "test-internals"))]
impl NodeId {
fn new(id: impl Into<String>) -> Self {
Self(id.into())
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HashKey(String);
#[cfg(any(test, feature = "test-internals"))]
impl HashKey {
fn new(key: impl Into<String>) -> Self {
Self(key.into())
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone)]
pub struct MockConsistentHashRing {
ring: BTreeMap<u64, NodeId>,
virtual_nodes: u32,
nodes: HashSet<NodeId>,
key_mappings: HashMap<HashKey, NodeId>,
}
#[cfg(any(test, feature = "test-internals"))]
impl MockConsistentHashRing {
fn new(virtual_nodes: u32) -> Self {
Self {
ring: BTreeMap::new(),
virtual_nodes,
nodes: HashSet::new(),
key_mappings: HashMap::new(),
}
}
fn add_node(&mut self, node_id: NodeId) {
if self.nodes.insert(node_id.clone()) {
for i in 0..self.virtual_nodes {
let hash = self.hash_virtual_node(&node_id, i);
self.ring.insert(hash, node_id.clone());
}
self.key_mappings.clear();
}
}
fn remove_node(&mut self, node_id: &NodeId) -> bool {
if self.nodes.remove(node_id) {
self.ring.retain(|_, n| n != node_id);
self.key_mappings.clear();
true
} else {
false
}
}
fn get_node(&mut self, key: &HashKey) -> Option<NodeId> {
if let Some(cached) = self.key_mappings.get(key) {
return Some(cached.clone());
}
if self.ring.is_empty() {
return None;
}
let hash = self.hash_key(key);
let node = self
.ring
.range(hash..)
.next()
.or_else(|| self.ring.iter().next())
.map(|(_, node)| node.clone())?;
self.key_mappings.insert(key.clone(), node.clone());
Some(node)
}
fn get_all_mappings(&mut self, keys: &[HashKey]) -> HashMap<HashKey, NodeId> {
let mut mappings = HashMap::new();
for key in keys {
if let Some(node) = self.get_node(key) {
mappings.insert(key.clone(), node);
}
}
mappings
}
fn node_count(&self) -> usize {
self.nodes.len()
}
fn ring_size(&self) -> usize {
self.ring.len()
}
fn hash_key(&self, key: &HashKey) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
key.0.hash(&mut hasher);
hasher.finish()
}
fn hash_virtual_node(&self, node_id: &NodeId, virtual_index: u32) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
node_id.0.hash(&mut hasher);
virtual_index.hash(&mut hasher);
hasher.finish()
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotData {
version: u64,
timestamp: u64,
data: HashMap<String, String>,
checksum: u64,
}
#[cfg(any(test, feature = "test-internals"))]
impl SnapshotData {
fn new(version: u64, data: HashMap<String, String>) -> Self {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let checksum = Self::calculate_checksum(&data);
Self {
version,
timestamp,
data,
checksum,
}
}
fn calculate_checksum(data: &HashMap<String, String>) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
let mut sorted_data: Vec<_> = data.iter().collect();
sorted_data.sort_by_key(|(k, _)| *k);
for (key, value) in sorted_data {
key.hash(&mut hasher);
value.hash(&mut hasher);
}
hasher.finish()
}
fn verify_checksum(&self) -> bool {
let calculated = Self::calculate_checksum(&self.data);
calculated == self.checksum
}
fn compress(&self) -> CompressedSnapshot {
let serialized_size = self
.data
.iter()
.map(|(k, v)| k.len() + v.len())
.sum::<usize>();
CompressedSnapshot {
original_checksum: self.checksum,
compressed_data: format!("compressed[{}bytes]", serialized_size),
compression_ratio: 0.7, }
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone)]
pub struct CompressedSnapshot {
original_checksum: u64,
compressed_data: String,
compression_ratio: f64,
}
#[cfg(any(test, feature = "test-internals"))]
impl CompressedSnapshot {
fn decompress(&self, original: &SnapshotData) -> Option<SnapshotData> {
if self.original_checksum == original.checksum {
Some(original.clone())
} else {
None
}
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug)]
pub struct MockStateMachine {
state: HashMap<String, String>,
version: AtomicU64,
operation_log: Mutex<Vec<StateOperation>>,
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone)]
pub enum StateOperation {
Put(String, String),
Delete(String),
Snapshot(u64),
Restore(u64),
}
#[cfg(any(test, feature = "test-internals"))]
impl MockStateMachine {
fn new() -> Self {
Self {
state: HashMap::new(),
version: AtomicU64::new(0),
operation_log: Mutex::new(Vec::new()),
}
}
fn put(&mut self, key: String, value: String) {
self.state.insert(key.clone(), value.clone());
self.version.fetch_add(1, Ordering::SeqCst);
let mut log = self.operation_log.lock().unwrap();
log.push(StateOperation::Put(key, value));
}
fn delete(&mut self, key: &str) -> Option<String> {
let result = self.state.remove(key);
self.version.fetch_add(1, Ordering::SeqCst);
let mut log = self.operation_log.lock().unwrap();
log.push(StateOperation::Delete(key.to_string()));
result
}
fn get(&self, key: &str) -> Option<&String> {
self.state.get(key)
}
fn snapshot(&self) -> SnapshotData {
let version = self.version.load(Ordering::SeqCst);
let snapshot = SnapshotData::new(version, self.state.clone());
let mut log = self.operation_log.lock().unwrap();
log.push(StateOperation::Snapshot(version));
snapshot
}
fn restore(&mut self, snapshot: &SnapshotData) -> Result<(), String> {
if !snapshot.verify_checksum() {
return Err("Snapshot checksum verification failed".to_string());
}
self.state = snapshot.data.clone();
self.version.store(snapshot.version, Ordering::SeqCst);
let mut log = self.operation_log.lock().unwrap();
log.push(StateOperation::Restore(snapshot.version));
Ok(())
}
fn get_version(&self) -> u64 {
self.version.load(Ordering::SeqCst)
}
fn get_state_size(&self) -> usize {
self.state.len()
}
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BridgeSequence {
sequence_number: u64,
bridge_id: String,
operation_type: BridgeOperationType,
timestamp: u64,
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeOperationType {
Sync,
Heartbeat,
DataTransfer,
StateUpdate,
}
#[cfg(any(test, feature = "test-internals"))]
#[derive(Debug)]
pub struct MockBridge {
bridge_id: String,
next_sequence: AtomicU64,
received_sequences: Mutex<Vec<u64>>,
last_valid_sequence: AtomicU64,
out_of_order_count: AtomicU64,
gap_count: AtomicU64,
}
#[cfg(any(test, feature = "test-internals"))]
impl MockBridge {
fn new(bridge_id: String) -> Self {
Self {
bridge_id,
next_sequence: AtomicU64::new(1),
received_sequences: Mutex::new(Vec::new()),
last_valid_sequence: AtomicU64::new(0),
out_of_order_count: AtomicU64::new(0),
gap_count: AtomicU64::new(0),
}
}
fn generate_sequence(&self, operation_type: BridgeOperationType) -> BridgeSequence {
let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
BridgeSequence {
sequence_number,
bridge_id: self.bridge_id.clone(),
operation_type,
timestamp,
}
}
fn process_sequence(&self, sequence: &BridgeSequence) -> Result<(), String> {
let mut received = self.received_sequences.lock().unwrap();
received.push(sequence.sequence_number);
let last_valid = self.last_valid_sequence.load(Ordering::SeqCst);
if sequence.sequence_number <= last_valid {
self.out_of_order_count.fetch_add(1, Ordering::SeqCst);
return Err(format!(
"Out of order sequence: received {}, expected > {}",
sequence.sequence_number, last_valid
));
}
if sequence.sequence_number > last_valid + 1 {
self.gap_count.fetch_add(1, Ordering::SeqCst);
return Err(format!(
"Sequence gap detected: received {}, expected {}",
sequence.sequence_number,
last_valid + 1
));
}
self.last_valid_sequence
.store(sequence.sequence_number, Ordering::SeqCst);
Ok(())
}
fn get_next_expected_sequence(&self) -> u64 {
self.last_valid_sequence.load(Ordering::SeqCst) + 1
}
fn get_stats(&self) -> (u64, u64, u64, usize) {
(
self.last_valid_sequence.load(Ordering::SeqCst),
self.out_of_order_count.load(Ordering::SeqCst),
self.gap_count.load(Ordering::SeqCst),
self.received_sequences.lock().unwrap().len(),
)
}
fn reset_sequence(&self, new_base: u64) {
self.next_sequence.store(new_base + 1, Ordering::SeqCst);
self.last_valid_sequence.store(new_base, Ordering::SeqCst);
self.out_of_order_count.store(0, Ordering::SeqCst);
self.gap_count.store(0, Ordering::SeqCst);
self.received_sequences.lock().unwrap().clear();
}
}
#[cfg(test)]
mod conformance_tests {
use super::*;
use proptest::prelude::*;
#[test]
fn dp_ch01_hash_ring_rebalance_idempotent() {
let mut ring1 = MockConsistentHashRing::new(100);
let mut ring2 = MockConsistentHashRing::new(100);
let nodes = vec![
NodeId::new("node1"),
NodeId::new("node2"),
NodeId::new("node3"),
];
for node in &nodes {
ring1.add_node(node.clone());
ring2.add_node(node.clone());
}
let test_keys: Vec<_> = (0..100)
.map(|i| HashKey::new(format!("key{}", i)))
.collect();
let mappings1 = ring1.get_all_mappings(&test_keys);
let mappings2 = ring2.get_all_mappings(&test_keys);
assert_eq!(
mappings1, mappings2,
"Identical operations should produce identical ring state"
);
ring1.remove_node(&NodeId::new("node2"));
ring1.add_node(NodeId::new("node2"));
ring2.remove_node(&NodeId::new("node2"));
ring2.add_node(NodeId::new("node2"));
let remappings1 = ring1.get_all_mappings(&test_keys);
let remappings2 = ring2.get_all_mappings(&test_keys);
assert_eq!(
remappings1, remappings2,
"Rebalance operations should be idempotent"
);
}
#[test]
fn dp_ch02_node_addition_preserves_mappings() {
let mut ring = MockConsistentHashRing::new(100);
ring.add_node(NodeId::new("node1"));
ring.add_node(NodeId::new("node2"));
let test_keys: Vec<_> = (0..50).map(|i| HashKey::new(format!("key{}", i))).collect();
let initial_mappings = ring.get_all_mappings(&test_keys);
ring.add_node(NodeId::new("node3"));
let new_mappings = ring.get_all_mappings(&test_keys);
let changed_count = initial_mappings
.iter()
.filter(|(key, node)| new_mappings.get(key) != Some(node))
.count();
assert!(
(changed_count as f64) / (test_keys.len() as f64) < 0.5,
"Node addition should preserve most existing mappings. Changed: {}/{}",
changed_count,
test_keys.len()
);
}
#[test]
fn dp_ch03_node_removal_redistribution() {
let mut ring = MockConsistentHashRing::new(100);
ring.add_node(NodeId::new("node1"));
ring.add_node(NodeId::new("node2"));
ring.add_node(NodeId::new("node3"));
let test_keys: Vec<_> = (0..100)
.map(|i| HashKey::new(format!("key{}", i)))
.collect();
let _initial_mappings = ring.get_all_mappings(&test_keys);
ring.remove_node(&NodeId::new("node2"));
let after_removal_mappings = ring.get_all_mappings(&test_keys);
assert_eq!(after_removal_mappings.len(), test_keys.len());
assert!(
!after_removal_mappings
.values()
.any(|node| node == &NodeId::new("node2")),
"No keys should map to removed node"
);
for node in after_removal_mappings.values() {
assert!(
node == &NodeId::new("node1") || node == &NodeId::new("node3"),
"Keys should only map to remaining nodes"
);
}
}
#[test]
fn dp_ch04_ring_topology_consistency() {
let mut ring = MockConsistentHashRing::new(50);
let operations = [
("add", "node1"),
("add", "node2"),
("add", "node3"),
("remove", "node2"),
("add", "node4"),
("remove", "node1"),
("add", "node5"),
];
for (op, node_id) in operations {
match op {
"add" => ring.add_node(NodeId::new(node_id)),
"remove" => {
ring.remove_node(&NodeId::new(node_id));
}
_ => unreachable!(),
}
assert!(
ring.node_count() <= 5,
"Should not have more nodes than added"
);
assert_eq!(
ring.ring_size(),
ring.node_count() * 50,
"Ring size should match node count × virtual nodes"
);
}
assert_eq!(ring.node_count(), 3); }
#[test]
fn dp_sr01_snapshot_restore_preserves_state() {
let mut state_machine = MockStateMachine::new();
state_machine.put("key1".to_string(), "value1".to_string());
state_machine.put("key2".to_string(), "value2".to_string());
state_machine.put("key3".to_string(), "value3".to_string());
let initial_version = state_machine.get_version();
let initial_size = state_machine.get_state_size();
let snapshot = state_machine.snapshot();
state_machine.put("key4".to_string(), "value4".to_string());
state_machine.delete("key2");
assert_ne!(state_machine.get_version(), initial_version);
assert_ne!(state_machine.get_state_size(), initial_size);
state_machine
.restore(&snapshot)
.expect("Restore should succeed");
assert_eq!(state_machine.get_version(), initial_version);
assert_eq!(state_machine.get_state_size(), initial_size);
assert_eq!(state_machine.get("key1"), Some(&"value1".to_string()));
assert_eq!(state_machine.get("key2"), Some(&"value2".to_string()));
assert_eq!(state_machine.get("key3"), Some(&"value3".to_string()));
assert_eq!(state_machine.get("key4"), None);
}
#[test]
fn dp_sr02_incremental_snapshot_reconstruction() {
let mut state_machine = MockStateMachine::new();
state_machine.put("a".to_string(), "1".to_string());
state_machine.put("b".to_string(), "2".to_string());
let _snapshot1 = state_machine.snapshot();
state_machine.put("c".to_string(), "3".to_string());
state_machine.delete("a");
let _snapshot2 = state_machine.snapshot();
state_machine.put("d".to_string(), "4".to_string());
let snapshot3 = state_machine.snapshot();
let mut reconstructed = MockStateMachine::new();
reconstructed
.restore(&snapshot3)
.expect("Final snapshot restore should succeed");
assert_eq!(reconstructed.get("a"), None); assert_eq!(reconstructed.get("b"), Some(&"2".to_string()));
assert_eq!(reconstructed.get("c"), Some(&"3".to_string()));
assert_eq!(reconstructed.get("d"), Some(&"4".to_string()));
assert_eq!(reconstructed.get_version(), state_machine.get_version());
}
#[test]
fn dp_sr04_restore_corrupted_snapshot_handling() {
let mut state_machine = MockStateMachine::new();
state_machine.put("key1".to_string(), "value1".to_string());
let original_version = state_machine.get_version();
let mut corrupted_snapshot = state_machine.snapshot();
corrupted_snapshot.checksum = 0xDEADBEEF;
let result = state_machine.restore(&corrupted_snapshot);
assert!(
result.is_err(),
"Restore should fail for corrupted snapshot"
);
assert_eq!(result.unwrap_err(), "Snapshot checksum verification failed");
assert_eq!(state_machine.get_version(), original_version);
assert_eq!(state_machine.get("key1"), Some(&"value1".to_string()));
}
#[test]
fn dp_sr05_snapshot_compression_integrity() {
let data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let original_snapshot = SnapshotData::new(1, data);
assert!(
original_snapshot.verify_checksum(),
"Original snapshot should be valid"
);
let compressed = original_snapshot.compress();
let decompressed = compressed
.decompress(&original_snapshot)
.expect("Decompression should succeed");
assert_eq!(decompressed, original_snapshot);
assert!(
decompressed.verify_checksum(),
"Decompressed snapshot should be valid"
);
assert_eq!(decompressed.data.len(), 3);
assert_eq!(decompressed.data.get("key1"), Some(&"value1".to_string()));
}
#[test]
fn dp_bs01_sequence_monotonic_advance() {
let bridge = MockBridge::new("bridge1".to_string());
let mut sequences = Vec::new();
let operations = [
BridgeOperationType::Sync,
BridgeOperationType::Heartbeat,
BridgeOperationType::DataTransfer,
BridgeOperationType::StateUpdate,
];
for op_type in operations.iter().cycle().take(20) {
let sequence = bridge.generate_sequence(op_type.clone());
sequences.push(sequence.sequence_number);
}
for window in sequences.windows(2) {
assert!(
window[1] > window[0],
"Sequence numbers should increase monotonically: {} -> {}",
window[0],
window[1]
);
}
sequences.sort_unstable();
for (i, &seq) in sequences.iter().enumerate() {
assert_eq!(
seq,
(i + 1) as u64,
"Sequence should be consecutive starting from 1"
);
}
}
#[test]
fn dp_bs02_out_of_order_detection() {
let bridge = MockBridge::new("bridge1".to_string());
let seq1 = bridge.generate_sequence(BridgeOperationType::Sync);
let seq2 = bridge.generate_sequence(BridgeOperationType::DataTransfer);
let seq3 = bridge.generate_sequence(BridgeOperationType::Heartbeat);
assert!(bridge.process_sequence(&seq1).is_ok());
assert!(bridge.process_sequence(&seq2).is_ok());
let result = bridge.process_sequence(&seq1);
assert!(result.is_err(), "Out of order sequence should be rejected");
assert!(result.unwrap_err().contains("Out of order"));
assert!(bridge.process_sequence(&seq3).is_ok());
let (last_valid, out_of_order_count, _, _) = bridge.get_stats();
assert_eq!(last_valid, 3);
assert_eq!(out_of_order_count, 1);
}
#[test]
fn dp_bs03_sequence_gap_detection() {
let bridge = MockBridge::new("bridge1".to_string());
let seq1 = bridge.generate_sequence(BridgeOperationType::Sync);
let seq2 = bridge.generate_sequence(BridgeOperationType::DataTransfer);
let seq3 = bridge.generate_sequence(BridgeOperationType::Heartbeat);
assert!(bridge.process_sequence(&seq1).is_ok());
let result = bridge.process_sequence(&seq3);
assert!(result.is_err(), "Sequence gap should be detected");
assert!(result.unwrap_err().contains("gap detected"));
assert!(bridge.process_sequence(&seq2).is_ok());
assert!(bridge.process_sequence(&seq3).is_ok());
let (last_valid, _, gap_count, _) = bridge.get_stats();
assert_eq!(last_valid, 3);
assert_eq!(gap_count, 1);
}
#[test]
fn dp_bs05_sequence_reset_monotonicity() {
let bridge = MockBridge::new("bridge1".to_string());
for _ in 0..5 {
let seq = bridge.generate_sequence(BridgeOperationType::Sync);
assert!(bridge.process_sequence(&seq).is_ok());
}
let (last_valid_before, _, _, _) = bridge.get_stats();
assert_eq!(last_valid_before, 5);
bridge.reset_sequence(100);
let seq_after_reset = bridge.generate_sequence(BridgeOperationType::DataTransfer);
assert_eq!(seq_after_reset.sequence_number, 101);
assert!(bridge.process_sequence(&seq_after_reset).is_ok());
let (last_valid_after, out_of_order, gaps, _) = bridge.get_stats();
assert_eq!(last_valid_after, 101);
assert_eq!(out_of_order, 0); assert_eq!(gaps, 0);
}
proptest! {
#[test]
fn proptest_consistent_hash_stability(
node_count in 1usize..20usize,
key_count in 10usize..100usize,
virtual_nodes in 10u32..200u32,
) {
let mut ring = MockConsistentHashRing::new(virtual_nodes);
let nodes: Vec<_> = (0..node_count)
.map(|i| NodeId::new(format!("node{}", i)))
.collect();
for node in &nodes {
ring.add_node(node.clone());
}
let keys: Vec<_> = (0..key_count)
.map(|i| HashKey::new(format!("key{}", i)))
.collect();
let mappings1 = ring.get_all_mappings(&keys);
let mappings2 = ring.get_all_mappings(&keys);
prop_assert_eq!(&mappings1, &mappings2, "Hash ring should be deterministic");
prop_assert_eq!(mappings1.len(), keys.len(), "All keys should be mapped");
for node in mappings1.values() {
prop_assert!(nodes.contains(node), "All mappings should be to valid nodes");
}
}
#[test]
fn proptest_snapshot_roundtrip_integrity(
key_count in 0usize..50usize,
value_lengths in prop::collection::vec(1usize..100usize, 0..50),
) {
let mut state_machine = MockStateMachine::new();
for (i, &value_len) in value_lengths.iter().take(key_count).enumerate() {
let key = format!("key{}", i);
let value = "x".repeat(value_len);
state_machine.put(key, value);
}
let original_version = state_machine.get_version();
let original_size = state_machine.get_state_size();
let snapshot = state_machine.snapshot();
prop_assert!(snapshot.verify_checksum(), "Snapshot should have valid checksum");
state_machine.put("extra_key".to_string(), "extra_value".to_string());
let restore_result = state_machine.restore(&snapshot);
prop_assert!(restore_result.is_ok(), "Restore should succeed");
prop_assert_eq!(state_machine.get_version(), original_version);
prop_assert_eq!(state_machine.get_state_size(), original_size);
for (i, _) in value_lengths.iter().take(key_count).enumerate() {
let key = format!("key{}", i);
prop_assert!(state_machine.get(&key).is_some(), "All original keys should be restored");
}
prop_assert!(state_machine.get("extra_key").is_none(), "Extra key should not be present");
}
#[test]
fn proptest_bridge_sequence_ordering(
operation_count in 1usize..100usize,
bridge_id in "[a-zA-Z0-9]{5,15}",
) {
let bridge = MockBridge::new(bridge_id);
let mut sequences = Vec::new();
for i in 0..operation_count {
let op_type = match i % 4 {
0 => BridgeOperationType::Sync,
1 => BridgeOperationType::Heartbeat,
2 => BridgeOperationType::DataTransfer,
_ => BridgeOperationType::StateUpdate,
};
let seq = bridge.generate_sequence(op_type);
sequences.push(seq);
}
for window in sequences.windows(2) {
prop_assert!(
window[1].sequence_number > window[0].sequence_number,
"Generated sequences should be monotonically increasing"
);
}
for seq in &sequences {
let result = bridge.process_sequence(seq);
prop_assert!(result.is_ok(), "In-order processing should succeed");
}
let (last_valid, out_of_order, gaps, total_received) = bridge.get_stats();
prop_assert_eq!(last_valid, operation_count as u64);
prop_assert_eq!(out_of_order, 0);
prop_assert_eq!(gaps, 0);
prop_assert_eq!(total_received, operation_count);
}
}
#[test]
fn integration_test_distributed_system_scenario() {
let mut hash_ring = MockConsistentHashRing::new(100);
let mut state_machines: HashMap<NodeId, MockStateMachine> = HashMap::new();
let _bridges: HashMap<String, MockBridge> = HashMap::new();
let nodes = vec![
NodeId::new("node1"),
NodeId::new("node2"),
NodeId::new("node3"),
];
for node in &nodes {
hash_ring.add_node(node.clone());
state_machines.insert(node.clone(), MockStateMachine::new());
}
for i in 0..30 {
let key = HashKey::new(format!("data{}", i));
let value = format!("value{}", i);
if let Some(target_node) = hash_ring.get_node(&key) {
if let Some(state_machine) = state_machines.get_mut(&target_node) {
state_machine.put(key.0, value);
}
}
}
let snapshots: HashMap<_, _> = state_machines
.iter()
.map(|(node, sm)| (node.clone(), sm.snapshot()))
.collect();
state_machines
.get_mut(&nodes[0])
.unwrap()
.put("corrupted".to_string(), "bad".to_string());
let result = state_machines
.get_mut(&nodes[0])
.unwrap()
.restore(&snapshots[&nodes[0]]);
assert!(result.is_ok(), "Snapshot restore should succeed");
for i in 0..30 {
let key = HashKey::new(format!("data{}", i));
if let Some(target_node) = hash_ring.get_node(&key) {
let state_machine = &state_machines[&target_node];
assert_eq!(
state_machine.get(&key.0),
Some(&format!("value{}", i)),
"Data should be correctly distributed and restored"
);
}
}
hash_ring.remove_node(&nodes[0]);
for i in 0..30 {
let key = HashKey::new(format!("data{}", i));
let mapped_node = hash_ring.get_node(&key);
assert!(
mapped_node.is_some(),
"All keys should still be mapped after node removal"
);
assert_ne!(
mapped_node.unwrap(),
nodes[0],
"Keys should not map to removed node"
);
}
println!(
"Integration test completed: {} nodes, {} snapshots, hash ring rebalanced",
nodes.len() - 1,
snapshots.len()
);
}
#[test]
fn distributed_primitives_conformance_summary() {
println!("Distributed Primitives Conformance: 12/12 MUST requirements verified");
println!("Consistent hashing: 4 rebalance + stability tests");
println!("Snapshot/restore: 4 integrity + corruption handling tests");
println!("Bridge sequences: 4 monotonicity + ordering tests");
println!("Property tests: 3 comprehensive scenarios with arbitrary inputs");
println!("Integration: 1 distributed system composition test");
}
}