Skip to main content

loro_internal/
version.rs

1mod frontiers;
2pub use frontiers::Frontiers;
3
4use crate::{
5    id::{Counter, ID},
6    oplog::AppDag,
7    span::{CounterSpan, IdSpan},
8    LoroError, PeerID,
9};
10use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdFull, IdSpanVector};
11use rustc_hash::FxHashMap;
12use serde::{Deserialize, Serialize};
13use smallvec::SmallVec;
14use std::{
15    cmp::Ordering,
16    ops::{ControlFlow, Deref, DerefMut},
17};
18
19/// [VersionVector](https://en.wikipedia.org/wiki/Version_vector)
20/// is a map from [PeerID] to [Counter]. Its a right-open interval.
21///
22/// i.e. a [VersionVector] of `{A: 1, B: 2}` means that A has 1 atomic op and B has 2 atomic ops,
23/// thus ID of `{client: A, counter: 1}` is out of the range.
24//
25// NOTE: though normally it doesn't make sense to have an entry with counter = 0, but it can be 0 in certain cases.
26// Like, when using start_vv and end_vv to mark the version range, start_vv may have an entry with counter = 0.
27#[repr(transparent)]
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct VersionVector(FxHashMap<PeerID, Counter>);
30
31#[repr(transparent)]
32#[derive(Debug, Clone, Default, PartialEq, Eq)]
33pub struct VersionRange(pub(crate) FxHashMap<PeerID, (Counter, Counter)>);
34
35#[macro_export]
36macro_rules! version_range {
37    ($($peer:expr => ($start:expr, $end:expr)),* $(,)?) => {{
38        let mut map = ::rustc_hash::FxHashMap::default();
39        $(
40            map.insert($peer, ($start, $end));
41        )*
42        $crate::version::VersionRange::from_map(map)
43    }};
44}
45
46impl VersionRange {
47    pub fn new() -> Self {
48        Self(Default::default())
49    }
50
51    pub fn from_map(map: FxHashMap<PeerID, (Counter, Counter)>) -> Self {
52        Self(map)
53    }
54
55    pub fn iter(&self) -> impl Iterator<Item = (&PeerID, &(Counter, Counter))> + '_ {
56        self.0.iter()
57    }
58
59    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&PeerID, &mut (Counter, Counter))> + '_ {
60        self.0.iter_mut()
61    }
62
63    pub fn clear(&mut self) {
64        self.0.clear()
65    }
66
67    pub fn get(&self, peer: &PeerID) -> Option<&(Counter, Counter)> {
68        self.0.get(peer)
69    }
70
71    pub fn insert(&mut self, peer: PeerID, start: Counter, end: Counter) {
72        self.0.insert(peer, (start, end));
73    }
74
75    pub fn from_vv(vv: &VersionVector) -> Self {
76        let mut ans = Self::new();
77        for (peer, counter) in vv.iter() {
78            let counter = normalize_vv_counter(*counter);
79            if counter > 0 {
80                ans.insert(*peer, 0, counter);
81            }
82        }
83        ans
84    }
85
86    pub fn contains_ops_between(&self, vv_a: &VersionVector, vv_b: &VersionVector) -> bool {
87        for span in vv_a.sub_iter(vv_b) {
88            if !self.contains_id_span(IdSpan::new(
89                span.peer,
90                span.counter.start.saturating_sub(1),
91                span.counter.end,
92            )) {
93                return false;
94            }
95        }
96
97        for span in vv_b.sub_iter(vv_a) {
98            if !self.contains_id_span(IdSpan::new(
99                span.peer,
100                span.counter.start.saturating_sub(1),
101                span.counter.end,
102            )) {
103                return false;
104            }
105        }
106
107        true
108    }
109
110    pub fn has_overlap_with(&self, mut span: IdSpan) -> bool {
111        span.normalize_();
112        if let Some((start, end)) = self.get(&span.peer) {
113            start < &span.counter.end && end > &span.counter.start
114        } else {
115            false
116        }
117    }
118
119    pub fn contains_id(&self, id: ID) -> bool {
120        if let Some((start, end)) = self.get(&id.peer) {
121            start <= &id.counter && end > &id.counter
122        } else {
123            false
124        }
125    }
126
127    pub fn contains_id_span(&self, mut span: IdSpan) -> bool {
128        span.normalize_();
129        if let Some((start, end)) = self.get(&span.peer) {
130            start <= &span.counter.start && end >= &span.counter.end
131        } else {
132            false
133        }
134    }
135
136    pub fn extends_to_include_id_span(&mut self, mut span: IdSpan) {
137        span.normalize_();
138        if let Some((start, end)) = self.0.get_mut(&span.peer) {
139            *start = (*start).min(span.counter.start);
140            *end = (*end).max(span.counter.end);
141        } else {
142            self.insert(span.peer, span.counter.start, span.counter.end);
143        }
144    }
145
146    pub fn is_empty(&self) -> bool {
147        self.0.is_empty()
148    }
149
150    pub fn inner(&self) -> &FxHashMap<PeerID, (Counter, Counter)> {
151        &self.0
152    }
153}
154
155/// Immutable version vector
156///
157/// It has O(1) clone time and O(logN) insert/delete/lookup time.
158///
159/// It's more memory efficient than [VersionVector] when the version vector
160/// can be created from cloning and modifying other similar version vectors.
161#[repr(transparent)]
162#[derive(Debug, Clone, Default, Serialize, Deserialize)]
163pub struct ImVersionVector(im::HashMap<PeerID, Counter, rustc_hash::FxBuildHasher>);
164
165#[inline]
166fn normalize_vv_counter(counter: Counter) -> Counter {
167    counter.max(0)
168}
169
170#[inline]
171fn last_id_to_vv_end(id: ID) -> Counter {
172    id.counter.saturating_add(1).max(0)
173}
174
175impl ImVersionVector {
176    pub fn new() -> Self {
177        Self(Default::default())
178    }
179
180    pub fn clear(&mut self) {
181        self.0.clear()
182    }
183
184    pub fn get(&self, key: &PeerID) -> Option<&Counter> {
185        self.0.get(key)
186    }
187
188    pub fn get_mut(&mut self, key: &PeerID) -> Option<&mut Counter> {
189        self.0.get_mut(key)
190    }
191
192    pub fn insert(&mut self, k: PeerID, v: Counter) {
193        self.0.insert(k, v);
194    }
195
196    pub fn is_empty(&self) -> bool {
197        self.0.is_empty()
198    }
199
200    pub fn iter(&self) -> im::hashmap::Iter<'_, PeerID, Counter> {
201        self.0.iter()
202    }
203
204    pub fn remove(&mut self, k: &PeerID) -> Option<Counter> {
205        self.0.remove(k)
206    }
207
208    pub fn len(&self) -> usize {
209        self.0.len()
210    }
211
212    pub fn contains_key(&self, k: &PeerID) -> bool {
213        self.0.contains_key(k)
214    }
215
216    pub fn encode(&self) -> Vec<u8> {
217        postcard::to_allocvec(self).unwrap()
218    }
219
220    pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
221        let vv = VersionVector::decode(bytes)?;
222        Ok(Self::from_vv(&vv))
223    }
224
225    pub fn to_vv(&self) -> VersionVector {
226        VersionVector(self.0.iter().map(|(&k, &v)| (k, v)).collect())
227    }
228
229    pub fn from_vv(vv: &VersionVector) -> Self {
230        ImVersionVector(vv.0.iter().map(|(&k, &v)| (k, v)).collect())
231    }
232
233    pub fn extend_to_include_vv<'a>(
234        &mut self,
235        vv: impl Iterator<Item = (&'a PeerID, &'a Counter)>,
236    ) {
237        for (&client_id, &counter) in vv {
238            let counter = normalize_vv_counter(counter);
239            if counter == 0 {
240                continue;
241            }
242
243            if let Some(my_counter) = self.0.get_mut(&client_id) {
244                *my_counter = normalize_vv_counter(*my_counter);
245                if *my_counter < counter {
246                    *my_counter = counter;
247                }
248            } else {
249                self.0.insert(client_id, counter);
250            }
251        }
252    }
253
254    #[inline]
255    pub fn merge(&mut self, other: &Self) {
256        self.extend_to_include_vv(other.0.iter());
257    }
258
259    #[inline]
260    pub fn merge_vv(&mut self, other: &VersionVector) {
261        self.extend_to_include_vv(other.0.iter());
262    }
263
264    #[inline]
265    pub fn set_last(&mut self, id: ID) {
266        let end = last_id_to_vv_end(id);
267        if end == 0 {
268            self.0.remove(&id.peer);
269        } else {
270            self.0.insert(id.peer, end);
271        }
272    }
273
274    pub fn extend_to_include_last_id(&mut self, id: ID) {
275        let end = last_id_to_vv_end(id);
276        if end == 0 {
277            return;
278        }
279
280        if let Some(counter) = self.0.get_mut(&id.peer) {
281            *counter = normalize_vv_counter(*counter);
282            if *counter < end {
283                *counter = end;
284            }
285        } else {
286            self.0.insert(id.peer, end);
287        }
288    }
289
290    pub(crate) fn includes_id(&self, x: ID) -> bool {
291        if self.is_empty() || x.counter < 0 {
292            return false;
293        }
294
295        normalize_vv_counter(self.get(&x.peer).copied().unwrap_or(0)) > x.counter
296    }
297}
298
299impl PartialEq for VersionVector {
300    fn eq(&self, other: &Self) -> bool {
301        self.iter().all(|(client, counter)| {
302            normalize_vv_counter(*other.get(client).unwrap_or(&0)) == normalize_vv_counter(*counter)
303        }) && other.iter().all(|(client, counter)| {
304            normalize_vv_counter(*self.get(client).unwrap_or(&0)) == normalize_vv_counter(*counter)
305        })
306    }
307}
308
309impl Eq for VersionVector {}
310
311impl PartialEq for ImVersionVector {
312    fn eq(&self, other: &Self) -> bool {
313        self.0.iter().all(|(client, counter)| {
314            normalize_vv_counter(*other.0.get(client).unwrap_or(&0))
315                == normalize_vv_counter(*counter)
316        }) && other.0.iter().all(|(client, counter)| {
317            normalize_vv_counter(*self.0.get(client).unwrap_or(&0))
318                == normalize_vv_counter(*counter)
319        })
320    }
321}
322
323impl Eq for ImVersionVector {}
324
325impl Deref for VersionVector {
326    type Target = FxHashMap<PeerID, Counter>;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333#[derive(Default, Debug, PartialEq, Eq)]
334pub struct VersionVectorDiff {
335    /// The spans that the `left` side needs to retreat to reach the `right` side
336    ///
337    /// these spans are included in the left, but not in the right
338    pub retreat: IdSpanVector,
339    /// The spans that the `left` side needs to forward to reach the `right` side
340    ///
341    /// these spans are included in the right, but not in the left
342    pub forward: IdSpanVector,
343}
344
345impl VersionVectorDiff {
346    #[inline]
347    pub fn merge_left(&mut self, span: IdSpan) {
348        merge(&mut self.retreat, span);
349    }
350
351    #[inline]
352    pub fn merge_right(&mut self, span: IdSpan) {
353        merge(&mut self.forward, span);
354    }
355
356    #[inline]
357    pub fn subtract_start_left(&mut self, span: IdSpan) {
358        subtract_start(&mut self.retreat, span);
359    }
360
361    #[inline]
362    pub fn subtract_start_right(&mut self, span: IdSpan) {
363        subtract_start(&mut self.forward, span);
364    }
365
366    pub fn get_id_spans_left(&self) -> impl Iterator<Item = IdSpan> + '_ {
367        self.retreat.iter().map(|(peer, span)| IdSpan {
368            peer: *peer,
369            counter: *span,
370        })
371    }
372
373    pub fn get_id_spans_right(&self) -> impl Iterator<Item = IdSpan> + '_ {
374        self.forward.iter().map(|(peer, span)| IdSpan {
375            peer: *peer,
376            counter: *span,
377        })
378    }
379}
380
381fn subtract_start(m: &mut FxHashMap<PeerID, CounterSpan>, target: IdSpan) {
382    if let Some(span) = m.get_mut(&target.peer) {
383        if span.start < target.counter.end {
384            span.start = target.counter.end;
385        }
386    }
387}
388
389fn merge(m: &mut FxHashMap<PeerID, CounterSpan>, mut target: IdSpan) {
390    target.normalize_();
391    if let Some(span) = m.get_mut(&target.peer) {
392        span.start = span.start.min(target.counter.start);
393        span.end = span.end.max(target.counter.end);
394    } else {
395        m.insert(target.peer, target.counter);
396    }
397}
398
399impl PartialOrd for VersionVector {
400    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
401        let mut self_greater = true;
402        let mut other_greater = true;
403        let mut eq = true;
404        for (client_id, other_end) in other.iter() {
405            let other_end = normalize_vv_counter(*other_end);
406            if let Some(self_end) = self.get(client_id) {
407                let self_end = normalize_vv_counter(*self_end);
408                if self_end < other_end {
409                    self_greater = false;
410                    eq = false;
411                }
412                if self_end > other_end {
413                    other_greater = false;
414                    eq = false;
415                }
416            } else if other_end > 0 {
417                self_greater = false;
418                eq = false;
419            }
420        }
421
422        for (client_id, self_end) in self.iter() {
423            if other.contains_key(client_id) {
424                continue;
425            } else if normalize_vv_counter(*self_end) > 0 {
426                other_greater = false;
427                eq = false;
428            }
429        }
430
431        if eq {
432            Some(Ordering::Equal)
433        } else if self_greater {
434            Some(Ordering::Greater)
435        } else if other_greater {
436            Some(Ordering::Less)
437        } else {
438            None
439        }
440    }
441}
442
443impl PartialOrd for ImVersionVector {
444    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
445        let mut self_greater = true;
446        let mut other_greater = true;
447        let mut eq = true;
448        for (client_id, other_end) in other.iter() {
449            let other_end = normalize_vv_counter(*other_end);
450            if let Some(self_end) = self.get(client_id) {
451                let self_end = normalize_vv_counter(*self_end);
452                if self_end < other_end {
453                    self_greater = false;
454                    eq = false;
455                }
456                if self_end > other_end {
457                    other_greater = false;
458                    eq = false;
459                }
460            } else if other_end > 0 {
461                self_greater = false;
462                eq = false;
463            }
464        }
465
466        for (client_id, self_end) in self.iter() {
467            if other.contains_key(client_id) {
468                continue;
469            } else if normalize_vv_counter(*self_end) > 0 {
470                other_greater = false;
471                eq = false;
472            }
473        }
474
475        if eq {
476            Some(Ordering::Equal)
477        } else if self_greater {
478            Some(Ordering::Greater)
479        } else if other_greater {
480            Some(Ordering::Less)
481        } else {
482            None
483        }
484    }
485}
486
487impl DerefMut for VersionVector {
488    fn deref_mut(&mut self) -> &mut Self::Target {
489        &mut self.0
490    }
491}
492
493impl VersionVector {
494    pub fn diff(&self, rhs: &Self) -> VersionVectorDiff {
495        let mut ans: VersionVectorDiff = Default::default();
496        for (client_id, &counter) in self.iter() {
497            let counter = normalize_vv_counter(counter);
498            if let Some(&rhs_counter) = rhs.get(client_id) {
499                let rhs_counter = normalize_vv_counter(rhs_counter);
500                match counter.cmp(&rhs_counter) {
501                    Ordering::Less => {
502                        ans.forward.insert(
503                            *client_id,
504                            CounterSpan {
505                                start: counter,
506                                end: rhs_counter,
507                            },
508                        );
509                    }
510                    Ordering::Greater => {
511                        ans.retreat.insert(
512                            *client_id,
513                            CounterSpan {
514                                start: rhs_counter,
515                                end: counter,
516                            },
517                        );
518                    }
519                    Ordering::Equal => {}
520                }
521            } else if counter > 0 {
522                ans.retreat.insert(
523                    *client_id,
524                    CounterSpan {
525                        start: 0,
526                        end: counter,
527                    },
528                );
529            }
530        }
531        for (client_id, &rhs_counter) in rhs.iter() {
532            let rhs_counter = normalize_vv_counter(rhs_counter);
533            if rhs_counter > 0 && !self.contains_key(client_id) {
534                ans.forward.insert(
535                    *client_id,
536                    CounterSpan {
537                        start: 0,
538                        end: rhs_counter,
539                    },
540                );
541            }
542        }
543
544        ans
545    }
546
547    /// Returns two iterators that cover the differences between two version vectors.
548    ///
549    /// - The first iterator contains the spans that are in `self` but not in `rhs`
550    /// - The second iterator contains the spans that are in `rhs` but not in `self`
551    pub fn diff_iter<'a>(
552        &'a self,
553        rhs: &'a Self,
554    ) -> (
555        impl Iterator<Item = IdSpan> + 'a,
556        impl Iterator<Item = IdSpan> + 'a,
557    ) {
558        (self.sub_iter(rhs), rhs.sub_iter(self))
559    }
560
561    /// Returns the spans that are in `self` but not in `rhs`
562    pub fn sub_iter<'a>(&'a self, rhs: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
563        self.iter().filter_map(move |(peer, &counter)| {
564            let counter = normalize_vv_counter(counter);
565            if let Some(&rhs_counter) = rhs.get(peer) {
566                let rhs_counter = normalize_vv_counter(rhs_counter);
567                if counter > rhs_counter {
568                    Some(IdSpan {
569                        peer: *peer,
570                        counter: CounterSpan {
571                            start: rhs_counter,
572                            end: counter,
573                        },
574                    })
575                } else {
576                    None
577                }
578            } else if counter > 0 {
579                Some(IdSpan {
580                    peer: *peer,
581                    counter: CounterSpan {
582                        start: 0,
583                        end: counter,
584                    },
585                })
586            } else {
587                None
588            }
589        })
590    }
591
592    /// Returns the spans that are in `self` but not in `rhs`
593    pub fn sub_iter_im<'a>(
594        &'a self,
595        rhs: &'a ImVersionVector,
596    ) -> impl Iterator<Item = IdSpan> + 'a {
597        self.iter().filter_map(move |(peer, &counter)| {
598            let counter = normalize_vv_counter(counter);
599            if let Some(&rhs_counter) = rhs.get(peer) {
600                let rhs_counter = normalize_vv_counter(rhs_counter);
601                if counter > rhs_counter {
602                    Some(IdSpan {
603                        peer: *peer,
604                        counter: CounterSpan {
605                            start: rhs_counter,
606                            end: counter,
607                        },
608                    })
609                } else {
610                    None
611                }
612            } else if counter > 0 {
613                Some(IdSpan {
614                    peer: *peer,
615                    counter: CounterSpan {
616                        start: 0,
617                        end: counter,
618                    },
619                })
620            } else {
621                None
622            }
623        })
624    }
625
626    /// Iter all span from a -> b and b -> a
627    pub fn iter_between<'a>(&'a self, other: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
628        // PERF: can be optimized a little
629        self.sub_iter(other).chain(other.sub_iter(self))
630    }
631
632    pub fn sub_vec(&self, rhs: &Self) -> IdSpanVector {
633        self.sub_iter(rhs).map(|x| (x.peer, x.counter)).collect()
634    }
635
636    pub fn distance_between(&self, other: &Self) -> usize {
637        let mut ans = 0usize;
638        for (client_id, &counter) in self.iter() {
639            let counter = counter.max(0) as i64;
640            if let Some(&other_counter) = other.get(client_id) {
641                let other_counter = other_counter.max(0) as i64;
642                ans += counter.abs_diff(other_counter) as usize;
643            } else {
644                ans += counter as usize;
645            }
646        }
647
648        for (client_id, &counter) in other.iter() {
649            if !self.contains_key(client_id) {
650                ans += counter.max(0) as usize;
651            }
652        }
653
654        ans
655    }
656
657    pub fn to_spans(&self) -> IdSpanVector {
658        self.iter()
659            .filter_map(|(client_id, &counter)| {
660                let counter = normalize_vv_counter(counter);
661                (counter > 0).then_some((
662                    *client_id,
663                    CounterSpan {
664                        start: 0,
665                        end: counter,
666                    },
667                ))
668            })
669            .collect()
670    }
671
672    #[inline]
673    pub fn get_frontiers(&self) -> Frontiers {
674        self.iter()
675            .filter_map(|(client_id, &counter)| {
676                if counter > 0 {
677                    Some(ID {
678                        peer: *client_id,
679                        counter: counter - 1,
680                    })
681                } else {
682                    None
683                }
684            })
685            .collect()
686    }
687
688    #[inline]
689    pub fn new() -> Self {
690        Self(Default::default())
691    }
692
693    /// set the inclusive ending point. target id will be included by self
694    #[inline]
695    pub fn set_last(&mut self, id: ID) {
696        let end = last_id_to_vv_end(id);
697        if end == 0 {
698            self.0.remove(&id.peer);
699        } else {
700            self.0.insert(id.peer, end);
701        }
702    }
703
704    #[inline]
705    pub fn get_last(&self, client_id: PeerID) -> Option<Counter> {
706        self.0.get(&client_id).and_then(|&x| {
707            let x = normalize_vv_counter(x);
708            if x == 0 {
709                None
710            } else {
711                Some(x - 1)
712            }
713        })
714    }
715
716    /// set the exclusive ending point. target id will NOT be included by self
717    #[inline]
718    pub fn set_end(&mut self, id: ID) {
719        if id.counter <= 0 {
720            self.0.remove(&id.peer);
721        } else {
722            self.0.insert(id.peer, id.counter);
723        }
724    }
725
726    /// Update the end counter of the given client if the end is greater.
727    /// Return whether updated
728    #[inline]
729    pub fn try_update_last(&mut self, id: ID) -> bool {
730        let new_end = last_id_to_vv_end(id);
731        if new_end == 0 {
732            if self.0.get(&id.peer).is_some_and(|counter| *counter < 0) {
733                self.0.remove(&id.peer);
734            }
735            return false;
736        }
737
738        if let Some(end) = self.0.get_mut(&id.peer) {
739            *end = normalize_vv_counter(*end);
740            if *end < new_end {
741                *end = new_end;
742                true
743            } else {
744                false
745            }
746        } else {
747            self.0.insert(id.peer, new_end);
748            true
749        }
750    }
751
752    pub fn get_missing_span(&self, target: &Self) -> Vec<IdSpan> {
753        let mut ans = vec![];
754        for (client_id, other_end) in target.iter() {
755            let other_end = normalize_vv_counter(*other_end);
756            if other_end == 0 {
757                continue;
758            }
759
760            let my_end = self
761                .get(client_id)
762                .map(|counter| normalize_vv_counter(*counter))
763                .unwrap_or(0);
764            if my_end < other_end {
765                ans.push(IdSpan::new(*client_id, my_end, other_end));
766            }
767        }
768
769        ans
770    }
771
772    pub fn merge(&mut self, other: &Self) {
773        for (&client_id, &other_end) in other.iter() {
774            let other_end = normalize_vv_counter(other_end);
775            if other_end == 0 {
776                continue;
777            }
778
779            if let Some(my_end) = self.get_mut(&client_id) {
780                *my_end = normalize_vv_counter(*my_end);
781                if *my_end < other_end {
782                    *my_end = other_end;
783                }
784            } else {
785                self.0.insert(client_id, other_end);
786            }
787        }
788    }
789
790    pub fn includes_vv(&self, other: &VersionVector) -> bool {
791        match self.partial_cmp(other) {
792            Some(ord) => match ord {
793                Ordering::Less => false,
794                Ordering::Equal => true,
795                Ordering::Greater => true,
796            },
797            None => false,
798        }
799    }
800
801    pub fn includes_id(&self, id: ID) -> bool {
802        if id.counter < 0 {
803            return false;
804        }
805
806        if let Some(end) = self.get(&id.peer) {
807            if normalize_vv_counter(*end) > id.counter {
808                return true;
809            }
810        }
811        false
812    }
813
814    pub fn intersect_span(&self, target: IdSpan) -> Option<CounterSpan> {
815        if let Some(&end) = self.get(&target.peer) {
816            let end = normalize_vv_counter(end);
817            let count_start = target.ctr_start().max(0);
818            let count_end = target.ctr_end().max(0);
819            if end > count_start && count_end > count_start {
820                return Some(CounterSpan {
821                    start: count_start,
822                    end: end.min(count_end),
823                });
824            }
825        }
826
827        None
828    }
829
830    pub fn extend_to_include_vv<'a>(
831        &mut self,
832        vv: impl Iterator<Item = (&'a PeerID, &'a Counter)>,
833    ) {
834        for (&client_id, &counter) in vv {
835            let counter = normalize_vv_counter(counter);
836            if counter == 0 {
837                continue;
838            }
839
840            if let Some(my_counter) = self.get_mut(&client_id) {
841                *my_counter = normalize_vv_counter(*my_counter);
842                if *my_counter < counter {
843                    *my_counter = counter;
844                }
845            } else {
846                self.0.insert(client_id, counter);
847            }
848        }
849    }
850
851    pub fn extend_to_include_last_id(&mut self, id: ID) {
852        let end = last_id_to_vv_end(id);
853        if end == 0 {
854            return;
855        }
856
857        if let Some(counter) = self.get_mut(&id.peer) {
858            *counter = normalize_vv_counter(*counter);
859            if *counter < end {
860                *counter = end;
861            }
862        } else {
863            self.0.insert(id.peer, end);
864        }
865    }
866
867    pub fn extend_to_include_end_id(&mut self, id: ID) {
868        let end = normalize_vv_counter(id.counter);
869        if end == 0 {
870            return;
871        }
872
873        if let Some(counter) = self.get_mut(&id.peer) {
874            *counter = normalize_vv_counter(*counter);
875            if *counter < end {
876                *counter = end;
877            }
878        } else {
879            self.0.insert(id.peer, end);
880        }
881    }
882
883    pub fn extend_to_include(&mut self, span: IdSpan) {
884        let end = normalize_vv_counter(span.counter.norm_end());
885        if end == 0 {
886            if self.0.get(&span.peer).is_some_and(|counter| *counter < 0) {
887                self.0.remove(&span.peer);
888            }
889            return;
890        }
891
892        if let Some(counter) = self.get_mut(&span.peer) {
893            *counter = normalize_vv_counter(*counter);
894            if *counter < end {
895                *counter = end;
896            }
897        } else {
898            self.insert(span.peer, end);
899        }
900    }
901
902    pub fn shrink_to_exclude(&mut self, span: IdSpan) {
903        let start = normalize_vv_counter(span.counter.min());
904        let end = normalize_vv_counter(span.counter.norm_end());
905        if end <= start {
906            return;
907        }
908
909        if start == 0 {
910            self.remove(&span.peer);
911            return;
912        }
913
914        if let Some(counter) = self.get_mut(&span.peer) {
915            *counter = normalize_vv_counter(*counter);
916            if *counter > start {
917                *counter = start;
918            }
919            if *counter == 0 {
920                self.remove(&span.peer);
921            }
922        }
923    }
924
925    pub fn forward(&mut self, spans: &IdSpanVector) {
926        for span in spans.iter() {
927            self.extend_to_include(IdSpan {
928                peer: *span.0,
929                counter: *span.1,
930            });
931        }
932    }
933
934    pub fn retreat(&mut self, spans: &IdSpanVector) {
935        for span in spans.iter() {
936            self.shrink_to_exclude(IdSpan {
937                peer: *span.0,
938                counter: *span.1,
939            });
940        }
941    }
942
943    pub fn intersection(&self, other: &VersionVector) -> VersionVector {
944        let mut ans = VersionVector::new();
945        for (client_id, &counter) in self.iter() {
946            let counter = normalize_vv_counter(counter);
947            if let Some(&other_counter) = other.get(client_id) {
948                let other_counter = normalize_vv_counter(other_counter);
949                if counter < other_counter {
950                    if counter != 0 {
951                        ans.insert(*client_id, counter);
952                    }
953                } else if other_counter != 0 {
954                    ans.insert(*client_id, other_counter);
955                }
956            }
957        }
958        ans
959    }
960
961    #[inline(always)]
962    pub fn encode(&self) -> Vec<u8> {
963        postcard::to_allocvec(self).unwrap()
964    }
965
966    #[inline(always)]
967    pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
968        postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError)
969    }
970
971    pub fn to_im_vv(&self) -> ImVersionVector {
972        ImVersionVector(self.0.iter().map(|(&k, &v)| (k, v)).collect())
973    }
974
975    pub fn from_im_vv(im_vv: &ImVersionVector) -> Self {
976        VersionVector(im_vv.0.iter().map(|(&k, &v)| (k, v)).collect())
977    }
978}
979
980/// Use minimal set of ids to represent the frontiers
981#[tracing::instrument(skip(dag))]
982pub fn shrink_frontiers(last_ids: &Frontiers, dag: &AppDag) -> Result<Frontiers, ID> {
983    // it only keep the ids of ops that are concurrent to each other
984
985    if last_ids.len() <= 1 {
986        return Ok(last_ids.clone());
987    }
988
989    let mut last_ids = {
990        let ids = filter_duplicated_peer_id(last_ids);
991        if last_ids.len() == 1 {
992            let mut frontiers = Frontiers::default();
993            frontiers.push(last_ids.as_single().unwrap());
994            return Ok(frontiers);
995        }
996
997        let mut last_ids = Vec::with_capacity(ids.len());
998        for id in ids {
999            let Some(lamport) = dag.get_lamport(&id) else {
1000                return Err(id);
1001            };
1002            last_ids.push(IdFull::new(id.peer, id.counter, lamport))
1003        }
1004
1005        last_ids
1006    };
1007
1008    let mut frontiers = Vec::new();
1009    // Iterate from the greatest lamport to the smallest
1010    last_ids.sort_by_key(|x| x.lamport);
1011    for id in last_ids.iter().rev() {
1012        let mut should_insert = true;
1013        let mut len = 0;
1014        // travel backward because they have more similar lamport
1015        for f_id in frontiers.iter().rev() {
1016            dag.travel_ancestors(*f_id, &mut |x| {
1017                len += 1;
1018                if x.contains_id(id.id()) {
1019                    should_insert = false;
1020                    ControlFlow::Break(())
1021                } else if x.lamport_last() < id.lamport {
1022                    // Already travel to a node with smaller lamport, no need to continue, we are sure two ops are concurrent now
1023                    ControlFlow::Break(())
1024                } else {
1025                    ControlFlow::Continue(())
1026                }
1027            });
1028        }
1029
1030        if should_insert {
1031            frontiers.push(id.id());
1032        }
1033    }
1034
1035    Ok(frontiers.into())
1036}
1037
1038fn filter_duplicated_peer_id(last_ids: &Frontiers) -> Vec<ID> {
1039    let mut peer_max_counters = FxHashMap::default();
1040    for id in last_ids.iter() {
1041        let counter = peer_max_counters.entry(id.peer).or_insert(id.counter);
1042        if id.counter > *counter {
1043            *counter = id.counter;
1044        }
1045    }
1046
1047    peer_max_counters
1048        .into_iter()
1049        .map(|(peer, counter)| ID::new(peer, counter))
1050        .collect()
1051}
1052
1053impl Default for VersionVector {
1054    fn default() -> Self {
1055        Self::new()
1056    }
1057}
1058
1059impl From<FxHashMap<PeerID, Counter>> for VersionVector {
1060    fn from(map: FxHashMap<PeerID, Counter>) -> Self {
1061        let mut im_map = FxHashMap::default();
1062        for (client_id, counter) in map {
1063            im_map.insert(client_id, counter);
1064        }
1065        Self(im_map)
1066    }
1067}
1068
1069impl From<Vec<ID>> for VersionVector {
1070    fn from(vec: Vec<ID>) -> Self {
1071        let mut vv = VersionVector::new();
1072        for id in vec {
1073            vv.set_last(id);
1074        }
1075
1076        vv
1077    }
1078}
1079
1080impl FromIterator<ID> for VersionVector {
1081    fn from_iter<T: IntoIterator<Item = ID>>(iter: T) -> Self {
1082        let iter = iter.into_iter();
1083        let mut vv = VersionVector(FxHashMap::with_capacity_and_hasher(
1084            iter.size_hint().0,
1085            Default::default(),
1086        ));
1087        for id in iter {
1088            vv.set_last(id);
1089        }
1090
1091        vv
1092    }
1093}
1094
1095impl FromIterator<(PeerID, Counter)> for VersionVector {
1096    fn from_iter<T: IntoIterator<Item = (PeerID, Counter)>>(iter: T) -> Self {
1097        VersionVector(FxHashMap::from_iter(iter))
1098    }
1099}
1100
1101// Note: It will be encoded into binary format, so the order of its fields should not be changed.
1102
1103pub fn are_frontiers_eq(a: &[ID], b: &[ID]) -> bool {
1104    if a.len() != b.len() {
1105        return false;
1106    }
1107
1108    let mut a: SmallVec<[ID; 10]> = a.into();
1109    let mut b: SmallVec<[ID; 10]> = b.into();
1110
1111    a.sort();
1112    b.sort();
1113
1114    a == b
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119    #![allow(clippy::neg_cmp_op_on_partial_ord)]
1120    use super::*;
1121    mod cmp {
1122        use super::*;
1123        #[test]
1124        fn test() {
1125            let a: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
1126            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
1127            assert_eq!(a.partial_cmp(&b), Some(Ordering::Equal));
1128            assert!(a == b);
1129
1130            let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 1)].into();
1131            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
1132            assert_eq!(a.partial_cmp(&b), None);
1133
1134            assert!(!(a > b));
1135            assert!(!(b > a));
1136            assert!(!(b == a));
1137
1138            let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 3)].into();
1139            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
1140            assert_eq!(a.partial_cmp(&b), Some(Ordering::Greater));
1141            assert!(a > b);
1142            assert!(a >= b);
1143
1144            let a: VersionVector = vec![ID::new(1, 0), ID::new(2, 2)].into();
1145            let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into();
1146            assert_eq!(a.partial_cmp(&b), Some(Ordering::Less));
1147            assert!(a < b);
1148            assert!(a <= b);
1149        }
1150    }
1151
1152    #[test]
1153    fn im() {
1154        let mut a = VersionVector::new();
1155        a.set_last(ID::new(1, 1));
1156        a.set_last(ID::new(2, 1));
1157        let mut b = a.clone();
1158        b.merge(&vec![ID::new(1, 2), ID::new(2, 2)].into());
1159        assert!(a != b);
1160        assert_eq!(a.get(&1), Some(&2));
1161        assert_eq!(a.get(&2), Some(&2));
1162        assert_eq!(b.get(&1), Some(&3));
1163        assert_eq!(b.get(&2), Some(&3));
1164    }
1165
1166    #[test]
1167    fn test_encode_decode_im_version_vector() {
1168        let vv = VersionVector::from_iter([(1, 1), (2, 2), (3, 3)]);
1169        let im_vv = vv.to_im_vv();
1170        let decoded_vv = VersionVector::from_im_vv(&im_vv);
1171        assert_eq!(vv, decoded_vv);
1172    }
1173
1174    #[test]
1175    fn test_version_vector_encoding_decoding() {
1176        let mut vv = VersionVector::new();
1177        vv.insert(1, 10);
1178        vv.insert(2, 20);
1179        vv.insert(3, 30);
1180
1181        // Encode VersionVector
1182        let encoded = vv.encode();
1183
1184        // Decode to ImVersionVector
1185        let decoded_im_vv = ImVersionVector::decode(&encoded).unwrap();
1186
1187        // Convert VersionVector to ImVersionVector for comparison
1188        let im_vv = vv.to_im_vv();
1189
1190        // Compare the original ImVersionVector with the decoded one
1191        assert_eq!(im_vv, decoded_im_vv);
1192
1193        // Convert back to VersionVector and compare
1194        let decoded_vv = VersionVector::from_im_vv(&decoded_im_vv);
1195        assert_eq!(vv, decoded_vv);
1196    }
1197}