use crdt_kit::clock::{HybridClock, HybridTimestamp};
use crdt_kit::prelude::*;
fn main() {
println!("╔══════════════════════════════════════════════════════╗");
println!("║ crdt-kit — IoT Sensor Dashboard Demo ║");
println!("╚══════════════════════════════════════════════════════╝\n");
sensor_readings();
device_registry();
alert_log();
config_management();
firmware_tracking();
full_dashboard_sync();
}
/// Scenario 1: per-gate detection counters merged into a dashboard `GCounter`,
/// followed by a fill/drain water-tank example built on `PNCounter`.
///
/// # Panics
///
/// Panics if any of the expected totals (439, 439 after re-sync, 451 after the
/// delta, 180 net litres) does not hold.
fn sensor_readings() {
    println!("━━━ 1. SENSOR EVENT COUNTERS (GCounter + PNCounter) ━━━\n");

    // One counter per gate, each created with its own id and bumped by that
    // gate's detection count.
    let mut counter_a = GCounter::new(1);
    counter_a.increment_by(147);
    let mut counter_b = GCounter::new(2);
    counter_b.increment_by(89);
    let mut counter_c = GCounter::new(3);
    counter_c.increment_by(203);

    println!(" Gate A detections: {}", counter_a.value());
    println!(" Gate B detections: {}", counter_b.value());
    println!(" Gate C detections: {}", counter_c.value());

    // The dashboard never increments on its own; it only folds in the gates.
    // Expected total: 147 + 89 + 203 = 439.
    let mut board = GCounter::new(100);
    for gate in [&counter_a, &counter_b, &counter_c] {
        board.merge(gate);
    }
    println!(" ─────────────────────────────────");
    let total = board.value();
    println!(" Dashboard total: {} detections", total);
    assert_eq!(total, 439);

    // Merging gates A and B a second time is expected to leave the total as is.
    board.merge(&counter_a);
    board.merge(&counter_b);
    let resynced = board.value();
    println!(" After re-sync: {} (idempotent)", resynced);
    assert_eq!(resynced, 439);

    // Gate A records 12 more detections; only its delta relative to the
    // dashboard is applied this time.
    counter_a.increment_by(12);
    let gate_a_delta = counter_a.delta(&board);
    board.apply_delta(&gate_a_delta);
    let after_delta = board.value();
    println!(" After delta sync: {} (+12 from gate A)", after_delta);
    assert_eq!(after_delta, 451);

    println!("\n Water tank level (PNCounter):");
    let mut inflow = PNCounter::new(10);
    let mut outflow = PNCounter::new(11);
    // 500 single increments on the fill side, 320 single decrements on the drain side.
    for _ in 0..500 {
        inflow.increment();
    }
    for _ in 0..320 {
        outflow.decrement();
    }
    inflow.merge(&outflow);
    let net = inflow.value();
    println!(" Fill: +500L, Drain: -320L = {}L net", net);
    assert_eq!(net, 180);

    println!();
}
/// Scenario 2: two gateways track their devices in `ORSet`s, one removes a
/// device while the other adds it, and both are merged; then per-device
/// metadata from two `AWMap` replicas is merged and printed.
///
/// # Panics
///
/// Panics if `sensor-002` is missing from the north gateway after the merge.
fn device_registry() {
    println!("━━━ 2. DEVICE REGISTRY (AWMap + ORSet) ━━━\n");

    let mut north = ORSet::new(1);
    for id in ["sensor-001", "sensor-002", "sensor-003"] {
        north.insert(id);
    }
    let mut south = ORSet::new(2);
    for id in ["sensor-004", "sensor-005"] {
        south.insert(id);
    }
    println!(" North gateway: {} devices", north.len());
    println!(" South gateway: {} devices", south.len());

    // The removal on north and the insert on south happen before either side
    // has merged the other, i.e. without knowledge of each other.
    north.remove(&"sensor-002");
    println!(" Admin removes sensor-002 from north");
    south.insert("sensor-002");
    println!(" sensor-002 re-registers on south (concurrent)");

    // Exchange state in both directions.
    north.merge(&south);
    south.merge(&north);

    println!("\n After sync:");
    let readmitted = north.contains(&"sensor-002");
    println!(" sensor-002 active: {} (add wins!)", readmitted);
    println!(" Total fleet: {} devices", north.len());
    assert!(readmitted);

    println!("\n Device metadata (AWMap):");
    let mut primary = AWMap::new(1);
    primary.insert("sensor-001", "firmware=2.1,zone=A");
    primary.insert("sensor-002", "firmware=2.0,zone=B");
    // The second replica only knows sensor-001, with a different firmware string.
    let mut secondary = AWMap::new(2);
    secondary.insert("sensor-001", "firmware=2.2,zone=A");
    primary.merge(&secondary);
    println!(" sensor-001 meta: {:?}", primary.get(&"sensor-001"));
    println!(" sensor-002 meta: {:?}", primary.get(&"sensor-002"));

    println!();
}
/// Scenario 3: edge and cloud alert logs kept in `GSet`s are merged into one
/// de-duplicated set, which is then printed entry by entry.
///
/// # Panics
///
/// Panics if the merged set does not contain exactly 4 alerts.
fn alert_log() {
    println!("━━━ 3. ALERT LOG (GSet) ━━━\n");

    let mut edge = GSet::new();
    let mut cloud = GSet::new();
    for entry in [
        "2024-01-15T08:30:00 TEMP_HIGH sensor-003",
        "2024-01-15T09:15:00 HUMIDITY_LOW sensor-001",
        "2024-01-15T10:00:00 MOTION_AFTER_HOURS gate-B",
    ] {
        edge.insert(entry);
    }
    for entry in [
        "2024-01-15T08:45:00 BATTERY_LOW sensor-005",
        "2024-01-15T09:15:00 HUMIDITY_LOW sensor-001",
    ] {
        cloud.insert(entry);
    }
    println!(" Edge alerts: {}", edge.len());
    println!(" Cloud alerts: {}", cloud.len());

    // The 09:15 HUMIDITY_LOW entry is present on both sides, so 3 + 2 inputs
    // are expected to yield 4 distinct alerts.
    edge.merge(&cloud);
    let unique = edge.len();
    println!(" Merged total: {} unique alerts", unique);
    assert_eq!(unique, 4);

    edge.iter().for_each(|alert| println!(" ⚠ {alert}"));
    println!();
}
fn config_management() {
println!("━━━ 4. SENSOR CONFIG (LWWMap) ━━━\n");
let ts = |ms: u64, node: u16| HybridTimestamp {
physical: ms,
logical: 0,
node_id: node,
};
let mut config_admin = LWWMap::new();
config_admin.insert("sample_rate_ms", 1000u32, ts(100, 1));
config_admin.insert("threshold_temp_c", 85, ts(100, 1));
config_admin.insert("threshold_humidity", 30, ts(100, 1));
config_admin.insert("report_interval_s", 60, ts(100, 1));
println!(" Admin sets initial config:");
for (k, v) in config_admin.iter() {
println!(" {k} = {v}");
}
let mut config_field = LWWMap::new();
config_field.insert("sample_rate_ms", 500u32, ts(200, 2));
config_field.insert("threshold_temp_c", 90, ts(200, 2));
println!("\n Field engineer updates: sample_rate=500, temp_threshold=90");
let mut config_stale = LWWMap::new();
config_stale.insert("sample_rate_ms", 2000u32, ts(50, 3));
println!(" Stale admin sets sample_rate=2000 (ts=50, will be ignored)");
config_admin.merge(&config_field);
config_admin.merge(&config_stale);
println!("\n Final config after merge:");
for (k, v) in config_admin.iter() {
println!(" {k} = {v}");
}
assert_eq!(config_admin.get(&"sample_rate_ms"), Some(&500));
assert_eq!(config_admin.get(&"threshold_temp_c"), Some(&90));
config_field.remove(&"report_interval_s", ts(300, 2));
config_admin.merge(&config_field);
println!("\n After removing report_interval_s:");
println!(" report_interval_s present: {}", config_admin.contains_key(&"report_interval_s"));
assert!(!config_admin.contains_key(&"report_interval_s"));
println!();
}
/// Scenario 5: two gateways report firmware versions through `MVRegister`s,
/// both write after an initial sync, the merged register is checked for a
/// conflict, and a further write is checked to clear it. Ends with a
/// `LWWRegister` heartbeat merge driven by two `HybridClock`s.
///
/// # Panics
///
/// Panics if the merged register is not conflicted, or if it is still
/// conflicted after the follow-up `set`.
fn firmware_tracking() {
    println!("━━━ 5. FIRMWARE TRACKING (MVRegister + LWWRegister) ━━━\n");

    let mut gateway_one = MVRegister::new(1);
    let mut gateway_two = MVRegister::new(2);

    // Gateway 1 writes first and gateway 2 picks that value up via merge.
    gateway_one.set(String::from("v2.1.0"));
    println!(" GW1 reports firmware: {:?}", gateway_one.values());
    gateway_two.merge(&gateway_one);
    println!(" GW2 syncs, sees: {:?}", gateway_two.values());

    // Both gateways now write different versions without merging in between.
    gateway_one.set(String::from("v2.2.0-hotfix"));
    gateway_two.set(String::from("v2.2.0-stable"));
    println!("\n [Network partition — concurrent OTA updates]");
    println!(" GW1 pushes: {:?}", gateway_one.values());
    println!(" GW2 pushes: {:?}", gateway_two.values());

    gateway_one.merge(&gateway_two);
    println!("\n After sync:");
    println!(" Versions: {:?}", gateway_one.values());
    let diverged = gateway_one.is_conflicted();
    println!(" Conflict? {}", diverged);
    assert!(diverged);

    // A fresh write on the merged register is expected to clear the conflict.
    gateway_one.set(String::from("v2.2.1-unified"));
    println!("\n Ops resolves to: {:?}", gateway_one.values());
    let still_diverged = gateway_one.is_conflicted();
    println!(" Conflict? {}", still_diverged);
    assert!(!still_diverged);

    println!("\n Last heartbeat (LWWRegister):");
    let mut clock_one = HybridClock::new(1);
    let mut clock_two = HybridClock::new(2);
    // Each register is created against its own node's clock.
    let mut latest = LWWRegister::new("2024-01-15T10:30:00Z", &mut clock_one);
    let incoming = LWWRegister::new("2024-01-15T10:31:00Z", &mut clock_two);
    latest.merge(&incoming);
    println!(" Latest heartbeat: {}", latest.value());

    println!();
}
fn full_dashboard_sync() {
println!("━━━ 6. FULL DASHBOARD SYNC (Delta State) ━━━\n");
let mut edge1_events = GCounter::new(1);
edge1_events.increment_by(1200);
let mut edge1_notes = TextCrdt::new(1);
edge1_notes.insert_str(0, "Zone A: all nominal").unwrap();
let mut edge2_events = GCounter::new(2);
edge2_events.increment_by(890);
let mut edge2_notes = TextCrdt::new(2);
edge2_notes.insert_str(0, "Zone B: sensor-003 replaced").unwrap();
let mut central_events = GCounter::new(100);
let mut central_notes = TextCrdt::new(100);
println!(" Edge 1: {} events, notes=\"{}\"", edge1_events.value(), edge1_notes);
println!(" Edge 2: {} events, notes=\"{}\"", edge2_events.value(), edge2_notes);
println!(" Central: {} events (stale)", central_events.value());
let delta1 = edge1_events.delta(¢ral_events);
let delta2 = edge2_events.delta(¢ral_events);
central_events.apply_delta(&delta1);
central_events.apply_delta(&delta2);
central_notes.merge(&edge1_notes);
central_notes.merge(&edge2_notes);
println!("\n ─── After delta sync ───");
println!(" Central events: {}", central_events.value());
println!(" Central notes: \"{}\"", central_notes);
assert_eq!(central_events.value(), 2090);
edge1_events.increment_by(50);
let delta_incremental = edge1_events.delta(¢ral_events);
central_events.apply_delta(&delta_incremental);
println!("\n Edge 1 adds 50 more events...");
println!(" Central after incremental delta: {}", central_events.value());
assert_eq!(central_events.value(), 2140);
println!("\n Event timeline (Rga):");
let mut timeline = Rga::new(1);
timeline.insert_at(0, "08:30 TEMP_HIGH").unwrap();
timeline.insert_at(1, "09:15 HUMIDITY_LOW").unwrap();
timeline.insert_at(2, "10:00 MOTION_DETECT").unwrap();
let mut remote_timeline = Rga::new(2);
remote_timeline.insert_at(0, "08:45 BATTERY_LOW").unwrap();
remote_timeline.insert_at(1, "09:30 RECONNECTED").unwrap();
timeline.merge(&remote_timeline);
println!(" Merged timeline ({} events):", timeline.len());
for (i, event) in timeline.iter().enumerate() {
println!(" [{i}] {event}");
}
println!("\n ✓ All CRDTs synced. Dashboard is consistent across all nodes.");
println!();
}