use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::RwLock;
use atomr_distributed_data::{CrdtMerge, LWWMap};
pub struct DDataShardCoordinator {
state: RwLock<LWWMap<String, String>>,
next_ts: AtomicU64,
}
impl Default for DDataShardCoordinator {
fn default() -> Self {
Self::new()
}
}
impl DDataShardCoordinator {
pub fn new() -> Self {
let bootstrap =
SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(1);
Self { state: RwLock::new(LWWMap::new()), next_ts: AtomicU64::new(bootstrap) }
}
fn tick(&self) -> u128 {
self.next_ts.fetch_add(1, Ordering::Relaxed) as u128
}
pub fn allocate(&self, shard_id: impl Into<String>, region: impl Into<String>) {
let ts = self.tick();
self.state.write().put(shard_id.into(), region.into(), ts);
}
pub fn region_for(&self, shard_id: &str) -> Option<String> {
self.state.read().get(&shard_id.to_string()).cloned()
}
pub fn shard_count(&self) -> usize {
self.state.read().iter().count()
}
pub fn allocations(&self) -> Vec<(String, String)> {
let mut v: Vec<(String, String)> =
self.state.read().iter().map(|(k, v)| (k.clone(), v.clone())).collect();
v.sort_by(|a, b| a.0.cmp(&b.0));
v
}
pub fn merge_remote(&self, remote: &LWWMap<String, String>) {
self.state.write().merge(remote);
}
pub fn snapshot(&self) -> LWWMap<String, String> {
self.state.read().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn allocate_and_lookup() {
let c = DDataShardCoordinator::new();
c.allocate("s1", "r1");
c.allocate("s2", "r2");
assert_eq!(c.region_for("s1"), Some("r1".into()));
assert_eq!(c.region_for("s2"), Some("r2".into()));
assert_eq!(c.shard_count(), 2);
}
#[test]
fn later_allocate_overwrites_earlier() {
let c = DDataShardCoordinator::new();
c.allocate("s1", "r1");
c.allocate("s1", "r2");
assert_eq!(c.region_for("s1"), Some("r2".into()));
}
#[test]
fn merge_remote_takes_higher_timestamp() {
let local = DDataShardCoordinator::new();
local.allocate("s1", "r1");
let mut remote = LWWMap::new();
remote.put("s1".to_string(), "r-remote".to_string(), u128::MAX);
local.merge_remote(&remote);
assert_eq!(local.region_for("s1"), Some("r-remote".into()));
}
#[test]
fn merge_remote_keeps_local_when_local_is_newer() {
let local = DDataShardCoordinator::new();
local.allocate("s1", "r-local");
let mut remote = LWWMap::new();
remote.put("s1".to_string(), "r-stale".to_string(), 1);
local.merge_remote(&remote);
assert_eq!(local.region_for("s1"), Some("r-local".into()));
}
#[test]
fn allocations_sorted_for_telemetry() {
let c = DDataShardCoordinator::new();
c.allocate("zebra", "r2");
c.allocate("alpha", "r1");
c.allocate("middle", "r3");
let snap = c.allocations();
assert_eq!(
snap,
vec![
("alpha".into(), "r1".into()),
("middle".into(), "r3".into()),
("zebra".into(), "r2".into()),
]
);
}
#[test]
fn snapshot_is_independent_of_subsequent_writes() {
let c = DDataShardCoordinator::new();
c.allocate("s1", "r1");
let snap = c.snapshot();
c.allocate("s1", "r2"); assert_eq!(snap.get(&"s1".to_string()), Some(&"r1".to_string()));
}
#[test]
fn empty_coordinator_adopts_remote_state() {
let local = DDataShardCoordinator::new();
let mut remote = LWWMap::new();
remote.put("s1".to_string(), "rA".to_string(), 100);
remote.put("s2".to_string(), "rB".to_string(), 100);
local.merge_remote(&remote);
assert_eq!(local.region_for("s1"), Some("rA".into()));
assert_eq!(local.region_for("s2"), Some("rB".into()));
}
}