#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
use crdtosphere::prelude::*;
use std::sync::Arc;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
fn main() {
println!("CRDTosphere Atomic MVRegister Example");
println!("=====================================");
#[cfg(feature = "hardware-atomic")]
{
println!("Atomic implementation:");
println!("- Allows &self for modifications");
println!("- Multi-threaded safe with Arc<T>");
println!("- Supports concurrent set operations");
println!("- Handles multiple concurrent values");
println!();
let register = Arc::new(MVRegister::<f32, DefaultConfig>::new(1));
let mut handles = vec![];
for thread_id in 0..4 {
let register_clone = Arc::clone(®ister);
let handle = thread::spawn(move || {
let base_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let node_id = thread_id + 1;
let sensor_register = MVRegister::<f32, DefaultConfig>::new(node_id);
for i in 0..3 {
let timestamp = base_time + (thread_id as u64 * 1000) + (i as u64 * 100);
let value = 20.0 + (thread_id as f32) + (i as f32 * 0.1);
if let Err(e) = sensor_register.set(value, timestamp) {
eprintln!("Sensor {} failed to set value: {:?}", node_id, e);
} else {
println!(
"Sensor {} recorded value {} at timestamp {}",
node_id, value, timestamp
);
}
thread::sleep(std::time::Duration::from_millis(10));
}
println!("Sensor {} completed readings", node_id);
sensor_register
});
handles.push(handle);
}
let sensor_registers: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
println!();
println!("Merging sensor readings into main register...");
let mut main_register = (*register).clone();
for sensor_register in &sensor_registers {
if let Err(e) = main_register.merge(sensor_register) {
eprintln!("Failed to merge sensor data: {:?}", e);
}
}
println!();
println!("Final multi-sensor register state:");
println!("Number of sensors: {}", main_register.len());
let values = main_register.values_array();
for (i, value) in values.iter().enumerate() {
if let Some(v) = value {
println!(" Sensor reading {}: {:.1}", i + 1, v);
}
}
if let Some(avg) = main_register.average() {
println!("Average reading: {:.2}", avg);
}
if let Some(min) = main_register.min() {
println!("Minimum reading: {:.1}", min);
}
if let Some(max) = main_register.max() {
println!("Maximum reading: {:.1}", max);
}
println!();
println!("Testing atomic merge operations...");
let register_a = Arc::new(MVRegister::<f32, DefaultConfig>::new(10));
let register_b = Arc::new(MVRegister::<f32, DefaultConfig>::new(20));
let register_c = Arc::new(MVRegister::<f32, DefaultConfig>::new(30));
let base_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
register_a.set(100.0, base_time + 1000).unwrap();
register_b.set(200.0, base_time + 2000).unwrap();
register_c.set(300.0, base_time + 3000).unwrap();
println!("Before merge:");
println!(
" Register A: {} values, node 10 = {:?}",
register_a.len(),
register_a.get_from_node(10)
);
println!(
" Register B: {} values, node 20 = {:?}",
register_b.len(),
register_b.get_from_node(20)
);
println!(
" Register C: {} values, node 30 = {:?}",
register_c.len(),
register_c.get_from_node(30)
);
let merge_handles = vec![
{
let reg_a = Arc::clone(®ister_a);
let reg_b = Arc::clone(®ister_b);
thread::spawn(move || {
let mut temp_a = (*reg_a).clone();
temp_a.merge(&*reg_b).unwrap();
println!(
"Thread 1: Merged B into A, result has {} values",
temp_a.len()
);
temp_a
})
},
{
let reg_a = Arc::clone(®ister_a);
let reg_c = Arc::clone(®ister_c);
thread::spawn(move || {
let mut temp_a = (*reg_a).clone();
temp_a.merge(&*reg_c).unwrap();
println!(
"Thread 2: Merged C into A, result has {} values",
temp_a.len()
);
temp_a
})
},
];
let merge_results: Vec<_> = merge_handles
.into_iter()
.map(|h| h.join().unwrap())
.collect();
println!();
println!("After merge operations:");
for (i, result) in merge_results.iter().enumerate() {
println!(" Merge result {}: {} values", i + 1, result.len());
let values = result.values_array();
for (j, value) in values.iter().enumerate() {
if let Some(v) = value {
println!(" Value {}: {:.1}", j + 1, v);
}
}
}
for result in &merge_results {
assert_eq!(result.len(), 2, "Each merge should result in 2 values");
assert!(
result.get_from_node(10).is_some(),
"Should have value from node 10"
);
assert_eq!(
*result.get_from_node(10).unwrap(),
100.0,
"Node 10 should have value 100.0"
);
}
assert!(
merge_results[0].get_from_node(20).is_some(),
"First merge should have node 20"
);
assert_eq!(
*merge_results[0].get_from_node(20).unwrap(),
200.0,
"Node 20 should have value 200.0"
);
assert!(
merge_results[1].get_from_node(30).is_some(),
"Second merge should have node 30"
);
assert_eq!(
*merge_results[1].get_from_node(30).unwrap(),
300.0,
"Node 30 should have value 300.0"
);
println!();
println!("✓ Multi-value merge semantics verified:");
println!(" - Each register maintained its node's value");
println!(" - Merge operations combined values from different nodes");
println!(" - Atomic operations maintained consistency");
println!();
println!("Testing timestamp-based conflict resolution...");
let conflict_reg_1 = Arc::new(MVRegister::<f32, DefaultConfig>::new(5));
let conflict_reg_2 = Arc::new(MVRegister::<f32, DefaultConfig>::new(5));
let early_time = base_time + 1000;
let later_time = base_time + 2000;
conflict_reg_1.set(50.0, early_time).unwrap();
conflict_reg_2.set(150.0, later_time).unwrap();
let mut conflict_result = (*conflict_reg_1).clone();
conflict_result.merge(&*conflict_reg_2).unwrap();
println!(
"Conflict resolution result: node 5 = {:?}",
conflict_result.get_from_node(5)
);
if let Some(value) = conflict_result.get_from_node(5) {
assert_eq!(*value, 150.0, "Newer timestamp should win");
}
println!(
"✓ Timestamp conflict resolution verified: newer value (150.0) won over older (50.0)"
);
println!();
println!("Testing capacity limits...");
let capacity_reg = Arc::new(MVRegister::<f32, DefaultConfig>::new(1));
let mut capacity_test = (*capacity_reg).clone();
for node_id in 1..=4 {
let mut temp_reg = MVRegister::<f32, DefaultConfig>::new(node_id);
temp_reg
.set(node_id as f32 * 10.0, base_time + node_id as u64)
.unwrap();
capacity_test.merge(&temp_reg).unwrap();
}
println!(
"Filled register to capacity: {} values",
capacity_test.len()
);
assert!(capacity_test.is_full(), "Register should be full");
let mut overflow_reg = MVRegister::<f32, DefaultConfig>::new(5);
overflow_reg.set(50.0, base_time + 5000).unwrap();
let merge_result = capacity_test.merge(&overflow_reg);
assert!(merge_result.is_err(), "Merge should fail when at capacity");
println!("✓ Capacity limit enforced: merge correctly failed when at capacity");
println!();
println!("Atomic MVRegister demonstration completed!");
println!("✓ Concurrent set operations: Multiple sensor nodes safely recorded values");
println!(
"✓ Atomic merge operations: Multi-value semantics maintained across concurrent merges"
);
println!("✓ Timestamp conflict resolution: Newer timestamps correctly won conflicts");
println!("✓ Capacity management: Buffer overflow protection working correctly");
println!("✓ Thread safety: All operations completed without data races");
println!(
"✓ Numeric operations: Average, min, max calculations working on multi-value data"
);
}
#[cfg(not(feature = "hardware-atomic"))]
{
println!("This example requires the 'hardware-atomic' feature to be enabled.");
println!("Run with: cargo run --example atomic_mv_register --features hardware-atomic");
}
}