use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use atomr_distributed_data::{GCounter, Replicator};
#[test]
fn subscribe_callback_fires_on_update() {
let r = Replicator::new();
let hits = Arc::new(AtomicU32::new(0));
let hits_cb = hits.clone();
let _token = r.subscribe("counter", move |key| {
assert_eq!(key, "counter");
hits_cb.fetch_add(1, Ordering::SeqCst);
});
let mut c = GCounter::new();
c.increment("n1", 1);
r.update("counter", c.clone());
r.update("counter", c);
assert_eq!(hits.load(Ordering::SeqCst), 2);
}
#[test]
fn drop_token_silences_subsequent_updates() {
let r = Replicator::new();
let hits = Arc::new(AtomicU32::new(0));
let hits_cb = hits.clone();
let token = r.subscribe("k", move |_| {
hits_cb.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(r.subscriber_count("k"), 1);
r.update("k", GCounter::new());
assert_eq!(hits.load(Ordering::SeqCst), 1);
drop(token);
assert_eq!(r.subscriber_count("k"), 0);
r.update("k", GCounter::new());
r.update("k", GCounter::new());
assert_eq!(hits.load(Ordering::SeqCst), 1, "callback must not fire after token drop");
}
#[test]
fn multiple_subscribers_all_see_update() {
let r = Replicator::new();
let a = Arc::new(AtomicU32::new(0));
let b = Arc::new(AtomicU32::new(0));
let c = Arc::new(AtomicU32::new(0));
let a_cb = a.clone();
let b_cb = b.clone();
let c_cb = c.clone();
let _t1 = r.subscribe("k", move |_| {
a_cb.fetch_add(1, Ordering::SeqCst);
});
let _t2 = r.subscribe("k", move |_| {
b_cb.fetch_add(1, Ordering::SeqCst);
});
let _t3 = r.subscribe("k", move |_| {
c_cb.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(r.subscriber_count("k"), 3);
r.update("k", GCounter::new());
assert_eq!(a.load(Ordering::SeqCst), 1);
assert_eq!(b.load(Ordering::SeqCst), 1);
assert_eq!(c.load(Ordering::SeqCst), 1);
}
#[test]
fn subscribers_are_scoped_to_their_key() {
let r = Replicator::new();
let on_a = Arc::new(AtomicU32::new(0));
let on_b = Arc::new(AtomicU32::new(0));
let on_a_cb = on_a.clone();
let on_b_cb = on_b.clone();
let _ta = r.subscribe("a", move |key| {
assert_eq!(key, "a");
on_a_cb.fetch_add(1, Ordering::SeqCst);
});
let _tb = r.subscribe("b", move |key| {
assert_eq!(key, "b");
on_b_cb.fetch_add(1, Ordering::SeqCst);
});
r.update("a", GCounter::new());
r.update("a", GCounter::new());
r.update("b", GCounter::new());
r.update("c", GCounter::new());
assert_eq!(on_a.load(Ordering::SeqCst), 2);
assert_eq!(on_b.load(Ordering::SeqCst), 1);
}
#[test]
fn delete_notifies_subscribers_and_clears_value() {
let r = Replicator::new();
let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let received_cb = received.clone();
let _t = r.subscribe("k", move |key| {
received_cb.lock().unwrap().push(key.to_string());
});
let mut c = GCounter::new();
c.increment("n1", 7);
r.update("k", c);
assert_eq!(r.get::<GCounter>("k").map(|g| g.value()), Some(7));
r.delete("k");
let log = received.lock().unwrap().clone();
assert_eq!(log, vec!["k".to_string(), "k".to_string()]);
assert!(r.get::<GCounter>("k").is_none());
}
#[test]
fn delta_crdt_disjoint_increments_merge_to_sum() {
let r = Replicator::new();
let mut from_n1 = GCounter::new();
from_n1.increment("n1", 3);
let mut from_n2 = GCounter::new();
from_n2.increment("n2", 5);
let key = "shared";
r.update(key, from_n1);
let after_first: GCounter = r.get(key).unwrap();
assert_eq!(after_first.value(), 3);
r.update(key, from_n2);
let merged: GCounter = r.get(key).unwrap();
assert_eq!(merged.value(), 3 + 5, "disjoint per-node deltas must merge to the sum");
}
#[test]
fn delta_merge_is_order_independent_and_idempotent() {
let r1 = Replicator::new();
let r2 = Replicator::new();
let mut a = GCounter::new();
a.increment("n1", 2);
let mut b = GCounter::new();
b.increment("n2", 4);
let mut c = GCounter::new();
c.increment("n3", 1);
r1.update("k", a.clone());
r1.update("k", b.clone());
r1.update("k", c.clone());
r2.update("k", c);
r2.update("k", a.clone());
r2.update("k", b.clone());
r2.update("k", a);
let v1: GCounter = r1.get("k").unwrap();
let v2: GCounter = r2.get("k").unwrap();
assert_eq!(v1.value(), v2.value());
assert_eq!(v1.value(), 2 + 4 + 1);
}