Skip to main content

loro_internal/delta/
map_delta.rs

1use std::hash::Hash;
2
3use loro_common::IdLp;
4use rustc_hash::FxHashMap;
5use serde::{ser::SerializeStruct, Serialize};
6
7use crate::{
8    change::Lamport, handler::ValueOrHandler, id::PeerID, span::HasLamport, InternalString,
9    LoroValue,
10};
11
12#[derive(Default, Debug, Clone, Serialize)]
13pub struct MapDelta {
14    /// If the value is none, it's a uncreate op that should remove the entry from the
15    /// map container.
16    pub updated: FxHashMap<InternalString, Option<MapValue>>,
17}
18
19#[derive(Debug, Clone)]
20pub struct MapValue {
21    pub value: Option<LoroValue>,
22    pub lamp: Lamport,
23    pub peer: PeerID,
24}
25
26impl Ord for MapValue {
27    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28        self.lamp
29            .cmp(&other.lamp)
30            .then_with(|| self.peer.cmp(&other.peer))
31    }
32}
33
34impl PartialOrd for MapValue {
35    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
36        Some(self.cmp(other))
37    }
38}
39
40impl PartialEq for MapValue {
41    fn eq(&self, other: &Self) -> bool {
42        self.lamp == other.lamp && self.peer == other.peer
43    }
44}
45
46impl Eq for MapValue {}
47
48impl MapValue {
49    pub fn idlp(&self) -> IdLp {
50        IdLp::new(self.peer, self.lamp)
51    }
52}
53
54#[derive(Default, Debug, Clone)]
55pub struct ResolvedMapDelta {
56    pub updated: FxHashMap<InternalString, ResolvedMapValue>,
57}
58
59#[derive(Debug, Clone)]
60pub struct ResolvedMapValue {
61    pub value: Option<ValueOrHandler>,
62    pub idlp: IdLp,
63}
64
65impl ResolvedMapValue {
66    /// This is used to indicate that the entry is unset. (caused by checkout to before the entry is created)
67    pub fn new_unset() -> Self {
68        ResolvedMapValue {
69            idlp: IdLp::new(PeerID::default(), Lamport::MAX),
70            value: None,
71        }
72    }
73}
74
75impl MapDelta {
76    pub(crate) fn compose(mut self, x: MapDelta) -> MapDelta {
77        for (k, v) in x.updated.into_iter() {
78            if let Some(old) = self.updated.get_mut(&k) {
79                if &v > old {
80                    *old = v;
81                }
82            } else {
83                self.updated.insert(k, v);
84            }
85        }
86        self
87    }
88
89    #[inline]
90    pub fn new() -> Self {
91        MapDelta {
92            updated: FxHashMap::default(),
93        }
94    }
95
96    #[inline]
97    pub fn with_entry(mut self, key: InternalString, map_value: MapValue) -> Self {
98        self.updated.insert(key, Some(map_value));
99        self
100    }
101}
102
103impl ResolvedMapDelta {
104    pub(crate) fn compose(&self, x: ResolvedMapDelta) -> ResolvedMapDelta {
105        let mut updated = self.updated.clone();
106        for (k, v) in x.updated.into_iter() {
107            if let Some(old) = updated.get_mut(&k) {
108                if v.idlp > old.idlp {
109                    *old = v;
110                }
111            } else {
112                updated.insert(k, v);
113            }
114        }
115        ResolvedMapDelta { updated }
116    }
117
118    #[inline]
119    pub fn new() -> Self {
120        ResolvedMapDelta {
121            updated: FxHashMap::default(),
122        }
123    }
124
125    #[inline]
126    pub fn with_entry(mut self, key: InternalString, map_value: ResolvedMapValue) -> Self {
127        self.updated.insert(key, map_value);
128        self
129    }
130
131    pub(crate) fn transform(&mut self, b: &ResolvedMapDelta, left_prior: bool) {
132        for (k, _) in b.updated.iter() {
133            if !left_prior {
134                self.updated.remove(k);
135            }
136        }
137    }
138}
139
140impl Hash for MapValue {
141    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
142        // value is not being hashed
143        self.peer.hash(state);
144        self.lamp.hash(state);
145    }
146}
147
148impl HasLamport for MapValue {
149    fn lamport(&self) -> Lamport {
150        self.lamp
151    }
152}
153
154impl Serialize for MapValue {
155    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
156    where
157        S: serde::Serializer,
158    {
159        let mut s = serializer.serialize_struct("MapValue", 2)?;
160        s.serialize_field("value", &self.value)?;
161        s.serialize_field("lamport", &self.lamp)?;
162        s.serialize_field("id", &self.idlp())?;
163        s.end()
164    }
165}
166
#[cfg(test)]
mod tests {
    use std::hash::{Hash, Hasher};

    use rustc_hash::FxHasher;

    use crate::{handler::ValueOrHandler, LoroValue};

    use super::*;

    /// Turns a string literal into a map key.
    fn key(value: &str) -> InternalString {
        value.into()
    }

    /// Builds a `MapValue` carrying an `I64` payload.
    fn map_value(lamp: Lamport, peer: PeerID, value: i64) -> MapValue {
        let value = Some(LoroValue::I64(value));
        MapValue { value, lamp, peer }
    }

    /// Builds a `ResolvedMapValue` carrying an `I64` payload.
    fn resolved_value(lamp: Lamport, peer: PeerID, value: i64) -> ResolvedMapValue {
        let payload = ValueOrHandler::Value(LoroValue::I64(value));
        ResolvedMapValue {
            value: Some(payload),
            idlp: IdLp::new(peer, lamp),
        }
    }

    /// Hashes a `MapValue` with a fresh `FxHasher`.
    fn hash_map_value(value: &MapValue) -> u64 {
        let mut state = FxHasher::default();
        value.hash(&mut state);
        state.finish()
    }

    /// Looks up the inner `LoroValue` stored under `name`, if any.
    fn stored_value<'a>(delta: &'a MapDelta, name: &str) -> Option<&'a LoroValue> {
        delta.updated.get(&key(name))?.as_ref()?.value.as_ref()
    }

    #[test]
    fn map_value_ordering_identity_and_hash_are_timestamp_based() {
        let lower_lamport = map_value(1, 9, 10);
        let higher_lamport = map_value(2, 1, 20);
        let lower_peer = map_value(2, 1, 30);
        let higher_peer = map_value(2, 2, 40);

        // Lamport decides first; peer only breaks ties.
        assert!(higher_lamport > lower_lamport);
        assert!(higher_peer > lower_peer);
        assert_eq!(higher_peer.idlp(), IdLp::new(2, 2));
        assert_eq!(higher_peer.lamport(), 2);

        // Same (lamp, peer) but a different payload: still equal, same hash.
        let mut same_id_different_value = higher_peer.clone();
        same_id_different_value.value = Some(LoroValue::I64(999));
        assert_eq!(higher_peer, same_id_different_value);
        let original_hash = hash_map_value(&higher_peer);
        let twin_hash = hash_map_value(&same_id_different_value);
        assert_eq!(original_hash, twin_hash);
    }

    #[test]
    fn map_delta_compose_keeps_latest_timestamp_per_key() {
        let base = MapDelta::new()
            .with_entry(key("same"), map_value(2, 1, 10))
            .with_entry(key("left-only"), map_value(1, 1, 11));

        let mut update = MapDelta::new()
            .with_entry(key("same"), map_value(2, 2, 20))
            .with_entry(key("right-only"), map_value(1, 1, 21));
        update.updated.insert(key("unset"), None);

        let merged = base.compose(update);
        assert_eq!(stored_value(&merged, "same"), Some(&LoroValue::I64(20)));
        for expected in ["left-only", "right-only"] {
            assert!(merged.updated.contains_key(&key(expected)));
        }
        assert_eq!(merged.updated.get(&key("unset")), Some(&None));
    }

    #[test]
    fn map_delta_compose_does_not_let_older_entries_overwrite_newer_ones() {
        let newer = MapDelta::new().with_entry(key("same"), map_value(5, 1, 50));
        let older = MapDelta::new().with_entry(key("same"), map_value(4, 9, 40));

        let merged = newer.compose(older);
        assert_eq!(stored_value(&merged, "same"), Some(&LoroValue::I64(50)));
    }

    #[test]
    fn resolved_map_delta_compose_and_transform_apply_the_same_conflict_contract() {
        let left = ResolvedMapDelta::new()
            .with_entry(key("same"), resolved_value(1, 1, 10))
            .with_entry(key("left-only"), resolved_value(1, 1, 11));
        let right = ResolvedMapDelta::new()
            .with_entry(key("same"), resolved_value(1, 2, 20))
            .with_entry(key("right-only"), resolved_value(1, 1, 21));

        let merged = left.compose(right.clone());
        let winner = merged.updated.get(&key("same")).unwrap();
        assert_eq!(winner.idlp, IdLp::new(2, 1));
        assert_eq!(
            winner.value.as_ref().unwrap().to_value(),
            LoroValue::I64(20)
        );
        assert!(merged.updated.contains_key(&key("left-only")));
        assert!(merged.updated.contains_key(&key("right-only")));

        // With left priority nothing is dropped.
        let mut keeps_all = merged.clone();
        keeps_all.transform(&right, true);
        assert!(keeps_all.updated.contains_key(&key("same")));

        // Without left priority every key touched by `right` is removed.
        let mut drops_right_keys = merged;
        drops_right_keys.transform(&right, false);
        assert!(!drops_right_keys.updated.contains_key(&key("same")));
        assert!(!drops_right_keys.updated.contains_key(&key("right-only")));
        assert!(drops_right_keys.updated.contains_key(&key("left-only")));
    }

    #[test]
    fn resolved_unset_uses_max_lamport_to_win_against_material_values() {
        let material = ResolvedMapDelta::new().with_entry(
            key("field"),
            resolved_value(Lamport::MAX - 1, PeerID::MAX, 1),
        );
        let unsetting =
            ResolvedMapDelta::new().with_entry(key("field"), ResolvedMapValue::new_unset());

        let merged = material.compose(unsetting);
        let entry = merged.updated.get(&key("field")).unwrap();
        assert_eq!(entry.idlp, IdLp::new(PeerID::default(), Lamport::MAX));
        assert!(entry.value.is_none());
    }

    #[test]
    fn map_value_serialization_exposes_value_lamport_and_id() {
        let json = serde_json::to_value(map_value(7, 8, 42)).unwrap();
        assert_eq!(json["value"], serde_json::json!(42));
        assert_eq!(json["lamport"], serde_json::json!(7));
        let expected_id = serde_json::json!({ "peer": 8, "lamport": 7 });
        assert_eq!(json["id"], expected_id);
    }
}