Skip to main content

logicaffeine_data/crdt/
ormap.rs

1//! OR-Map (Observed-Remove Map) CRDT.
2//!
3//! A key-value map where values are nested CRDTs.
4//! Keys use add-wins semantics, values are merged recursively.
5
6use super::causal::{Dot, DotContext, VClock};
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use std::hash::Hash;
14
15/// Delta for ORMap synchronization.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17#[serde(bound = "K: Serialize + serde::de::DeserializeOwned + Hash + Eq, V: Serialize + serde::de::DeserializeOwned")]
18pub struct ORMapDelta<K, V> {
19    pub keys: HashMap<K, HashSet<Dot>>,
20    pub values: HashMap<K, V>,
21    pub context: DotContext,
22}
23
24/// An observed-remove map with nested CRDT values.
25///
26/// Keys are managed with OR-Set add-wins semantics.
27/// Values are merged using their CRDT merge function.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(bound = "K: Serialize + serde::de::DeserializeOwned + Hash + Eq, V: Serialize + serde::de::DeserializeOwned + Merge + Default + Clone")]
30pub struct ORMap<K, V: Merge + Default + Clone> {
31    /// Map of active keys to their dots
32    keys: HashMap<K, HashSet<Dot>>,
33    /// Map of all values (including for removed keys, for resurrection)
34    values: HashMap<K, V>,
35    /// Tracks seen dots
36    context: DotContext,
37    /// This replica's ID
38    replica_id: ReplicaId,
39}
40
41impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> ORMap<K, V> {
42    /// Create a new map with a specific replica ID.
43    pub fn new(replica_id: ReplicaId) -> Self {
44        Self {
45            keys: HashMap::new(),
46            values: HashMap::new(),
47            context: DotContext::new(),
48            replica_id,
49        }
50    }
51
52    /// Create a new map with a random replica ID.
53    pub fn new_random() -> Self {
54        Self::new(generate_replica_id())
55    }
56
57    /// Get a reference to a value if the key exists.
58    pub fn get(&self, key: &K) -> Option<&V> {
59        if self.contains_key(key) {
60            self.values.get(key)
61        } else {
62            None
63        }
64    }
65
66    /// Get a mutable reference to a value, creating the key if necessary.
67    /// Always creates a new dot to ensure add-wins semantics.
68    pub fn get_or_insert(&mut self, key: K) -> &mut V {
69        // Always create a new dot for add-wins semantics
70        let dot = self.context.next(self.replica_id);
71        self.keys.entry(key.clone()).or_default().insert(dot);
72
73        // Ensure value exists
74        self.values.entry(key).or_default()
75    }
76
77    /// Get a mutable reference to a value without creating a new dot.
78    /// Use this for read-heavy access patterns where add-wins isn't needed.
79    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
80        if self.contains_key(key) {
81            self.values.get_mut(key)
82        } else {
83            None
84        }
85    }
86
87    /// Check if a key exists in the map.
88    pub fn contains_key(&self, key: &K) -> bool {
89        self.keys
90            .get(key)
91            .map_or(false, |dots| !dots.is_empty())
92    }
93
94    /// Remove a key from the map.
95    pub fn remove(&mut self, key: &K) {
96        self.keys.remove(key);
97    }
98
99    /// Get the number of keys.
100    pub fn len(&self) -> usize {
101        self.keys
102            .values()
103            .filter(|dots| !dots.is_empty())
104            .count()
105    }
106
107    /// Check if the map is empty.
108    pub fn is_empty(&self) -> bool {
109        self.len() == 0
110    }
111
112    /// Iterate over active keys.
113    pub fn keys(&self) -> impl Iterator<Item = &K> {
114        self.keys
115            .iter()
116            .filter(|(_, dots)| !dots.is_empty())
117            .map(|(k, _)| k)
118    }
119
120    /// Iterate over key-value pairs.
121    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
122        self.keys
123            .iter()
124            .filter(|(_, dots)| !dots.is_empty())
125            .filter_map(|(k, _)| self.values.get(k).map(|v| (k, v)))
126    }
127}
128
129impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> Merge for ORMap<K, V> {
130    fn merge(&mut self, other: &Self) {
131        // First, merge all values (regardless of key presence)
132        // This ensures nested CRDTs are properly merged
133        for (key, other_value) in &other.values {
134            let my_value = self.values.entry(key.clone()).or_default();
135            my_value.merge(other_value);
136        }
137
138        // Merge keys with OR-Set add-wins semantics
139        for (key, other_dots) in &other.keys {
140            let my_dots = self.keys.entry(key.clone()).or_default();
141
142            // Keep dots from other that we haven't seen
143            for &dot in other_dots {
144                if !self.context.has_seen(&dot) {
145                    my_dots.insert(dot);
146                }
147            }
148        }
149
150        // Handle removals: if we have a key that other doesn't, check if they saw our dots
151        let my_keys: Vec<_> = self.keys.keys().cloned().collect();
152        for key in my_keys {
153            if !other.keys.contains_key(&key) {
154                // Other doesn't have this key
155                if let Some(my_dots) = self.keys.get_mut(&key) {
156                    // Only keep dots that other hasn't seen
157                    my_dots.retain(|dot| !other.context.has_seen(dot));
158                }
159            } else {
160                // Both have the key
161                let other_dots = other.keys.get(&key).unwrap();
162                if let Some(my_dots) = self.keys.get_mut(&key) {
163                    // Keep dots that other hasn't seen OR that other still has
164                    my_dots.retain(|dot| !other.context.has_seen(dot) || other_dots.contains(dot));
165                }
166            }
167        }
168
169        // Merge contexts
170        self.context.merge(&other.context);
171
172        // Clean up empty key entries
173        self.keys.retain(|_, dots| !dots.is_empty());
174    }
175}
176
177impl<
178        K: Hash + Eq + Clone + Serialize + DeserializeOwned + Send + 'static,
179        V: Merge + Default + Clone + Serialize + DeserializeOwned + Send + 'static,
180    > DeltaCrdt for ORMap<K, V>
181{
182    type Delta = ORMapDelta<K, V>;
183
184    fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
185        let current = self.version();
186        if since.dominates(&current) {
187            return None;
188        }
189
190        Some(ORMapDelta {
191            keys: self.keys.clone(),
192            values: self.values.clone(),
193            context: self.context.clone(),
194        })
195    }
196
197    fn apply_delta(&mut self, delta: &Self::Delta) {
198        // Merge values
199        for (key, other_value) in &delta.values {
200            let my_value = self.values.entry(key.clone()).or_default();
201            my_value.merge(other_value);
202        }
203
204        // Merge keys
205        for (key, other_dots) in &delta.keys {
206            let my_dots = self.keys.entry(key.clone()).or_default();
207            for &dot in other_dots {
208                if !self.context.has_seen(&dot) {
209                    my_dots.insert(dot);
210                }
211            }
212        }
213
214        // Merge context
215        self.context.merge(&delta.context);
216
217        // Clean up empty entries
218        self.keys.retain(|_, dots| !dots.is_empty());
219    }
220
221    fn version(&self) -> VClock {
222        self.context.version()
223    }
224}
225
226impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> Default for ORMap<K, V> {
227    fn default() -> Self {
228        Self::new_random()
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use crate::crdt::PNCounter;
236
237    #[test]
238    fn test_ormap_get_or_insert() {
239        let mut map: ORMap<String, PNCounter> = ORMap::new(1);
240        map.get_or_insert("score".to_string()).increment(10);
241        assert_eq!(map.get(&"score".to_string()).unwrap().value(), 10);
242    }
243
244    #[test]
245    fn test_ormap_remove() {
246        let mut map: ORMap<String, PNCounter> = ORMap::new(1);
247        map.get_or_insert("key".to_string()).increment(5);
248        map.remove(&"key".to_string());
249        assert!(map.get(&"key".to_string()).is_none());
250    }
251
252    #[test]
253    fn test_ormap_concurrent_update() {
254        let mut a: ORMap<String, PNCounter> = ORMap::new(1);
255        let mut b: ORMap<String, PNCounter> = ORMap::new(2);
256
257        a.get_or_insert("score".to_string()).increment(10);
258        b.get_or_insert("score".to_string()).increment(5);
259
260        a.merge(&b);
261        assert_eq!(a.get(&"score".to_string()).unwrap().value(), 15);
262    }
263}