1use std::hash::Hash;
2
3use loro_common::IdLp;
4use rustc_hash::FxHashMap;
5use serde::{ser::SerializeStruct, Serialize};
6
7use crate::{
8 change::Lamport, handler::ValueOrHandler, id::PeerID, span::HasLamport, InternalString,
9 LoroValue,
10};
11
12#[derive(Default, Debug, Clone, Serialize)]
13pub struct MapDelta {
14 pub updated: FxHashMap<InternalString, Option<MapValue>>,
17}
18
19#[derive(Debug, Clone)]
20pub struct MapValue {
21 pub value: Option<LoroValue>,
22 pub lamp: Lamport,
23 pub peer: PeerID,
24}
25
26impl Ord for MapValue {
27 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28 self.lamp
29 .cmp(&other.lamp)
30 .then_with(|| self.peer.cmp(&other.peer))
31 }
32}
33
34impl PartialOrd for MapValue {
35 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
36 Some(self.cmp(other))
37 }
38}
39
40impl PartialEq for MapValue {
41 fn eq(&self, other: &Self) -> bool {
42 self.lamp == other.lamp && self.peer == other.peer
43 }
44}
45
46impl Eq for MapValue {}
47
48impl MapValue {
49 pub fn idlp(&self) -> IdLp {
50 IdLp::new(self.peer, self.lamp)
51 }
52}
53
54#[derive(Default, Debug, Clone)]
55pub struct ResolvedMapDelta {
56 pub updated: FxHashMap<InternalString, ResolvedMapValue>,
57}
58
59#[derive(Debug, Clone)]
60pub struct ResolvedMapValue {
61 pub value: Option<ValueOrHandler>,
62 pub idlp: IdLp,
63}
64
65impl ResolvedMapValue {
66 pub fn new_unset() -> Self {
68 ResolvedMapValue {
69 idlp: IdLp::new(PeerID::default(), Lamport::MAX),
70 value: None,
71 }
72 }
73}
74
75impl MapDelta {
76 pub(crate) fn compose(mut self, x: MapDelta) -> MapDelta {
77 for (k, v) in x.updated.into_iter() {
78 if let Some(old) = self.updated.get_mut(&k) {
79 if &v > old {
80 *old = v;
81 }
82 } else {
83 self.updated.insert(k, v);
84 }
85 }
86 self
87 }
88
89 #[inline]
90 pub fn new() -> Self {
91 MapDelta {
92 updated: FxHashMap::default(),
93 }
94 }
95
96 #[inline]
97 pub fn with_entry(mut self, key: InternalString, map_value: MapValue) -> Self {
98 self.updated.insert(key, Some(map_value));
99 self
100 }
101}
102
103impl ResolvedMapDelta {
104 pub(crate) fn compose(&self, x: ResolvedMapDelta) -> ResolvedMapDelta {
105 let mut updated = self.updated.clone();
106 for (k, v) in x.updated.into_iter() {
107 if let Some(old) = updated.get_mut(&k) {
108 if v.idlp > old.idlp {
109 *old = v;
110 }
111 } else {
112 updated.insert(k, v);
113 }
114 }
115 ResolvedMapDelta { updated }
116 }
117
118 #[inline]
119 pub fn new() -> Self {
120 ResolvedMapDelta {
121 updated: FxHashMap::default(),
122 }
123 }
124
125 #[inline]
126 pub fn with_entry(mut self, key: InternalString, map_value: ResolvedMapValue) -> Self {
127 self.updated.insert(key, map_value);
128 self
129 }
130
131 pub(crate) fn transform(&mut self, b: &ResolvedMapDelta, left_prior: bool) {
132 for (k, _) in b.updated.iter() {
133 if !left_prior {
134 self.updated.remove(k);
135 }
136 }
137 }
138}
139
140impl Hash for MapValue {
141 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
142 self.peer.hash(state);
144 self.lamp.hash(state);
145 }
146}
147
148impl HasLamport for MapValue {
149 fn lamport(&self) -> Lamport {
150 self.lamp
151 }
152}
153
154impl Serialize for MapValue {
155 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
156 where
157 S: serde::Serializer,
158 {
159 let mut s = serializer.serialize_struct("MapValue", 2)?;
160 s.serialize_field("value", &self.value)?;
161 s.serialize_field("lamport", &self.lamp)?;
162 s.serialize_field("id", &self.idlp())?;
163 s.end()
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use std::hash::{Hash, Hasher};
170
171 use rustc_hash::FxHasher;
172
173 use crate::{handler::ValueOrHandler, LoroValue};
174
175 use super::*;
176
177 fn key(value: &str) -> InternalString {
178 value.into()
179 }
180
181 fn map_value(lamp: Lamport, peer: PeerID, value: i64) -> MapValue {
182 MapValue {
183 value: Some(LoroValue::I64(value)),
184 lamp,
185 peer,
186 }
187 }
188
189 fn resolved_value(lamp: Lamport, peer: PeerID, value: i64) -> ResolvedMapValue {
190 ResolvedMapValue {
191 value: Some(ValueOrHandler::Value(LoroValue::I64(value))),
192 idlp: IdLp::new(peer, lamp),
193 }
194 }
195
196 fn hash_map_value(value: &MapValue) -> u64 {
197 let mut hasher = FxHasher::default();
198 value.hash(&mut hasher);
199 hasher.finish()
200 }
201
202 #[test]
203 fn map_value_ordering_identity_and_hash_are_timestamp_based() {
204 let lower_lamport = map_value(1, 9, 10);
205 let higher_lamport = map_value(2, 1, 20);
206 let lower_peer = map_value(2, 1, 30);
207 let higher_peer = map_value(2, 2, 40);
208
209 assert!(higher_lamport > lower_lamport);
210 assert!(higher_peer > lower_peer);
211 assert_eq!(higher_peer.idlp(), IdLp::new(2, 2));
212 assert_eq!(higher_peer.lamport(), 2);
213
214 let same_id_different_value = MapValue {
215 value: Some(LoroValue::I64(999)),
216 ..higher_peer.clone()
217 };
218 assert_eq!(higher_peer, same_id_different_value);
219 assert_eq!(
220 hash_map_value(&higher_peer),
221 hash_map_value(&same_id_different_value)
222 );
223 }
224
225 #[test]
226 fn map_delta_compose_keeps_latest_timestamp_per_key() {
227 let initial = MapDelta::new()
228 .with_entry(key("same"), map_value(2, 1, 10))
229 .with_entry(key("left-only"), map_value(1, 1, 11));
230
231 let mut incoming = MapDelta::new()
232 .with_entry(key("same"), map_value(2, 2, 20))
233 .with_entry(key("right-only"), map_value(1, 1, 21));
234 incoming.updated.insert(key("unset"), None);
235
236 let composed = initial.compose(incoming);
237 assert_eq!(
238 composed
239 .updated
240 .get(&key("same"))
241 .and_then(|value| value.as_ref())
242 .and_then(|value| value.value.as_ref()),
243 Some(&LoroValue::I64(20))
244 );
245 assert!(composed.updated.contains_key(&key("left-only")));
246 assert!(composed.updated.contains_key(&key("right-only")));
247 assert_eq!(composed.updated.get(&key("unset")), Some(&None));
248 }
249
250 #[test]
251 fn map_delta_compose_does_not_let_older_entries_overwrite_newer_ones() {
252 let initial = MapDelta::new().with_entry(key("same"), map_value(5, 1, 50));
253 let incoming = MapDelta::new().with_entry(key("same"), map_value(4, 9, 40));
254
255 let composed = initial.compose(incoming);
256 let value = composed
257 .updated
258 .get(&key("same"))
259 .and_then(|value| value.as_ref())
260 .and_then(|value| value.value.as_ref());
261 assert_eq!(value, Some(&LoroValue::I64(50)));
262 }
263
264 #[test]
265 fn resolved_map_delta_compose_and_transform_apply_the_same_conflict_contract() {
266 let left = ResolvedMapDelta::new()
267 .with_entry(key("same"), resolved_value(1, 1, 10))
268 .with_entry(key("left-only"), resolved_value(1, 1, 11));
269 let right = ResolvedMapDelta::new()
270 .with_entry(key("same"), resolved_value(1, 2, 20))
271 .with_entry(key("right-only"), resolved_value(1, 1, 21));
272
273 let composed = left.compose(right.clone());
274 let value = composed.updated.get(&key("same")).unwrap();
275 assert_eq!(value.idlp, IdLp::new(2, 1));
276 assert_eq!(value.value.as_ref().unwrap().to_value(), LoroValue::I64(20));
277 assert!(composed.updated.contains_key(&key("left-only")));
278 assert!(composed.updated.contains_key(&key("right-only")));
279
280 let mut left_prior = composed.clone();
281 left_prior.transform(&right, true);
282 assert!(left_prior.updated.contains_key(&key("same")));
283
284 let mut right_prior = composed;
285 right_prior.transform(&right, false);
286 assert!(!right_prior.updated.contains_key(&key("same")));
287 assert!(!right_prior.updated.contains_key(&key("right-only")));
288 assert!(right_prior.updated.contains_key(&key("left-only")));
289 }
290
291 #[test]
292 fn resolved_unset_uses_max_lamport_to_win_against_material_values() {
293 let unset = ResolvedMapValue::new_unset();
294 let value = resolved_value(Lamport::MAX - 1, PeerID::MAX, 1);
295
296 let composed = ResolvedMapDelta::new()
297 .with_entry(key("field"), value)
298 .compose(ResolvedMapDelta::new().with_entry(key("field"), unset));
299 let entry = composed.updated.get(&key("field")).unwrap();
300 assert_eq!(entry.idlp, IdLp::new(PeerID::default(), Lamport::MAX));
301 assert!(entry.value.is_none());
302 }
303
304 #[test]
305 fn map_value_serialization_exposes_value_lamport_and_id() {
306 let serialized = serde_json::to_value(map_value(7, 8, 42)).unwrap();
307 assert_eq!(serialized["value"], serde_json::json!(42));
308 assert_eq!(serialized["lamport"], serde_json::json!(7));
309 assert_eq!(
310 serialized["id"],
311 serde_json::json!({ "peer": 8, "lamport": 7 })
312 );
313 }
314}