logicaffeine_data/crdt/
ormap.rs1use super::causal::{Dot, DotContext, VClock};
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use std::hash::Hash;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17#[serde(bound = "K: Serialize + serde::de::DeserializeOwned + Hash + Eq, V: Serialize + serde::de::DeserializeOwned")]
18pub struct ORMapDelta<K, V> {
19 pub keys: HashMap<K, HashSet<Dot>>,
20 pub values: HashMap<K, V>,
21 pub context: DotContext,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(bound = "K: Serialize + serde::de::DeserializeOwned + Hash + Eq, V: Serialize + serde::de::DeserializeOwned + Merge + Default + Clone")]
30pub struct ORMap<K, V: Merge + Default + Clone> {
31 keys: HashMap<K, HashSet<Dot>>,
33 values: HashMap<K, V>,
35 context: DotContext,
37 replica_id: ReplicaId,
39}
40
41impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> ORMap<K, V> {
42 pub fn new(replica_id: ReplicaId) -> Self {
44 Self {
45 keys: HashMap::new(),
46 values: HashMap::new(),
47 context: DotContext::new(),
48 replica_id,
49 }
50 }
51
52 pub fn new_random() -> Self {
54 Self::new(generate_replica_id())
55 }
56
57 pub fn get(&self, key: &K) -> Option<&V> {
59 if self.contains_key(key) {
60 self.values.get(key)
61 } else {
62 None
63 }
64 }
65
66 pub fn get_or_insert(&mut self, key: K) -> &mut V {
69 let dot = self.context.next(self.replica_id);
71 self.keys.entry(key.clone()).or_default().insert(dot);
72
73 self.values.entry(key).or_default()
75 }
76
77 pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
80 if self.contains_key(key) {
81 self.values.get_mut(key)
82 } else {
83 None
84 }
85 }
86
87 pub fn contains_key(&self, key: &K) -> bool {
89 self.keys
90 .get(key)
91 .map_or(false, |dots| !dots.is_empty())
92 }
93
94 pub fn remove(&mut self, key: &K) {
96 self.keys.remove(key);
97 }
98
99 pub fn len(&self) -> usize {
101 self.keys
102 .values()
103 .filter(|dots| !dots.is_empty())
104 .count()
105 }
106
107 pub fn is_empty(&self) -> bool {
109 self.len() == 0
110 }
111
112 pub fn keys(&self) -> impl Iterator<Item = &K> {
114 self.keys
115 .iter()
116 .filter(|(_, dots)| !dots.is_empty())
117 .map(|(k, _)| k)
118 }
119
120 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
122 self.keys
123 .iter()
124 .filter(|(_, dots)| !dots.is_empty())
125 .filter_map(|(k, _)| self.values.get(k).map(|v| (k, v)))
126 }
127}
128
129impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> Merge for ORMap<K, V> {
130 fn merge(&mut self, other: &Self) {
131 for (key, other_value) in &other.values {
134 let my_value = self.values.entry(key.clone()).or_default();
135 my_value.merge(other_value);
136 }
137
138 for (key, other_dots) in &other.keys {
140 let my_dots = self.keys.entry(key.clone()).or_default();
141
142 for &dot in other_dots {
144 if !self.context.has_seen(&dot) {
145 my_dots.insert(dot);
146 }
147 }
148 }
149
150 let my_keys: Vec<_> = self.keys.keys().cloned().collect();
152 for key in my_keys {
153 if !other.keys.contains_key(&key) {
154 if let Some(my_dots) = self.keys.get_mut(&key) {
156 my_dots.retain(|dot| !other.context.has_seen(dot));
158 }
159 } else {
160 let other_dots = other.keys.get(&key).unwrap();
162 if let Some(my_dots) = self.keys.get_mut(&key) {
163 my_dots.retain(|dot| !other.context.has_seen(dot) || other_dots.contains(dot));
165 }
166 }
167 }
168
169 self.context.merge(&other.context);
171
172 self.keys.retain(|_, dots| !dots.is_empty());
174 }
175}
176
177impl<
178 K: Hash + Eq + Clone + Serialize + DeserializeOwned + Send + 'static,
179 V: Merge + Default + Clone + Serialize + DeserializeOwned + Send + 'static,
180 > DeltaCrdt for ORMap<K, V>
181{
182 type Delta = ORMapDelta<K, V>;
183
184 fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
185 let current = self.version();
186 if since.dominates(¤t) {
187 return None;
188 }
189
190 Some(ORMapDelta {
191 keys: self.keys.clone(),
192 values: self.values.clone(),
193 context: self.context.clone(),
194 })
195 }
196
197 fn apply_delta(&mut self, delta: &Self::Delta) {
198 for (key, other_value) in &delta.values {
200 let my_value = self.values.entry(key.clone()).or_default();
201 my_value.merge(other_value);
202 }
203
204 for (key, other_dots) in &delta.keys {
206 let my_dots = self.keys.entry(key.clone()).or_default();
207 for &dot in other_dots {
208 if !self.context.has_seen(&dot) {
209 my_dots.insert(dot);
210 }
211 }
212 }
213
214 self.context.merge(&delta.context);
216
217 self.keys.retain(|_, dots| !dots.is_empty());
219 }
220
221 fn version(&self) -> VClock {
222 self.context.version()
223 }
224}
225
226impl<K: Hash + Eq + Clone, V: Merge + Default + Clone> Default for ORMap<K, V> {
227 fn default() -> Self {
228 Self::new_random()
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::crdt::PNCounter;
236
237 #[test]
238 fn test_ormap_get_or_insert() {
239 let mut map: ORMap<String, PNCounter> = ORMap::new(1);
240 map.get_or_insert("score".to_string()).increment(10);
241 assert_eq!(map.get(&"score".to_string()).unwrap().value(), 10);
242 }
243
244 #[test]
245 fn test_ormap_remove() {
246 let mut map: ORMap<String, PNCounter> = ORMap::new(1);
247 map.get_or_insert("key".to_string()).increment(5);
248 map.remove(&"key".to_string());
249 assert!(map.get(&"key".to_string()).is_none());
250 }
251
252 #[test]
253 fn test_ormap_concurrent_update() {
254 let mut a: ORMap<String, PNCounter> = ORMap::new(1);
255 let mut b: ORMap<String, PNCounter> = ORMap::new(2);
256
257 a.get_or_insert("score".to_string()).increment(10);
258 b.get_or_insert("score".to_string()).increment(5);
259
260 a.merge(&b);
261 assert_eq!(a.get(&"score".to_string()).unwrap().value(), 15);
262 }
263}