use crdtosphere::prelude::*;
use std::sync::Arc;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
/// Keys identifying the individual settings stored in the demo configuration map.
///
/// Each variant carries an explicit integer discriminant (1 through 5).
#[derive(Clone, Copy, Debug, PartialEq)]
enum ConfigKey {
    /// Connection-limit setting.
    MaxConnections = 1,
    /// Timeout setting (the name suggests milliseconds).
    TimeoutMs = 2,
    /// Retry-count setting.
    RetryCount = 3,
    /// Buffer-size setting.
    BufferSize = 4,
    /// Logging-level setting.
    LogLevel = 5,
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn get_timestamp() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    // `as_millis` yields a `u128`; the cast keeps the low 64 bits.
    let millis: u128 = since_epoch.as_millis();
    millis as u64
}
fn main() -> Result<(), CRDTError> {
println!("CRDTosphere Atomic LWWMap Example");
println!("======================================");
let config_map = Arc::new(LWWMap::<ConfigKey, u32, DefaultConfig>::new(1));
println!("\n📝 Initial Configuration Setup");
println!("------------------------------");
config_map.insert(ConfigKey::MaxConnections, 100, get_timestamp())?;
config_map.insert(ConfigKey::TimeoutMs, 5000, get_timestamp())?;
config_map.insert(ConfigKey::RetryCount, 3, get_timestamp())?;
config_map.insert(ConfigKey::BufferSize, 8192, get_timestamp())?;
println!("Initial config:");
for key in [
ConfigKey::MaxConnections,
ConfigKey::TimeoutMs,
ConfigKey::RetryCount,
ConfigKey::BufferSize,
] {
if let Some(value) = config_map.get(&key) {
println!(" {:?}: {}", key, value);
}
}
println!("\n🔄 Concurrent Configuration Updates");
println!("----------------------------------");
let mut handles = vec![];
let config_clone1 = Arc::clone(&config_map);
handles.push(thread::spawn(move || {
println!("🚀 Performance thread: Increasing connection limits");
thread::sleep(std::time::Duration::from_millis(10));
config_clone1
.insert(ConfigKey::MaxConnections, 200, get_timestamp())
.unwrap();
thread::sleep(std::time::Duration::from_millis(20));
config_clone1
.insert(ConfigKey::BufferSize, 16384, get_timestamp())
.unwrap();
println!(" ✅ Performance updates applied");
}));
let config_clone2 = Arc::clone(&config_map);
handles.push(thread::spawn(move || {
println!("🛡️ Reliability thread: Adjusting timeout and retries");
thread::sleep(std::time::Duration::from_millis(15));
config_clone2
.insert(ConfigKey::TimeoutMs, 10000, get_timestamp())
.unwrap();
thread::sleep(std::time::Duration::from_millis(25));
config_clone2
.insert(ConfigKey::RetryCount, 5, get_timestamp())
.unwrap();
println!(" ✅ Reliability updates applied");
}));
let config_clone3 = Arc::clone(&config_map);
handles.push(thread::spawn(move || {
println!("📊 Monitoring thread: Adding logging configuration");
thread::sleep(std::time::Duration::from_millis(30));
config_clone3
.insert(ConfigKey::LogLevel, 2, get_timestamp())
.unwrap();
println!(" ✅ Monitoring configuration added");
}));
for handle in handles {
handle.join().unwrap();
}
println!("\n📋 Final Configuration State");
println!("----------------------------");
println!("Final config after concurrent updates:");
for key in [
ConfigKey::MaxConnections,
ConfigKey::TimeoutMs,
ConfigKey::RetryCount,
ConfigKey::BufferSize,
ConfigKey::LogLevel,
] {
if let Some(value) = config_map.get(&key) {
println!(" {:?}: {}", key, value);
}
}
println!("\n📊 Map Statistics");
println!("-----------------");
println!("Total entries: {}", config_map.len());
println!("Capacity: {}", config_map.capacity());
println!("Remaining capacity: {}", config_map.remaining_capacity());
println!("Is empty: {}", config_map.is_empty());
println!("Is full: {}", config_map.is_full());
println!("\n🔍 Configuration Merging Example");
println!("--------------------------------");
let remote_config = LWWMap::<ConfigKey, u32, DefaultConfig>::new(2);
remote_config.insert(ConfigKey::MaxConnections, 150, get_timestamp() + 1000)?; remote_config.insert(ConfigKey::TimeoutMs, 7500, get_timestamp() - 1000)?;
println!("Remote config before merge:");
for key in [ConfigKey::MaxConnections, ConfigKey::TimeoutMs] {
if let Some(value) = remote_config.get(&key) {
println!(" {:?}: {}", key, value);
}
}
let mut local_config = (*config_map).clone();
local_config.merge(&remote_config)?;
println!("\nAfter merging remote config:");
for key in [
ConfigKey::MaxConnections,
ConfigKey::TimeoutMs,
ConfigKey::RetryCount,
ConfigKey::BufferSize,
ConfigKey::LogLevel,
] {
if let Some(value) = local_config.get(&key) {
println!(" {:?}: {}", key, value);
}
}
println!("\n🗑️ Remove Operation Example");
println!("---------------------------");
println!(
"Before removal - LogLevel: {:?}",
local_config.get(&ConfigKey::LogLevel)
);
println!("Map length: {}", local_config.len());
let removed_value = local_config.remove(&ConfigKey::LogLevel);
println!("Removed LogLevel: {:?}", removed_value);
println!(
"After removal - LogLevel: {:?}",
local_config.get(&ConfigKey::LogLevel)
);
println!("Map length: {}", local_config.len());
println!("Remaining capacity: {}", local_config.remaining_capacity());
println!("\nAdding new configuration after removal:");
local_config.insert(ConfigKey::LogLevel, 1, get_timestamp())?; println!(
"Re-added LogLevel: {:?}",
local_config.get(&ConfigKey::LogLevel)
);
println!("Map length: {}", local_config.len());
println!("\n✨ Atomic LWWMap Features Demonstrated:");
println!(" • Thread-safe concurrent updates without locks");
println!(" • Last-writer-wins conflict resolution");
println!(" • Timestamp-based ordering");
println!(" • Node ID tiebreaking for same timestamps");
println!(" • Deterministic merge operations");
println!(" • Fixed memory allocation");
println!(" • Remove operations that free capacity");
println!(" • Remove and re-insert functionality");
Ok(())
}