Skip to main content

loro_internal/delta/
map_delta.rs

1use std::hash::Hash;
2
3use loro_common::IdLp;
4use rustc_hash::FxHashMap;
5use serde::{ser::SerializeStruct, Serialize};
6
7use crate::{
8    change::Lamport, handler::ValueOrHandler, id::PeerID, span::HasLamport, InternalString,
9    LoroValue,
10};
11
12#[derive(Default, Debug, Clone, Serialize)]
13pub struct MapDelta {
14    /// If the value is none, it's a uncreate op that should remove the entry from the
15    /// map container.
16    pub updated: FxHashMap<InternalString, Option<MapValue>>,
17}
18
19#[derive(Debug, Clone)]
20pub struct MapValue {
21    pub value: Option<LoroValue>,
22    pub lamp: Lamport,
23    pub peer: PeerID,
24}
25
26impl Ord for MapValue {
27    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28        self.lamp
29            .cmp(&other.lamp)
30            .then_with(|| self.peer.cmp(&other.peer))
31    }
32}
33
34impl PartialOrd for MapValue {
35    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
36        Some(self.cmp(other))
37    }
38}
39
40impl PartialEq for MapValue {
41    fn eq(&self, other: &Self) -> bool {
42        self.lamp == other.lamp && self.peer == other.peer
43    }
44}
45
46impl Eq for MapValue {}
47
48impl MapValue {
49    pub fn idlp(&self) -> IdLp {
50        IdLp::new(self.peer, self.lamp)
51    }
52}
53
54#[derive(Default, Debug, Clone)]
55pub struct ResolvedMapDelta {
56    pub updated: FxHashMap<InternalString, ResolvedMapValue>,
57}
58
59#[derive(Debug, Clone)]
60pub struct ResolvedMapValue {
61    pub value: Option<ValueOrHandler>,
62    pub idlp: IdLp,
63}
64
65impl ResolvedMapValue {
66    /// This is used to indicate that the entry is unset. (caused by checkout to before the entry is created)
67    pub fn new_unset() -> Self {
68        ResolvedMapValue {
69            idlp: IdLp::new(PeerID::default(), Lamport::MAX),
70            value: None,
71        }
72    }
73}
74
75impl MapDelta {
76    pub(crate) fn compose(mut self, x: MapDelta) -> MapDelta {
77        for (k, v) in x.updated.into_iter() {
78            if let Some(old) = self.updated.get_mut(&k) {
79                if &v > old {
80                    *old = v;
81                }
82            } else {
83                self.updated.insert(k, v);
84            }
85        }
86        self
87    }
88
89    #[inline]
90    pub fn new() -> Self {
91        MapDelta {
92            updated: FxHashMap::default(),
93        }
94    }
95
96    #[inline]
97    pub fn with_entry(mut self, key: InternalString, map_value: MapValue) -> Self {
98        self.updated.insert(key, Some(map_value));
99        self
100    }
101}
102
103impl ResolvedMapDelta {
104    pub(crate) fn compose(mut self, x: ResolvedMapDelta) -> ResolvedMapDelta {
105        // Compose into `self` in place; cloning `self.updated` here made
106        // composing N fragments into one map event O(N^2).
107        for (k, v) in x.updated.into_iter() {
108            if let Some(old) = self.updated.get_mut(&k) {
109                if v.idlp > old.idlp {
110                    *old = v;
111                }
112            } else {
113                self.updated.insert(k, v);
114            }
115        }
116        self
117    }
118
119    #[inline]
120    pub fn new() -> Self {
121        ResolvedMapDelta {
122            updated: FxHashMap::default(),
123        }
124    }
125
126    #[inline]
127    pub fn with_entry(mut self, key: InternalString, map_value: ResolvedMapValue) -> Self {
128        self.updated.insert(key, map_value);
129        self
130    }
131
132    pub(crate) fn transform(&mut self, b: &ResolvedMapDelta, left_prior: bool) {
133        for (k, _) in b.updated.iter() {
134            if !left_prior {
135                self.updated.remove(k);
136            }
137        }
138    }
139}
140
141impl Hash for MapValue {
142    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
143        // value is not being hashed
144        self.peer.hash(state);
145        self.lamp.hash(state);
146    }
147}
148
149impl HasLamport for MapValue {
150    fn lamport(&self) -> Lamport {
151        self.lamp
152    }
153}
154
155impl Serialize for MapValue {
156    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
157    where
158        S: serde::Serializer,
159    {
160        let mut s = serializer.serialize_struct("MapValue", 2)?;
161        s.serialize_field("value", &self.value)?;
162        s.serialize_field("lamport", &self.lamp)?;
163        s.serialize_field("id", &self.idlp())?;
164        s.end()
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use std::hash::{Hash, Hasher};
171
172    use rustc_hash::FxHasher;
173
174    use crate::{handler::ValueOrHandler, LoroValue};
175
176    use super::*;
177
178    fn key(value: &str) -> InternalString {
179        value.into()
180    }
181
182    fn map_value(lamp: Lamport, peer: PeerID, value: i64) -> MapValue {
183        MapValue {
184            value: Some(LoroValue::I64(value)),
185            lamp,
186            peer,
187        }
188    }
189
190    fn resolved_value(lamp: Lamport, peer: PeerID, value: i64) -> ResolvedMapValue {
191        ResolvedMapValue {
192            value: Some(ValueOrHandler::Value(LoroValue::I64(value))),
193            idlp: IdLp::new(peer, lamp),
194        }
195    }
196
197    fn hash_map_value(value: &MapValue) -> u64 {
198        let mut hasher = FxHasher::default();
199        value.hash(&mut hasher);
200        hasher.finish()
201    }
202
203    #[test]
204    fn map_value_ordering_identity_and_hash_are_timestamp_based() {
205        let lower_lamport = map_value(1, 9, 10);
206        let higher_lamport = map_value(2, 1, 20);
207        let lower_peer = map_value(2, 1, 30);
208        let higher_peer = map_value(2, 2, 40);
209
210        assert!(higher_lamport > lower_lamport);
211        assert!(higher_peer > lower_peer);
212        assert_eq!(higher_peer.idlp(), IdLp::new(2, 2));
213        assert_eq!(higher_peer.lamport(), 2);
214
215        let same_id_different_value = MapValue {
216            value: Some(LoroValue::I64(999)),
217            ..higher_peer.clone()
218        };
219        assert_eq!(higher_peer, same_id_different_value);
220        assert_eq!(
221            hash_map_value(&higher_peer),
222            hash_map_value(&same_id_different_value)
223        );
224    }
225
226    #[test]
227    fn map_delta_compose_keeps_latest_timestamp_per_key() {
228        let initial = MapDelta::new()
229            .with_entry(key("same"), map_value(2, 1, 10))
230            .with_entry(key("left-only"), map_value(1, 1, 11));
231
232        let mut incoming = MapDelta::new()
233            .with_entry(key("same"), map_value(2, 2, 20))
234            .with_entry(key("right-only"), map_value(1, 1, 21));
235        incoming.updated.insert(key("unset"), None);
236
237        let composed = initial.compose(incoming);
238        assert_eq!(
239            composed
240                .updated
241                .get(&key("same"))
242                .and_then(|value| value.as_ref())
243                .and_then(|value| value.value.as_ref()),
244            Some(&LoroValue::I64(20))
245        );
246        assert!(composed.updated.contains_key(&key("left-only")));
247        assert!(composed.updated.contains_key(&key("right-only")));
248        assert_eq!(composed.updated.get(&key("unset")), Some(&None));
249    }
250
251    #[test]
252    fn map_delta_compose_does_not_let_older_entries_overwrite_newer_ones() {
253        let initial = MapDelta::new().with_entry(key("same"), map_value(5, 1, 50));
254        let incoming = MapDelta::new().with_entry(key("same"), map_value(4, 9, 40));
255
256        let composed = initial.compose(incoming);
257        let value = composed
258            .updated
259            .get(&key("same"))
260            .and_then(|value| value.as_ref())
261            .and_then(|value| value.value.as_ref());
262        assert_eq!(value, Some(&LoroValue::I64(50)));
263    }
264
265    #[test]
266    fn resolved_map_delta_compose_and_transform_apply_the_same_conflict_contract() {
267        let left = ResolvedMapDelta::new()
268            .with_entry(key("same"), resolved_value(1, 1, 10))
269            .with_entry(key("left-only"), resolved_value(1, 1, 11));
270        let right = ResolvedMapDelta::new()
271            .with_entry(key("same"), resolved_value(1, 2, 20))
272            .with_entry(key("right-only"), resolved_value(1, 1, 21));
273
274        let composed = left.compose(right.clone());
275        let value = composed.updated.get(&key("same")).unwrap();
276        assert_eq!(value.idlp, IdLp::new(2, 1));
277        assert_eq!(value.value.as_ref().unwrap().to_value(), LoroValue::I64(20));
278        assert!(composed.updated.contains_key(&key("left-only")));
279        assert!(composed.updated.contains_key(&key("right-only")));
280
281        let mut left_prior = composed.clone();
282        left_prior.transform(&right, true);
283        assert!(left_prior.updated.contains_key(&key("same")));
284
285        let mut right_prior = composed;
286        right_prior.transform(&right, false);
287        assert!(!right_prior.updated.contains_key(&key("same")));
288        assert!(!right_prior.updated.contains_key(&key("right-only")));
289        assert!(right_prior.updated.contains_key(&key("left-only")));
290    }
291
292    #[test]
293    fn resolved_unset_uses_max_lamport_to_win_against_material_values() {
294        let unset = ResolvedMapValue::new_unset();
295        let value = resolved_value(Lamport::MAX - 1, PeerID::MAX, 1);
296
297        let composed = ResolvedMapDelta::new()
298            .with_entry(key("field"), value)
299            .compose(ResolvedMapDelta::new().with_entry(key("field"), unset));
300        let entry = composed.updated.get(&key("field")).unwrap();
301        assert_eq!(entry.idlp, IdLp::new(PeerID::default(), Lamport::MAX));
302        assert!(entry.value.is_none());
303    }
304
305    #[test]
306    fn map_value_serialization_exposes_value_lamport_and_id() {
307        let serialized = serde_json::to_value(map_value(7, 8, 42)).unwrap();
308        assert_eq!(serialized["value"], serde_json::json!(42));
309        assert_eq!(serialized["lamport"], serde_json::json!(7));
310        assert_eq!(
311            serialized["id"],
312            serde_json::json!({ "peer": 8, "lamport": 7 })
313        );
314    }
315}