use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use crate::clock::HybridTimestamp;
use crate::{Crdt, DeltaCrdt};
/// A last-writer-wins map keyed by `K`.
///
/// Each key maps to a single record carrying the timestamp of the most recent
/// accepted write. Removed keys stay in `entries` as records marked not alive,
/// so a removal can still be compared against other writes by timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LWWMap<K, V>
where
    K: Ord + Clone,
    V: Clone,
{
    /// Latest accepted record for every key ever written or removed.
    entries: BTreeMap<K, Entry<V>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
struct Entry<V: Clone> {
value: Option<V>,
timestamp: HybridTimestamp,
alive: bool,
}
impl<K: Ord + Clone, V: Clone> LWWMap<K, V> {
pub fn new() -> Self {
Self {
entries: BTreeMap::new(),
}
}
pub fn insert(&mut self, key: K, value: V, timestamp: HybridTimestamp) {
match self.entries.get(&key) {
Some(entry) if entry.timestamp >= timestamp => {}
_ => {
self.entries.insert(
key,
Entry {
value: Some(value),
timestamp,
alive: true,
},
);
}
}
}
pub fn remove(&mut self, key: &K, timestamp: HybridTimestamp) -> bool {
match self.entries.get(key) {
Some(entry) if entry.timestamp >= timestamp => false,
_ => {
self.entries.insert(
key.clone(),
Entry {
value: None,
timestamp,
alive: false,
},
);
true
}
}
}
#[must_use]
pub fn get(&self, key: &K) -> Option<&V> {
self.entries
.get(key)
.filter(|e| e.alive)
.and_then(|e| e.value.as_ref())
}
#[must_use]
pub fn contains_key(&self, key: &K) -> bool {
self.entries.get(key).is_some_and(|e| e.alive)
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.values().filter(|e| e.alive).count()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.entries
.iter()
.filter_map(|(k, e)| {
if e.alive {
e.value.as_ref().map(|v| (k, v))
} else {
None
}
})
}
pub fn keys(&self) -> impl Iterator<Item = &K> {
self.iter().map(|(k, _)| k)
}
pub fn values(&self) -> impl Iterator<Item = &V> {
self.iter().map(|(_, v)| v)
}
}
impl<K: Ord + Clone, V: Clone> Default for LWWMap<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K: Ord + Clone, V: Clone> Crdt for LWWMap<K, V> {
    /// Folds every record of `other` into `self`.
    ///
    /// For each key in `other`, the local record is kept when its timestamp is
    /// greater than or equal to the incoming one; otherwise the incoming
    /// record (value or tombstone) is cloned in.
    fn merge(&mut self, other: &Self) {
        for (key, incoming) in &other.entries {
            let keep_local = self
                .entries
                .get(key)
                .is_some_and(|local| local.timestamp >= incoming.timestamp);
            if !keep_local {
                self.entries.insert(key.clone(), incoming.clone());
            }
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LWWMapDelta<K: Ord + Clone, V: Clone> {
entries: Vec<(K, Option<V>, HybridTimestamp, bool)>,
}
impl<K: Ord + Clone, V: Clone> DeltaCrdt for LWWMap<K, V> {
    type Delta = LWWMapDelta<K, V>;

    /// Collects the records of `self` that `other` does not already cover.
    ///
    /// A record is left out when `other` holds a record for the same key with
    /// an equal or newer timestamp. Tombstones are included like any other
    /// record.
    fn delta(&self, other: &Self) -> LWWMapDelta<K, V> {
        let entries = self
            .entries
            .iter()
            .filter(|(key, mine)| {
                let covered = other
                    .entries
                    .get(*key)
                    .is_some_and(|theirs| theirs.timestamp >= mine.timestamp);
                !covered
            })
            .map(|(key, mine)| (key.clone(), mine.value.clone(), mine.timestamp, mine.alive))
            .collect();
        LWWMapDelta { entries }
    }

    /// Applies every record in `delta` that is strictly newer than the local
    /// record for its key (or whose key has no local record).
    fn apply_delta(&mut self, delta: &LWWMapDelta<K, V>) {
        for (key, value, timestamp, alive) in &delta.entries {
            let stale = self
                .entries
                .get(key)
                .is_some_and(|local| local.timestamp >= *timestamp);
            if stale {
                continue;
            }
            self.entries.insert(
                key.clone(),
                Entry {
                    value: value.clone(),
                    timestamp: *timestamp,
                    alive: *alive,
                },
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a timestamp from a physical time and node id; the logical
    /// counter is always zero.
    fn ts(physical: u64, node: u16) -> HybridTimestamp {
        HybridTimestamp {
            physical,
            node_id: node,
            logical: 0,
        }
    }

    /// A freshly constructed map reports no entries.
    #[test]
    fn new_map_is_empty() {
        let map: LWWMap<String, String> = LWWMap::new();
        assert_eq!(map.len(), 0);
        assert!(map.is_empty());
    }

    /// A single insert is visible through every read accessor.
    #[test]
    fn insert_and_get() {
        let mut map = LWWMap::new();
        map.insert("key", "value", ts(1, 1));
        assert_eq!(map.len(), 1);
        assert!(map.contains_key(&"key"));
        assert_eq!(map.get(&"key"), Some(&"value"));
    }

    /// A write with a newer timestamp replaces the older value.
    #[test]
    fn later_write_wins() {
        let mut map = LWWMap::new();
        map.insert("k", "old", ts(1, 1));
        map.insert("k", "new", ts(2, 1));
        assert_eq!(map.get(&"k"), Some(&"new"));
    }

    /// A write with an older timestamp leaves the newer value in place.
    #[test]
    fn stale_write_ignored() {
        let mut map = LWWMap::new();
        map.insert("k", "new", ts(2, 1));
        map.insert("k", "old", ts(1, 1));
        assert_eq!(map.get(&"k"), Some(&"new"));
    }

    /// A newer remove hides the key from all read accessors.
    #[test]
    fn remove_hides_key() {
        let mut map = LWWMap::new();
        map.insert("k", "v", ts(1, 1));
        let removed = map.remove(&"k", ts(2, 1));
        assert!(removed);
        assert_eq!(map.len(), 0);
        assert_eq!(map.get(&"k"), None);
        assert!(!map.contains_key(&"k"));
    }

    /// A remove older than the stored write is rejected.
    #[test]
    fn stale_remove_ignored() {
        let mut map = LWWMap::new();
        map.insert("k", "v", ts(2, 1));
        let removed = map.remove(&"k", ts(1, 1));
        assert!(!removed);
        assert!(map.contains_key(&"k"));
    }

    /// A key can be written again once the write is newer than the tombstone.
    #[test]
    fn insert_after_remove() {
        let mut map = LWWMap::new();
        map.insert("k", "v1", ts(1, 1));
        map.remove(&"k", ts(2, 1));
        map.insert("k", "v2", ts(3, 1));
        assert_eq!(map.get(&"k"), Some(&"v2"));
    }

    /// Merging takes the remote value when it carries the newer timestamp.
    #[test]
    fn merge_later_wins() {
        let mut local = LWWMap::new();
        local.insert("k", "old", ts(1, 1));
        let mut remote = LWWMap::new();
        remote.insert("k", "new", ts(2, 2));

        local.merge(&remote);
        assert_eq!(local.get(&"k"), Some(&"new"));
    }

    /// Merging in either direction produces the same state.
    #[test]
    fn merge_is_commutative() {
        let mut first = LWWMap::new();
        first.insert("a", 1, ts(1, 1));
        first.insert("b", 2, ts(2, 1));
        let mut second = LWWMap::new();
        second.insert("b", 3, ts(3, 2));
        second.insert("c", 4, ts(1, 2));

        let mut first_then_second = first.clone();
        first_then_second.merge(&second);
        let mut second_then_first = second.clone();
        second_then_first.merge(&first);

        assert_eq!(first_then_second, second_then_first);
    }

    /// Merging the same state a second time changes nothing.
    #[test]
    fn merge_is_idempotent() {
        let mut map = LWWMap::new();
        map.insert("k", "v", ts(1, 1));
        let snapshot = map.clone();

        map.merge(&snapshot);
        let after_first_merge = map.clone();
        map.merge(&snapshot);

        assert_eq!(map, after_first_merge);
    }

    /// A newer tombstone on the remote side removes the key locally on merge.
    #[test]
    fn merge_propagates_remove() {
        let mut local = LWWMap::new();
        local.insert("k", "v", ts(1, 1));
        let mut remote = local.clone();
        remote.remove(&"k", ts(2, 2));

        local.merge(&remote);
        assert!(!local.contains_key(&"k"));
    }

    /// Applying `source.delta(&target)` to `target` matches a full merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut source = LWWMap::new();
        source.insert("a", 1, ts(1, 1));
        source.insert("b", 2, ts(3, 1));
        let mut target = LWWMap::new();
        target.insert("b", 3, ts(2, 2));
        target.insert("c", 4, ts(1, 2));

        let mut merged = target.clone();
        merged.merge(&source);

        let mut patched = target.clone();
        let delta = source.delta(&target);
        patched.apply_delta(&delta);

        assert_eq!(merged, patched);
    }

    /// No records are shipped when the other side is newer for every key.
    #[test]
    fn delta_is_empty_when_dominated() {
        let mut older = LWWMap::new();
        older.insert("k", "old", ts(1, 1));
        let mut newer = LWWMap::new();
        newer.insert("k", "new", ts(2, 2));

        let delta = older.delta(&newer);
        assert!(delta.entries.is_empty());
    }

    /// Iteration skips removed keys and yields the rest in key order.
    #[test]
    fn iterate_alive_entries() {
        let mut map = LWWMap::new();
        map.insert("a", 1, ts(1, 1));
        map.insert("b", 2, ts(2, 1));
        map.insert("c", 3, ts(3, 1));
        map.remove(&"b", ts(4, 1));

        let alive_keys: Vec<_> = map.keys().collect();
        assert_eq!(alive_keys, [&"a", &"c"]);
    }
}