1use crate::{
38 error::Error,
39 id::{OpId, ReplicaId},
40 version::VersionVector,
41};
42use std::collections::HashMap;
43use std::hash::Hash;
44
45#[cfg(feature = "serde")]
46use serde::{Deserialize, Serialize};
47
48#[derive(Clone, Debug, PartialEq, Eq)]
54#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
55pub enum MapOp<K, V> {
56 Set {
58 id: OpId,
60 key: K,
62 value: V,
64 },
65 Remove {
67 id: OpId,
69 key: K,
71 },
72}
73
74impl<K, V> MapOp<K, V> {
75 #[must_use]
77 pub fn id(&self) -> OpId {
78 match self {
79 MapOp::Set { id, .. } | MapOp::Remove { id, .. } => *id,
80 }
81 }
82
83 #[must_use]
85 pub fn key(&self) -> &K {
86 match self {
87 MapOp::Set { key, .. } | MapOp::Remove { key, .. } => key,
88 }
89 }
90}
91
92#[derive(Clone, Debug)]
97#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
98struct Entry<V> {
99 op_id: OpId,
102 value: Option<V>,
104}
105
106#[derive(Clone, Debug)]
112pub struct Map<K: Eq + Hash + Clone, V: Clone> {
113 replica: ReplicaId,
114 clock: u64,
115 entries: HashMap<K, Entry<V>>,
116 log: Vec<MapOp<K, V>>,
117 version: VersionVector,
118}
119
120impl<K: Eq + Hash + Clone, V: Clone> Map<K, V> {
121 #[must_use]
123 pub fn new(replica: ReplicaId) -> Self {
124 Self {
125 replica,
126 clock: 0,
127 entries: HashMap::new(),
128 log: Vec::new(),
129 version: VersionVector::new(),
130 }
131 }
132
133 #[must_use]
136 pub fn new_random() -> Self {
137 Self::new(crate::id::new_replica_id())
138 }
139
140 #[must_use]
142 pub fn replica_id(&self) -> ReplicaId {
143 self.replica
144 }
145
146 #[must_use]
148 pub fn len(&self) -> usize {
149 self.entries.values().filter(|e| e.value.is_some()).count()
150 }
151
152 #[must_use]
154 pub fn is_empty(&self) -> bool {
155 self.len() == 0
156 }
157
158 pub fn get(&self, key: &K) -> Option<&V> {
160 self.entries.get(key).and_then(|e| e.value.as_ref())
161 }
162
163 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> + '_ {
165 self.entries
166 .iter()
167 .filter_map(|(k, e)| e.value.as_ref().map(|v| (k, v)))
168 }
169
170 pub fn keys(&self) -> impl Iterator<Item = &K> + '_ {
172 self.iter().map(|(k, _)| k)
173 }
174
175 pub fn values(&self) -> impl Iterator<Item = &V> + '_ {
177 self.iter().map(|(_, v)| v)
178 }
179
180 pub fn contains_key(&self, key: &K) -> bool {
182 self.get(key).is_some()
183 }
184
185 pub fn set(&mut self, key: K, value: V) -> MapOp<K, V> {
187 self.clock = self
188 .clock
189 .checked_add(1)
190 .expect("Lamport clock overflow (>2^64 ops)");
191 let id = OpId::new(self.clock, self.replica);
192 let op = MapOp::Set {
193 id,
194 key: key.clone(),
195 value: value.clone(),
196 };
197 self.upsert(id, key, Some(value));
198 self.version.observe(id);
199 self.log.push(op.clone());
200 op
201 }
202
203 pub fn remove(&mut self, key: K) -> MapOp<K, V> {
206 self.clock = self
207 .clock
208 .checked_add(1)
209 .expect("Lamport clock overflow (>2^64 ops)");
210 let id = OpId::new(self.clock, self.replica);
211 let op = MapOp::Remove {
212 id,
213 key: key.clone(),
214 };
215 self.upsert(id, key, None);
216 self.version.observe(id);
217 self.log.push(op.clone());
218 op
219 }
220
221 pub fn apply(&mut self, op: MapOp<K, V>) -> Result<(), Error> {
223 let op_id = op.id();
224 if self.version.contains(op_id) {
225 return Ok(());
226 }
227 match &op {
228 MapOp::Set { id, key, value } => {
229 self.upsert(*id, key.clone(), Some(value.clone()));
230 }
231 MapOp::Remove { id, key } => {
232 self.upsert(*id, key.clone(), None);
233 }
234 }
235 self.version.observe(op_id);
236 self.clock = self.clock.max(op_id.counter);
237 self.log.push(op);
238 Ok(())
239 }
240
241 pub fn merge(&mut self, other: &Self) {
244 let mut to_apply: Vec<&MapOp<K, V>> = other
245 .log
246 .iter()
247 .filter(|op| !self.version.contains(op.id()))
248 .collect();
249 to_apply.sort_by_key(|op| op.id());
250 for op in to_apply {
251 self.apply(op.clone())
253 .expect("corrupt op log in merge source");
254 }
255 }
256
257 #[must_use]
259 pub fn ops(&self) -> &[MapOp<K, V>] {
260 &self.log
261 }
262
263 pub fn ops_since<'a>(
265 &'a self,
266 since: &'a VersionVector,
267 ) -> impl Iterator<Item = &'a MapOp<K, V>> + 'a {
268 self.log.iter().filter(move |op| !since.contains(op.id()))
269 }
270
271 #[must_use]
273 pub fn version(&self) -> &VersionVector {
274 &self.version
275 }
276
277 fn upsert(&mut self, id: OpId, key: K, value: Option<V>) {
283 match self.entries.get_mut(&key) {
284 Some(entry) if id <= entry.op_id => {
285 }
290 Some(entry) => {
291 entry.op_id = id;
292 entry.value = value;
293 }
294 None => {
295 self.entries.insert(key, Entry { op_id: id, value });
296 }
297 }
298 }
299}
300
301impl<K: Eq + Hash + Clone, V: Clone> Default for Map<K, V> {
302 fn default() -> Self {
303 Self::new(0)
304 }
305}
306
307#[cfg(feature = "serde")]
315#[derive(Serialize, Deserialize)]
316struct MapSnapshot<K, V> {
317 replica: ReplicaId,
318 clock: u64,
319 entries: Vec<(K, Entry<V>)>,
320 version: VersionVector,
321 log: Vec<MapOp<K, V>>,
322}
323
324#[cfg(feature = "serde")]
325impl<K, V> Serialize for Map<K, V>
326where
327 K: Eq + Hash + Clone + Serialize,
328 V: Clone + Serialize,
329{
330 fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
331 let entries: Vec<(K, Entry<V>)> = self
332 .entries
333 .iter()
334 .map(|(k, v)| (k.clone(), v.clone()))
335 .collect();
336 let snap = MapSnapshot {
337 replica: self.replica,
338 clock: self.clock,
339 entries,
340 version: self.version.clone(),
341 log: self.log.clone(),
342 };
343 snap.serialize(ser)
344 }
345}
346
347#[cfg(feature = "serde")]
348impl<'de, K, V> Deserialize<'de> for Map<K, V>
349where
350 K: Eq + Hash + Clone + Deserialize<'de>,
351 V: Clone + Deserialize<'de>,
352{
353 fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
354 let snap = MapSnapshot::<K, V>::deserialize(de)?;
355 Ok(Map {
356 replica: snap.replica,
357 clock: snap.clock,
358 entries: snap.entries.into_iter().collect(),
359 version: snap.version,
360 log: snap.log,
361 })
362 }
363}
364
365#[cfg(test)]
370mod tests {
371 use super::*;
372
373 #[test]
374 fn empty_map() {
375 let m: Map<String, i32> = Map::new(1);
376 assert!(m.is_empty());
377 assert_eq!(m.len(), 0);
378 assert_eq!(m.get(&"k".to_string()), None);
379 }
380
381 #[test]
382 fn set_and_get() {
383 let mut m: Map<String, i32> = Map::new(1);
384 m.set("a".into(), 1);
385 m.set("b".into(), 2);
386 assert_eq!(m.get(&"a".into()), Some(&1));
387 assert_eq!(m.get(&"b".into()), Some(&2));
388 assert_eq!(m.len(), 2);
389 }
390
391 #[test]
392 fn overwrite_in_one_replica() {
393 let mut m: Map<&'static str, i32> = Map::new(1);
394 m.set("a", 1);
395 m.set("a", 2);
396 m.set("a", 3);
397 assert_eq!(m.get(&"a"), Some(&3));
398 assert_eq!(m.len(), 1);
399 }
400
401 #[test]
402 fn remove_drops_value() {
403 let mut m: Map<&'static str, i32> = Map::new(1);
404 m.set("a", 1);
405 m.remove("a");
406 assert!(!m.contains_key(&"a"));
407 assert_eq!(m.len(), 0);
408 }
409
410 #[test]
411 fn concurrent_set_lww_resolution() {
412 let mut a: Map<&'static str, i32> = Map::new(1);
413 let mut b: Map<&'static str, i32> = Map::new(2);
414
415 a.set("k", 100);
416 b.set("k", 200);
417
418 let mut a2 = a.clone();
419 a2.merge(&b);
420 let mut b2 = b.clone();
421 b2.merge(&a);
422
423 assert_eq!(a2.get(&"k"), b2.get(&"k"));
425 assert_eq!(a2.get(&"k"), Some(&200));
429 }
430
431 #[test]
432 fn set_beats_concurrent_remove_with_higher_id() {
433 let mut a: Map<&'static str, i32> = Map::new(1);
434 let mut b: Map<&'static str, i32> = Map::new(2);
435 a.set("k", 1);
436 b.merge(&a);
437
438 a.remove("k"); b.set("k", 99); let mut a2 = a.clone();
442 a2.merge(&b);
443 let mut b2 = b.clone();
444 b2.merge(&a);
445
446 assert_eq!(a2.get(&"k"), b2.get(&"k"));
447 assert_eq!(a2.get(&"k"), Some(&99));
449 }
450
451 #[test]
452 fn idempotent_apply() {
453 let mut a: Map<&'static str, i32> = Map::new(1);
454 let op1 = a.set("k", 1);
455 let op2 = a.set("j", 2);
456
457 let mut b: Map<&'static str, i32> = Map::new(2);
458 b.apply(op1.clone()).unwrap();
459 b.apply(op2.clone()).unwrap();
460 b.apply(op1).unwrap();
461 b.apply(op2).unwrap();
462
463 assert_eq!(b.len(), 2);
464 assert_eq!(b.get(&"k"), Some(&1));
465 assert_eq!(b.get(&"j"), Some(&2));
466 }
467
468 #[test]
469 fn ops_since_returns_only_unseen() {
470 let mut a: Map<&'static str, i32> = Map::new(1);
471 a.set("k", 1);
472 let v1 = a.version().clone();
473 a.set("j", 2);
474
475 let new: Vec<&MapOp<&'static str, i32>> = a.ops_since(&v1).collect();
476 assert_eq!(new.len(), 1);
477 }
478}