loro_internal/
version.rs

1mod frontiers;
2pub use frontiers::Frontiers;
3
4use crate::{
5    change::Lamport,
6    id::{Counter, ID},
7    oplog::AppDag,
8    span::{CounterSpan, IdSpan},
9    LoroError, PeerID,
10};
11use fxhash::FxHashMap;
12use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdFull, IdSpanVector};
13use serde::{Deserialize, Serialize};
14use smallvec::SmallVec;
15use std::{
16    cmp::Ordering,
17    ops::{ControlFlow, Deref, DerefMut},
18};
19
/// [VersionVector](https://en.wikipedia.org/wiki/Version_vector)
/// is a map from [PeerID] to [Counter]. Each entry describes a right-open interval:
/// the stored counter is the exclusive end of the ops known for that peer.
///
/// i.e. a [VersionVector] of `{A: 1, B: 2}` means that A has 1 atomic op and B has 2 atomic ops,
/// thus the ID `{client: A, counter: 1}` is out of the range.
#[repr(transparent)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector(FxHashMap<PeerID, Counter>);
28
/// A map from [PeerID] to a `(start, end)` pair of [Counter]s.
///
/// The containment checks on this type treat `start` as inclusive and `end` as exclusive.
#[repr(transparent)]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VersionRange(pub(crate) FxHashMap<PeerID, (Counter, Counter)>);
32
/// Builds a [`VersionRange`](crate::version::VersionRange) from a list of
/// `peer => (start, end)` entries. A trailing comma is accepted.
///
/// Entries are inserted in the order written, so a peer that is listed more than once
/// keeps the last pair given for it.
#[macro_export]
macro_rules! version_range {
    ($($peer:expr => ($start:expr, $end:expr)),* $(,)?) => {{
        let mut map = ::fxhash::FxHashMap::default();
        $(
            map.insert($peer, ($start, $end));
        )*
        $crate::version::VersionRange::from_map(map)
    }};
}
43
44impl VersionRange {
45    pub fn new() -> Self {
46        Self(Default::default())
47    }
48
49    pub fn from_map(map: FxHashMap<PeerID, (Counter, Counter)>) -> Self {
50        Self(map)
51    }
52
53    pub fn iter(&self) -> impl Iterator<Item = (&PeerID, &(Counter, Counter))> + '_ {
54        self.0.iter()
55    }
56
57    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&PeerID, &mut (Counter, Counter))> + '_ {
58        self.0.iter_mut()
59    }
60
61    pub fn clear(&mut self) {
62        self.0.clear()
63    }
64
65    pub fn get(&self, peer: &PeerID) -> Option<&(Counter, Counter)> {
66        self.0.get(peer)
67    }
68
69    pub fn insert(&mut self, peer: PeerID, start: Counter, end: Counter) {
70        self.0.insert(peer, (start, end));
71    }
72
73    pub fn from_vv(vv: &VersionVector) -> Self {
74        let mut ans = Self::new();
75        for (peer, counter) in vv.iter() {
76            ans.insert(*peer, 0, *counter);
77        }
78        ans
79    }
80
81    pub fn contains_ops_between(&self, vv_a: &VersionVector, vv_b: &VersionVector) -> bool {
82        for span in vv_a.sub_iter(vv_b) {
83            if !self.contains_id_span(IdSpan::new(
84                span.peer,
85                span.counter.start.saturating_sub(1),
86                span.counter.end,
87            )) {
88                return false;
89            }
90        }
91
92        for span in vv_b.sub_iter(vv_a) {
93            if !self.contains_id_span(IdSpan::new(
94                span.peer,
95                span.counter.start.saturating_sub(1),
96                span.counter.end,
97            )) {
98                return false;
99            }
100        }
101
102        true
103    }
104
105    pub fn has_overlap_with(&self, mut span: IdSpan) -> bool {
106        span.normalize_();
107        if let Some((start, end)) = self.get(&span.peer) {
108            start < &span.counter.end && end > &span.counter.start
109        } else {
110            false
111        }
112    }
113
114    pub fn contains_id(&self, id: ID) -> bool {
115        if let Some((start, end)) = self.get(&id.peer) {
116            start <= &id.counter && end > &id.counter
117        } else {
118            false
119        }
120    }
121
122    pub fn contains_id_span(&self, mut span: IdSpan) -> bool {
123        span.normalize_();
124        if let Some((start, end)) = self.get(&span.peer) {
125            start <= &span.counter.start && end >= &span.counter.end
126        } else {
127            false
128        }
129    }
130
131    pub fn extends_to_include_id_span(&mut self, mut span: IdSpan) {
132        span.normalize_();
133        if let Some((start, end)) = self.0.get_mut(&span.peer) {
134            *start = (*start).min(span.counter.start);
135            *end = (*end).max(span.counter.end);
136        } else {
137            self.insert(span.peer, span.counter.start, span.counter.end);
138        }
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.0.is_empty()
143    }
144
145    pub fn inner(&self) -> &FxHashMap<PeerID, (Counter, Counter)> {
146        &self.0
147    }
148}
149
/// Immutable version vector, backed by an `im::HashMap`.
///
/// It has O(1) clone time and O(logN) insert/delete/lookup time.
///
/// It's more memory efficient than [VersionVector] when the version vector
/// can be created by cloning and modifying other similar version vectors.
#[repr(transparent)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ImVersionVector(im::HashMap<PeerID, Counter, fxhash::FxBuildHasher>);
159
160impl ImVersionVector {
161    pub fn new() -> Self {
162        Self(Default::default())
163    }
164
165    pub fn clear(&mut self) {
166        self.0.clear()
167    }
168
169    pub fn get(&self, key: &PeerID) -> Option<&Counter> {
170        self.0.get(key)
171    }
172
173    pub fn get_mut(&mut self, key: &PeerID) -> Option<&mut Counter> {
174        self.0.get_mut(key)
175    }
176
177    pub fn insert(&mut self, k: PeerID, v: Counter) {
178        self.0.insert(k, v);
179    }
180
181    pub fn is_empty(&self) -> bool {
182        self.0.is_empty()
183    }
184
185    pub fn iter(&self) -> im::hashmap::Iter<'_, PeerID, Counter> {
186        self.0.iter()
187    }
188
189    pub fn remove(&mut self, k: &PeerID) -> Option<Counter> {
190        self.0.remove(k)
191    }
192
193    pub fn len(&self) -> usize {
194        self.0.len()
195    }
196
197    pub fn contains_key(&self, k: &PeerID) -> bool {
198        self.0.contains_key(k)
199    }
200
201    pub fn encode(&self) -> Vec<u8> {
202        postcard::to_allocvec(self).unwrap()
203    }
204
205    pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
206        let vv = VersionVector::decode(bytes)?;
207        Ok(Self::from_vv(&vv))
208    }
209
210    pub fn to_vv(&self) -> VersionVector {
211        VersionVector(self.0.iter().map(|(&k, &v)| (k, v)).collect())
212    }
213
214    pub fn from_vv(vv: &VersionVector) -> Self {
215        ImVersionVector(vv.0.iter().map(|(&k, &v)| (k, v)).collect())
216    }
217
218    pub fn extend_to_include_vv<'a>(
219        &mut self,
220        vv: impl Iterator<Item = (&'a PeerID, &'a Counter)>,
221    ) {
222        for (&client_id, &counter) in vv {
223            if let Some(my_counter) = self.0.get_mut(&client_id) {
224                if *my_counter < counter {
225                    *my_counter = counter;
226                }
227            } else {
228                self.0.insert(client_id, counter);
229            }
230        }
231    }
232
233    #[inline]
234    pub fn merge(&mut self, other: &Self) {
235        self.extend_to_include_vv(other.0.iter());
236    }
237
238    #[inline]
239    pub fn merge_vv(&mut self, other: &VersionVector) {
240        self.extend_to_include_vv(other.0.iter());
241    }
242
243    #[inline]
244    pub fn set_last(&mut self, id: ID) {
245        self.0.insert(id.peer, id.counter + 1);
246    }
247
248    pub fn extend_to_include_last_id(&mut self, id: ID) {
249        if let Some(counter) = self.0.get_mut(&id.peer) {
250            if *counter <= id.counter {
251                *counter = id.counter + 1;
252            }
253        } else {
254            self.set_last(id)
255        }
256    }
257
258    pub(crate) fn includes_id(&self, x: ID) -> bool {
259        if self.is_empty() {
260            return false;
261        }
262
263        self.get(&x.peer).copied().unwrap_or(0) > x.counter
264    }
265}
266
267impl PartialEq for VersionVector {
268    fn eq(&self, other: &Self) -> bool {
269        self.iter()
270            .all(|(client, counter)| other.get(client).unwrap_or(&0) == counter)
271            && other
272                .iter()
273                .all(|(client, counter)| self.get(client).unwrap_or(&0) == counter)
274    }
275}
276
277impl Eq for VersionVector {}
278
279impl PartialEq for ImVersionVector {
280    fn eq(&self, other: &Self) -> bool {
281        self.0
282            .iter()
283            .all(|(client, counter)| other.0.get(client).unwrap_or(&0) == counter)
284            && other
285                .0
286                .iter()
287                .all(|(client, counter)| self.0.get(client).unwrap_or(&0) == counter)
288    }
289}
290
291impl Eq for ImVersionVector {}
292
/// Exposes the read-only API of the inner `FxHashMap<PeerID, Counter>`
/// (`get`, `iter`, `contains_key`, ...) directly on [`VersionVector`].
impl Deref for VersionVector {
    type Target = FxHashMap<PeerID, Counter>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
300
/// The difference between two version vectors (`left` and `right`), expressed as
/// per-peer counter spans.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct VersionVectorDiff {
    /// The spans that the `left` side needs to retreat to reach the `right` side.
    ///
    /// These spans are included in the left, but not in the right.
    pub retreat: IdSpanVector,
    /// The spans that the `left` side needs to forward to reach the `right` side.
    ///
    /// These spans are included in the right, but not in the left.
    pub forward: IdSpanVector,
}
312
313impl VersionVectorDiff {
314    #[inline]
315    pub fn merge_left(&mut self, span: IdSpan) {
316        merge(&mut self.retreat, span);
317    }
318
319    #[inline]
320    pub fn merge_right(&mut self, span: IdSpan) {
321        merge(&mut self.forward, span);
322    }
323
324    #[inline]
325    pub fn subtract_start_left(&mut self, span: IdSpan) {
326        subtract_start(&mut self.retreat, span);
327    }
328
329    #[inline]
330    pub fn subtract_start_right(&mut self, span: IdSpan) {
331        subtract_start(&mut self.forward, span);
332    }
333
334    pub fn get_id_spans_left(&self) -> impl Iterator<Item = IdSpan> + '_ {
335        self.retreat.iter().map(|(peer, span)| IdSpan {
336            peer: *peer,
337            counter: *span,
338        })
339    }
340
341    pub fn get_id_spans_right(&self) -> impl Iterator<Item = IdSpan> + '_ {
342        self.forward.iter().map(|(peer, span)| IdSpan {
343            peer: *peer,
344            counter: *span,
345        })
346    }
347}
348
349fn subtract_start(m: &mut FxHashMap<PeerID, CounterSpan>, target: IdSpan) {
350    if let Some(span) = m.get_mut(&target.peer) {
351        if span.start < target.counter.end {
352            span.start = target.counter.end;
353        }
354    }
355}
356
357fn merge(m: &mut FxHashMap<PeerID, CounterSpan>, mut target: IdSpan) {
358    target.normalize_();
359    if let Some(span) = m.get_mut(&target.peer) {
360        span.start = span.start.min(target.counter.start);
361        span.end = span.end.max(target.counter.end);
362    } else {
363        m.insert(target.peer, target.counter);
364    }
365}
366
367impl PartialOrd for VersionVector {
368    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
369        let mut self_greater = true;
370        let mut other_greater = true;
371        let mut eq = true;
372        for (client_id, other_end) in other.iter() {
373            if let Some(self_end) = self.get(client_id) {
374                if self_end < other_end {
375                    self_greater = false;
376                    eq = false;
377                }
378                if self_end > other_end {
379                    other_greater = false;
380                    eq = false;
381                }
382            } else if *other_end > 0 {
383                self_greater = false;
384                eq = false;
385            }
386        }
387
388        for (client_id, self_end) in self.iter() {
389            if other.contains_key(client_id) {
390                continue;
391            } else if *self_end > 0 {
392                other_greater = false;
393                eq = false;
394            }
395        }
396
397        if eq {
398            Some(Ordering::Equal)
399        } else if self_greater {
400            Some(Ordering::Greater)
401        } else if other_greater {
402            Some(Ordering::Less)
403        } else {
404            None
405        }
406    }
407}
408
409impl PartialOrd for ImVersionVector {
410    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
411        let mut self_greater = true;
412        let mut other_greater = true;
413        let mut eq = true;
414        for (client_id, other_end) in other.iter() {
415            if let Some(self_end) = self.get(client_id) {
416                if self_end < other_end {
417                    self_greater = false;
418                    eq = false;
419                }
420                if self_end > other_end {
421                    other_greater = false;
422                    eq = false;
423                }
424            } else if *other_end > 0 {
425                self_greater = false;
426                eq = false;
427            }
428        }
429
430        for (client_id, self_end) in self.iter() {
431            if other.contains_key(client_id) {
432                continue;
433            } else if *self_end > 0 {
434                other_greater = false;
435                eq = false;
436            }
437        }
438
439        if eq {
440            Some(Ordering::Equal)
441        } else if self_greater {
442            Some(Ordering::Greater)
443        } else if other_greater {
444            Some(Ordering::Less)
445        } else {
446            None
447        }
448    }
449}
450
/// Exposes the mutating API of the inner `FxHashMap<PeerID, Counter>`
/// (`insert`, `remove`, `get_mut`, ...) directly on [`VersionVector`].
impl DerefMut for VersionVector {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
456
457impl VersionVector {
458    pub fn diff(&self, rhs: &Self) -> VersionVectorDiff {
459        let mut ans: VersionVectorDiff = Default::default();
460        for (client_id, &counter) in self.iter() {
461            if let Some(&rhs_counter) = rhs.get(client_id) {
462                match counter.cmp(&rhs_counter) {
463                    Ordering::Less => {
464                        ans.forward.insert(
465                            *client_id,
466                            CounterSpan {
467                                start: counter,
468                                end: rhs_counter,
469                            },
470                        );
471                    }
472                    Ordering::Greater => {
473                        ans.retreat.insert(
474                            *client_id,
475                            CounterSpan {
476                                start: rhs_counter,
477                                end: counter,
478                            },
479                        );
480                    }
481                    Ordering::Equal => {}
482                }
483            } else {
484                ans.retreat.insert(
485                    *client_id,
486                    CounterSpan {
487                        start: 0,
488                        end: counter,
489                    },
490                );
491            }
492        }
493        for (client_id, &rhs_counter) in rhs.iter() {
494            if !self.contains_key(client_id) {
495                ans.forward.insert(
496                    *client_id,
497                    CounterSpan {
498                        start: 0,
499                        end: rhs_counter,
500                    },
501                );
502            }
503        }
504
505        ans
506    }
507
508    /// Returns two iterators that cover the differences between two version vectors.
509    ///
510    /// - The first iterator contains the spans that are in `self` but not in `rhs`
511    /// - The second iterator contains the spans that are in `rhs` but not in `self`
512    pub fn diff_iter<'a>(
513        &'a self,
514        rhs: &'a Self,
515    ) -> (
516        impl Iterator<Item = IdSpan> + 'a,
517        impl Iterator<Item = IdSpan> + 'a,
518    ) {
519        (self.sub_iter(rhs), rhs.sub_iter(self))
520    }
521
522    /// Returns the spans that are in `self` but not in `rhs`
523    pub fn sub_iter<'a>(&'a self, rhs: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
524        self.iter().filter_map(move |(peer, &counter)| {
525            if let Some(&rhs_counter) = rhs.get(peer) {
526                if counter > rhs_counter {
527                    Some(IdSpan {
528                        peer: *peer,
529                        counter: CounterSpan {
530                            start: rhs_counter,
531                            end: counter,
532                        },
533                    })
534                } else {
535                    None
536                }
537            } else if counter > 0 {
538                Some(IdSpan {
539                    peer: *peer,
540                    counter: CounterSpan {
541                        start: 0,
542                        end: counter,
543                    },
544                })
545            } else {
546                None
547            }
548        })
549    }
550
551    /// Returns the spans that are in `self` but not in `rhs`
552    pub fn sub_iter_im<'a>(
553        &'a self,
554        rhs: &'a ImVersionVector,
555    ) -> impl Iterator<Item = IdSpan> + 'a {
556        self.iter().filter_map(move |(peer, &counter)| {
557            if let Some(&rhs_counter) = rhs.get(peer) {
558                if counter > rhs_counter {
559                    Some(IdSpan {
560                        peer: *peer,
561                        counter: CounterSpan {
562                            start: rhs_counter,
563                            end: counter,
564                        },
565                    })
566                } else {
567                    None
568                }
569            } else if counter > 0 {
570                Some(IdSpan {
571                    peer: *peer,
572                    counter: CounterSpan {
573                        start: 0,
574                        end: counter,
575                    },
576                })
577            } else {
578                None
579            }
580        })
581    }
582
583    /// Iter all span from a -> b and b -> a
584    pub fn iter_between<'a>(&'a self, other: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
585        // PERF: can be optimized a little
586        self.sub_iter(other).chain(other.sub_iter(self))
587    }
588
589    pub fn sub_vec(&self, rhs: &Self) -> IdSpanVector {
590        self.sub_iter(rhs).map(|x| (x.peer, x.counter)).collect()
591    }
592
593    pub fn distance_between(&self, other: &Self) -> usize {
594        let mut ans = 0;
595        for (client_id, &counter) in self.iter() {
596            if let Some(&other_counter) = other.get(client_id) {
597                ans += (counter - other_counter).abs();
598            } else if counter > 0 {
599                ans += counter;
600            }
601        }
602
603        for (client_id, &counter) in other.iter() {
604            if !self.contains_key(client_id) {
605                ans += counter;
606            }
607        }
608
609        ans as usize
610    }
611
612    pub fn to_spans(&self) -> IdSpanVector {
613        self.iter()
614            .map(|(client_id, &counter)| {
615                (
616                    *client_id,
617                    CounterSpan {
618                        start: 0,
619                        end: counter,
620                    },
621                )
622            })
623            .collect()
624    }
625
626    #[inline]
627    pub fn get_frontiers(&self) -> Frontiers {
628        self.iter()
629            .filter_map(|(client_id, &counter)| {
630                if counter > 0 {
631                    Some(ID {
632                        peer: *client_id,
633                        counter: counter - 1,
634                    })
635                } else {
636                    None
637                }
638            })
639            .collect()
640    }
641
642    #[inline]
643    pub fn new() -> Self {
644        Self(Default::default())
645    }
646
647    /// set the inclusive ending point. target id will be included by self
648    #[inline]
649    pub fn set_last(&mut self, id: ID) {
650        self.0.insert(id.peer, id.counter + 1);
651    }
652
653    #[inline]
654    pub fn get_last(&self, client_id: PeerID) -> Option<Counter> {
655        self.0
656            .get(&client_id)
657            .and_then(|&x| if x == 0 { None } else { Some(x - 1) })
658    }
659
660    /// set the exclusive ending point. target id will NOT be included by self
661    #[inline]
662    pub fn set_end(&mut self, id: ID) {
663        if id.counter <= 0 {
664            self.0.remove(&id.peer);
665        } else {
666            self.0.insert(id.peer, id.counter);
667        }
668    }
669
670    /// Update the end counter of the given client if the end is greater.
671    /// Return whether updated
672    #[inline]
673    pub fn try_update_last(&mut self, id: ID) -> bool {
674        if let Some(end) = self.0.get_mut(&id.peer) {
675            if *end < id.counter + 1 {
676                *end = id.counter + 1;
677                true
678            } else {
679                false
680            }
681        } else {
682            self.0.insert(id.peer, id.counter + 1);
683            true
684        }
685    }
686
687    pub fn get_missing_span(&self, target: &Self) -> Vec<IdSpan> {
688        let mut ans = vec![];
689        for (client_id, other_end) in target.iter() {
690            if let Some(my_end) = self.get(client_id) {
691                if my_end < other_end {
692                    ans.push(IdSpan::new(*client_id, *my_end, *other_end));
693                }
694            } else {
695                ans.push(IdSpan::new(*client_id, 0, *other_end));
696            }
697        }
698
699        ans
700    }
701
702    pub fn merge(&mut self, other: &Self) {
703        for (&client_id, &other_end) in other.iter() {
704            if let Some(my_end) = self.get_mut(&client_id) {
705                if *my_end < other_end {
706                    *my_end = other_end;
707                }
708            } else {
709                self.0.insert(client_id, other_end);
710            }
711        }
712    }
713
714    pub fn includes_vv(&self, other: &VersionVector) -> bool {
715        match self.partial_cmp(other) {
716            Some(ord) => match ord {
717                Ordering::Less => false,
718                Ordering::Equal => true,
719                Ordering::Greater => true,
720            },
721            None => false,
722        }
723    }
724
725    pub fn includes_id(&self, id: ID) -> bool {
726        if let Some(end) = self.get(&id.peer) {
727            if *end > id.counter {
728                return true;
729            }
730        }
731        false
732    }
733
734    pub fn intersect_span(&self, target: IdSpan) -> Option<CounterSpan> {
735        if let Some(&end) = self.get(&target.peer) {
736            if end > target.ctr_start() {
737                let count_end = target.ctr_end();
738                return Some(CounterSpan {
739                    start: target.ctr_start(),
740                    end: end.min(count_end),
741                });
742            }
743        }
744
745        None
746    }
747
748    pub fn extend_to_include_vv<'a>(
749        &mut self,
750        vv: impl Iterator<Item = (&'a PeerID, &'a Counter)>,
751    ) {
752        for (&client_id, &counter) in vv {
753            if let Some(my_counter) = self.get_mut(&client_id) {
754                if *my_counter < counter {
755                    *my_counter = counter;
756                }
757            } else {
758                self.0.insert(client_id, counter);
759            }
760        }
761    }
762
763    pub fn extend_to_include_last_id(&mut self, id: ID) {
764        if let Some(counter) = self.get_mut(&id.peer) {
765            if *counter <= id.counter {
766                *counter = id.counter + 1;
767            }
768        } else {
769            self.set_last(id)
770        }
771    }
772
773    pub fn extend_to_include_end_id(&mut self, id: ID) {
774        if let Some(counter) = self.get_mut(&id.peer) {
775            if *counter < id.counter {
776                *counter = id.counter;
777            }
778        } else {
779            self.set_end(id)
780        }
781    }
782
783    pub fn extend_to_include(&mut self, span: IdSpan) {
784        if let Some(counter) = self.get_mut(&span.peer) {
785            if *counter < span.counter.norm_end() {
786                *counter = span.counter.norm_end();
787            }
788        } else {
789            self.insert(span.peer, span.counter.norm_end());
790        }
791    }
792
793    pub fn shrink_to_exclude(&mut self, span: IdSpan) {
794        if span.counter.min() == 0 {
795            self.remove(&span.peer);
796            return;
797        }
798
799        if let Some(counter) = self.get_mut(&span.peer) {
800            if *counter > span.counter.min() {
801                *counter = span.counter.min();
802            }
803        }
804    }
805
806    pub fn forward(&mut self, spans: &IdSpanVector) {
807        for span in spans.iter() {
808            self.extend_to_include(IdSpan {
809                peer: *span.0,
810                counter: *span.1,
811            });
812        }
813    }
814
815    pub fn retreat(&mut self, spans: &IdSpanVector) {
816        for span in spans.iter() {
817            self.shrink_to_exclude(IdSpan {
818                peer: *span.0,
819                counter: *span.1,
820            });
821        }
822    }
823
824    pub fn intersection(&self, other: &VersionVector) -> VersionVector {
825        let mut ans = VersionVector::new();
826        for (client_id, &counter) in self.iter() {
827            if let Some(&other_counter) = other.get(client_id) {
828                if counter < other_counter {
829                    if counter != 0 {
830                        ans.insert(*client_id, counter);
831                    }
832                } else if other_counter != 0 {
833                    ans.insert(*client_id, other_counter);
834                }
835            }
836        }
837        ans
838    }
839
840    #[inline(always)]
841    pub fn encode(&self) -> Vec<u8> {
842        postcard::to_allocvec(self).unwrap()
843    }
844
845    #[inline(always)]
846    pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
847        postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError)
848    }
849
850    pub(crate) fn trim(&self, vv: &VersionVector) -> VersionVector {
851        let mut ans = VersionVector::new();
852        for (client_id, &counter) in self.iter() {
853            if let Some(&other_counter) = vv.get(client_id) {
854                ans.insert(*client_id, counter.min(other_counter));
855            }
856        }
857        ans
858    }
859
860    pub fn to_im_vv(&self) -> ImVersionVector {
861        ImVersionVector(self.0.iter().map(|(&k, &v)| (k, v)).collect())
862    }
863
864    pub fn from_im_vv(im_vv: &ImVersionVector) -> Self {
865        VersionVector(im_vv.0.iter().map(|(&k, &v)| (k, v)).collect())
866    }
867}
868
869/// Use minimal set of ids to represent the frontiers
870#[tracing::instrument(skip(dag))]
871pub fn shrink_frontiers(last_ids: &Frontiers, dag: &AppDag) -> Result<Frontiers, ID> {
872    // it only keep the ids of ops that are concurrent to each other
873
874    if last_ids.len() <= 1 {
875        return Ok(last_ids.clone());
876    }
877
878    let mut last_ids = {
879        let ids = filter_duplicated_peer_id(last_ids);
880        if last_ids.len() == 1 {
881            let mut frontiers = Frontiers::default();
882            frontiers.push(last_ids.as_single().unwrap());
883            return Ok(frontiers);
884        }
885
886        let mut last_ids = Vec::with_capacity(ids.len());
887        for id in ids {
888            let Some(lamport) = dag.get_lamport(&id) else {
889                return Err(id);
890            };
891            last_ids.push(IdFull::new(id.peer, id.counter, lamport))
892        }
893
894        last_ids
895    };
896
897    let mut frontiers = Vec::new();
898    // Iterate from the greatest lamport to the smallest
899    last_ids.sort_by_key(|x| x.lamport);
900    for id in last_ids.iter().rev() {
901        let mut should_insert = true;
902        let mut len = 0;
903        // travel backward because they have more similar lamport
904        for f_id in frontiers.iter().rev() {
905            dag.travel_ancestors(*f_id, &mut |x| {
906                len += 1;
907                if x.contains_id(id.id()) {
908                    should_insert = false;
909                    ControlFlow::Break(())
910                } else if x.lamport_last() < id.lamport {
911                    // Already travel to a node with smaller lamport, no need to continue, we are sure two ops are concurrent now
912                    ControlFlow::Break(())
913                } else {
914                    ControlFlow::Continue(())
915                }
916            });
917        }
918
919        if should_insert {
920            frontiers.push(id.id());
921        }
922    }
923
924    Ok(frontiers.into())
925}
926
927fn filter_duplicated_peer_id(last_ids: &Frontiers) -> Vec<ID> {
928    let mut peer_max_counters = FxHashMap::default();
929    for id in last_ids.iter() {
930        let counter = peer_max_counters.entry(id.peer).or_insert(id.counter);
931        if id.counter > *counter {
932            *counter = id.counter;
933        }
934    }
935
936    peer_max_counters
937        .into_iter()
938        .map(|(peer, counter)| ID::new(peer, counter))
939        .collect()
940}
941
/// The default [`VersionVector`] is the empty one returned by [`VersionVector::new`].
impl Default for VersionVector {
    fn default() -> Self {
        Self::new()
    }
}
947
948impl From<FxHashMap<PeerID, Counter>> for VersionVector {
949    fn from(map: FxHashMap<PeerID, Counter>) -> Self {
950        let mut im_map = FxHashMap::default();
951        for (client_id, counter) in map {
952            im_map.insert(client_id, counter);
953        }
954        Self(im_map)
955    }
956}
957
958impl From<Vec<ID>> for VersionVector {
959    fn from(vec: Vec<ID>) -> Self {
960        let mut vv = VersionVector::new();
961        for id in vec {
962            vv.set_last(id);
963        }
964
965        vv
966    }
967}
968
969impl FromIterator<ID> for VersionVector {
970    fn from_iter<T: IntoIterator<Item = ID>>(iter: T) -> Self {
971        let iter = iter.into_iter();
972        let mut vv = VersionVector(FxHashMap::with_capacity_and_hasher(
973            iter.size_hint().0,
974            Default::default(),
975        ));
976        for id in iter {
977            vv.set_last(id);
978        }
979
980        vv
981    }
982}
983
984impl FromIterator<(PeerID, Counter)> for VersionVector {
985    fn from_iter<T: IntoIterator<Item = (PeerID, Counter)>>(iter: T) -> Self {
986        VersionVector(FxHashMap::from_iter(iter))
987    }
988}
989
// Note: It will be encoded into binary format, so the order of its fields should not be changed.
// The derived `Ord` also compares the fields in declaration order: `lamport` first, then `client_id`.
#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) struct TotalOrderStamp {
    pub(crate) lamport: Lamport,
    pub(crate) client_id: PeerID,
}
996
997pub fn are_frontiers_eq(a: &[ID], b: &[ID]) -> bool {
998    if a.len() != b.len() {
999        return false;
1000    }
1001
1002    let mut a: SmallVec<[ID; 10]> = a.into();
1003    let mut b: SmallVec<[ID; 10]> = b.into();
1004
1005    a.sort();
1006    b.sort();
1007
1008    a == b
1009}
1010
#[cfg(test)]
mod tests {
    #![allow(clippy::neg_cmp_op_on_partial_ord)]
    use super::*;
    mod cmp {
        use super::*;
        /// Exercises `PartialOrd`/`PartialEq` of `VersionVector` for the equal,
        /// incomparable, greater and less cases.
        #[test]
        fn test() {
            // `Vec<ID> -> VersionVector` goes through `set_last`, so each id is the
            // last included op of its peer.
            let a: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
            assert_eq!(a.partial_cmp(&b), Some(Ordering::Equal));
            assert!(a == b);

            // Each side is ahead on one peer: the vectors are incomparable.
            let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 1)].into();
            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
            assert_eq!(a.partial_cmp(&b), None);

            assert!(!(a > b));
            assert!(!(b > a));
            assert!(!(b == a));

            // `a` is ahead on every peer.
            let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 3)].into();
            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
            assert_eq!(a.partial_cmp(&b), Some(Ordering::Greater));
            assert!(a > b);
            assert!(a >= b);

            // `a` is behind on peer 1 and equal on peer 2.
            let a: VersionVector = vec![ID::new(1, 0), ID::new(2, 2)].into();
            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
            assert_eq!(a.partial_cmp(&b), Some(Ordering::Less));
            assert!(a < b);
            assert!(a <= b);
        }
    }

    /// Merging into a clone must not affect the vector it was cloned from.
    #[test]
    fn im() {
        let mut a = VersionVector::new();
        a.set_last(ID::new(1, 1));
        a.set_last(ID::new(2, 1));
        let mut b = a.clone();
        b.merge(&vec![ID::new(1, 2), ID::new(2, 2)].into());
        assert!(a != b);
        // `set_last` stores `counter + 1` as the exclusive end.
        assert_eq!(a.get(&1), Some(&2));
        assert_eq!(a.get(&2), Some(&2));
        assert_eq!(b.get(&1), Some(&3));
        assert_eq!(b.get(&2), Some(&3));
    }

    /// Guards the binary layout of `TotalOrderStamp`: the two bytes must decode to
    /// `lamport = 0` followed by `client_id = 1`.
    #[test]
    fn field_order() {
        let tos = TotalOrderStamp {
            lamport: 0,
            client_id: 1,
        };
        let buf = vec![0, 1];
        assert_eq!(postcard::from_bytes::<TotalOrderStamp>(&buf).unwrap(), tos);
    }

    /// Round-trips a `VersionVector` through `ImVersionVector` using the conversion
    /// helpers (no byte encoding is involved here).
    #[test]
    fn test_encode_decode_im_version_vector() {
        let vv = VersionVector::from_iter([(1, 1), (2, 2), (3, 3)]);
        let im_vv = vv.to_im_vv();
        let decoded_vv = VersionVector::from_im_vv(&im_vv);
        assert_eq!(vv, decoded_vv);
    }

    /// Bytes produced by `VersionVector::encode` must be readable by
    /// `ImVersionVector::decode`.
    #[test]
    fn test_version_vector_encoding_decoding() {
        let mut vv = VersionVector::new();
        vv.insert(1, 10);
        vv.insert(2, 20);
        vv.insert(3, 30);

        // Encode VersionVector
        let encoded = vv.encode();

        // Decode to ImVersionVector
        let decoded_im_vv = ImVersionVector::decode(&encoded).unwrap();

        // Convert VersionVector to ImVersionVector for comparison
        let im_vv = vv.to_im_vv();

        // Compare the converted ImVersionVector with the decoded one
        assert_eq!(im_vv, decoded_im_vv);

        // Convert back to VersionVector and compare
        let decoded_vv = VersionVector::from_im_vv(&decoded_im_vv);
        assert_eq!(vv, decoded_vv);
    }
}