loro_internal/oplog/loro_dag.rs

use crate::change::{Change, Lamport};
use crate::dag::{Dag, DagNode};
use crate::id::{Counter, ID};
use crate::span::{HasId, HasLamport};
use crate::sync::Mutex;
use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
use fxhash::FxHashSet;
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
use std::fmt::Display;
use std::ops::{ControlFlow, Deref};
use std::sync::Arc;
use tracing::instrument;

use super::change_store::BatchDecodeInfo;
use super::ChangeStore;

/// [AppDag] maintains the causal graph of the app.
/// It makes answering questions like "what is the LCA version" faster.
#[derive(Debug)]
pub struct AppDag {
    change_store: ChangeStore,
    /// It only contains nodes that are already parsed.
    ///
    /// - All the unparsed op ids must be included in `unparsed_vv`.
    /// - All the parsed and unparsed op ids must be included in `vv`.
    map: Mutex<BTreeMap<ID, AppDagNode>>,
    /// The latest known frontiers
    frontiers: Frontiers,
    /// The latest known version vector
    vv: VersionVector,
    /// The earliest known frontiers
    shallow_since_frontiers: Frontiers,
    /// The deps of the shallow frontiers
    shallow_root_frontiers_deps: Frontiers,
    /// The vv of `shallow_root_frontiers_deps`
    shallow_since_vv: ImVersionVector,
    /// Ops included in the version vector but not parsed yet
    ///
    /// # Invariants
    ///
    /// - `vv` >= `unparsed_vv`
    unparsed_vv: Mutex<VersionVector>,
    /// It's a set of points which are deps of some parsed ops.
    /// But the ops in this set are not parsed yet. When they are parsed,
    /// we need to make sure the node breaks at the given point.
    unhandled_dep_points: Mutex<BTreeSet<ID>>,
    pending_txn_node: Option<AppDagNode>,
}

#[derive(Debug, Clone)]
pub struct AppDagNode {
    inner: Arc<AppDagNodeInner>,
}

impl Deref for AppDagNode {
    type Target = AppDagNodeInner;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl AppDagNode {
    pub fn new(inner: AppDagNodeInner) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }
}

#[derive(Debug, Clone)]
pub struct AppDagNodeInner {
    pub(crate) peer: PeerID,
    pub(crate) cnt: Counter,
    pub(crate) lamport: Lamport,
    pub(crate) deps: Frontiers,
    pub(crate) vv: OnceCell<ImVersionVector>,
    /// A flag indicating whether any other nodes from a different peer depend on this node.
    /// The calculation of frontiers relies on the property that a node never depends
    /// on the middle of another node.
    pub(crate) has_succ: bool,
    pub(crate) len: usize,
}

impl From<AppDagNodeInner> for AppDagNode {
    fn from(inner: AppDagNodeInner) -> Self {
        AppDagNode {
            inner: Arc::new(inner),
        }
    }
}

impl AppDag {
    pub(super) fn new(change_store: ChangeStore) -> Self {
        Self {
            change_store,
            map: Mutex::new(BTreeMap::new()),
            frontiers: Frontiers::default(),
            vv: VersionVector::default(),
            unparsed_vv: Mutex::new(VersionVector::default()),
            unhandled_dep_points: Mutex::new(BTreeSet::new()),
            shallow_since_frontiers: Default::default(),
            shallow_root_frontiers_deps: Default::default(),
            shallow_since_vv: Default::default(),
            pending_txn_node: None,
        }
    }

    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    pub fn vv(&self) -> &VersionVector {
        &self.vv
    }

    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        &self.shallow_since_vv
    }

    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        &self.shallow_since_frontiers
    }

    pub fn is_empty(&self) -> bool {
        self.vv.is_empty()
    }

    #[tracing::instrument(skip_all, name = "handle_new_change")]
    pub(super) fn handle_new_change(&mut self, change: &Change, from_local: bool) {
        let len = change.content_len();
        self.update_version_on_new_change(change, from_local);
        #[cfg(debug_assertions)]
        {
            let unhandled_dep_points = self.unhandled_dep_points.lock().unwrap();
            let c = unhandled_dep_points
                .range(change.id_start()..change.id_end())
                .count();
            assert!(c == 0);
        }

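        // Fast path: when the change only depends on this peer's previous op, try
        // to extend the peer's last dag node in place instead of inserting a new one.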
        let mut inserted = false;
        if change.deps_on_self() {
            // We may not need to push a new node to the dag because the change
            // only depends on itself
            inserted = self.with_last_mut_of_peer(change.id.peer, |last| {
                let last = last.unwrap();
                if last.has_succ {
                    // Don't merge the node if there are other nodes depending on it
                    return false;
                }

                assert_eq!(last.peer, change.id.peer, "peer id is not the same");
                assert_eq!(
                    last.cnt + last.len as Counter,
                    change.id.counter,
                    "counter is not continuous"
                );
                assert_eq!(
                    last.lamport + last.len as Lamport,
                    change.lamport,
                    "lamport is not continuous"
                );
                let last = Arc::make_mut(&mut last.inner);
                last.len = (change.id.counter - last.cnt) as usize + len;
                last.has_succ = false;
                true
            });
        }

        if !inserted {
            let node: AppDagNode = AppDagNodeInner {
                vv: OnceCell::new(),
                peer: change.id.peer,
                cnt: change.id.counter,
                lamport: change.lamport,
                deps: change.deps.clone(),
                has_succ: false,
                len,
            }
            .into();

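            // Register the new node and split any already-parsed nodes so that each
            // of this change's deps points at the last id of a dag node.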
            let mut map = self.map.lock().unwrap();
            map.insert(node.id_start(), node);
            self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
        }
    }

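    /// Calls `f` with the parsed node that contains `id`, or with `None` if no
    /// parsed node covers it.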
    fn try_with_node_mut<R>(
        &self,
        map: &mut BTreeMap<ID, AppDagNode>,
        id: ID,
        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
    ) -> R {
        let x = map.range_mut(..=id).next_back();
        if let Some((_, node)) = x {
            if node.contains_id(id) {
                f(Some(node))
            } else {
                f(None)
            }
        } else {
            f(None)
        }
    }

    /// If the lamport of the change can be calculated, returns `Ok(())`;
    /// otherwise returns `Err(())`
    pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
        for dep in change.deps.iter() {
            match self.get_lamport(&dep) {
                Some(lamport) => {
                    change.lamport = change.lamport.max(lamport + 1);
                }
                None => return Err(()),
            }
        }
        Ok(())
    }

    pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
        let Some(node) = self.get(id) else {
            return Frontiers::default();
        };

        let offset = id.counter - node.cnt;
        if offset == 0 {
            node.deps.clone()
        } else {
            ID::new(id.peer, node.cnt + offset - 1).into()
        }
    }

    pub(crate) fn with_last_mut_of_peer<R>(
        &mut self,
        peer: PeerID,
        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
    ) -> R {
        self.lazy_load_last_of_peer(peer);
        let mut binding = self.map.lock().unwrap();
        let last = binding
            .range_mut(..=ID::new(peer, Counter::MAX))
            .next_back()
            .map(|(_, v)| v);
        f(last)
    }

    fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
        if from_local {
            assert!(self.pending_txn_node.take().is_some());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.ctr_end()
            );
        } else {
            let id_last = change.id_last();
            self.frontiers
                .update_frontiers_on_new_change(id_last, &change.deps);
            assert!(self.pending_txn_node.is_none());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.id.counter
            );
            self.vv.extend_to_include_last_id(id_last);
        }
    }

    pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
        let unparsed_vv = self.unparsed_vv.lock().unwrap();
        if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
            return;
        }

        // Release the lock before calling `lazy_load_nodes_internal`, which locks
        // `unparsed_vv` again; holding the guard here would deadlock.
        drop(unparsed_vv);
        let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
            panic!("unparsed vv don't match with change store. Peer:{peer} is not in change store")
        };

        self.lazy_load_nodes_internal(nodes, peer, None);
    }

    fn lazy_load_nodes_internal(
        &self,
        nodes: Vec<AppDagNode>,
        peer: u64,
        map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        assert!(!nodes.is_empty());
        let mut map_guard = None;
        let map = map_input.unwrap_or_else(|| {
            map_guard = Some(self.map.lock().unwrap());
            map_guard.as_mut().unwrap()
        });
        let new_dag_start_counter_for_the_peer = nodes[0].cnt;
        let nodes_cnt_end = nodes.last().unwrap().ctr_end();
        let mut unparsed_vv = self.unparsed_vv.lock().unwrap();
        let end_counter = unparsed_vv[&peer];
        assert!(end_counter <= nodes_cnt_end);
        let mut deps_on_others = Vec::new();
        let mut break_point_set = self.unhandled_dep_points.lock().unwrap();
        for mut node in nodes {
            if node.cnt >= end_counter {
                // skip already parsed nodes
                break;
            }

            if node.cnt + node.len as Counter > end_counter {
                node = node.slice(0, (end_counter - node.cnt) as usize);
                // This is unlikely to happen
            }

            for dep in node.deps.iter() {
                if dep.peer != peer {
                    deps_on_others.push(dep);
                }
            }

            // PERF: we can try to merge the node with the previous node
            let break_point_ends: Vec<_> = break_point_set
                .range(node.id_start()..node.id_end())
                .map(|id| (id.counter - node.cnt) as usize + 1)
                .collect();
            if break_point_ends.is_empty() {
                map.insert(node.id_start(), node);
            } else {
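                // Some already-parsed ops depend on the middle of this node; split
                // it so that every such dependency lands on the last id of a slice.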
                let mut slice_start = 0;
                for slice_end in break_point_ends.iter().copied() {
                    let mut slice_node = node.slice(slice_start, slice_end);
                    let inner = Arc::make_mut(&mut slice_node.inner);
                    inner.has_succ = true;
                    map.insert(slice_node.id_start(), slice_node);
                    slice_start = slice_end;
                }

                let last_break_point = break_point_ends.last().copied().unwrap();
                if last_break_point != node.len {
                    let slice_node = node.slice(last_break_point, node.len);
                    map.insert(slice_node.id_start(), slice_node);
                }

                for break_point in break_point_ends.into_iter() {
                    break_point_set.remove(&node.id_start().inc(break_point as Counter - 1));
                }
            }
        }

        if new_dag_start_counter_for_the_peer == 0 {
            unparsed_vv.remove(&peer);
        } else {
            unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
        }
        drop(unparsed_vv);
        drop(break_point_set);
        self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
    }

    fn handle_deps_break_points(
        &self,
        ids: impl IntoIterator<Item = ID>,
        skip_peer: PeerID,
        map: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        let mut map_guard = None;
        let map = map.unwrap_or_else(|| {
            map_guard = Some(self.map.lock().unwrap());
            map_guard.as_mut().unwrap()
        });
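        // For each dep, ensure it points at the last id of a parsed node, splitting
        // the node when the dep lands in its middle. Deps of nodes that aren't
        // parsed yet go into `unhandled_dep_points` and are handled at parse time.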
        for id in ids {
            if id.peer == skip_peer {
                continue;
            }

            let mut handled = false;
            let ans = self.try_with_node_mut(map, id, |target| {
                // We don't need to break the dag node if it's not loaded yet
                let target = target?;
                if target.ctr_last() == id.counter {
                    let target = Arc::make_mut(&mut target.inner);
                    handled = true;
                    target.has_succ = true;
                    None
                } else {
                    // We need to split the target node into two parts
                    // so that we can ensure the new change depends on the
                    // last id of a dag node.

                    let new_node =
                        target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
                    let target = Arc::make_mut(&mut target.inner);
                    target.len -= new_node.len;
                    Some(new_node)
                }
            });

            if let Some(new_node) = ans {
                map.insert(new_node.id_start(), new_node);
            } else if !handled {
                self.unhandled_dep_points.lock().unwrap().insert(id);
            }
        }
    }

    fn ensure_lazy_load_node(&self, id: ID) {
        if self.shallow_since_vv.includes_id(id) {
            return;
        }

        loop {
            // We need to load all the dag nodes that have the same peer and a greater
            // counter than the given `id`, because we only record the end counter of
            // the unparsed version in `unparsed_vv`
            let unparsed_end = {
                let unparsed_vv = self.unparsed_vv.lock().unwrap();
                unparsed_vv.get(&id.peer).copied().unwrap_or(0)
            };
            if unparsed_end <= id.counter {
                return;
            }

            let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
            let Some(nodes) = self
                .change_store
                .get_dag_nodes_that_contains(last_unparsed_id)
            else {
                panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
            };

            self.lazy_load_nodes_internal(nodes, id.peer, None);
        }
    }

    pub fn total_parsed_dag_node(&self) -> usize {
        self.map.lock().unwrap().len()
    }

    pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
        assert!(self.vv.is_empty());
        *self.unparsed_vv.lock().unwrap() = v.vv.clone();
        self.vv = v.vv;
        self.frontiers = v.frontiers;
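        // For shallow snapshots, record where the trimmed history starts: the
        // shallow frontiers, the deps of the shallow root, and the matching vv.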
        if let Some((vv, f)) = v.start_version {
            if !f.is_empty() {
                assert!(f.len() == 1);
                let id = f.as_single().unwrap();
                let node = self.get(id).unwrap();
                assert!(node.cnt == id.counter);
                self.shallow_root_frontiers_deps = node.deps.clone();
            }
            self.shallow_since_frontiers = f;
            self.shallow_since_vv = ImVersionVector::from_vv(&vv);
        }
    }

    /// This method is slow and should only be used for debugging and testing.
    ///
    /// It will check the following properties:
    ///
    /// 1. Counter is continuous
    /// 2. A node always depends on the last ids of other nodes
    /// 3. Lamport is correctly calculated
    /// 4. VV for each node is correctly calculated
    /// 5. Frontiers are correctly calculated
    #[instrument(skip(self))]
    pub fn check_dag_correctness(&self) {
        {
            // parse all nodes
            let unparsed_vv = self.unparsed_vv.lock().unwrap().clone();
            for (peer, cnt) in unparsed_vv.iter() {
                if *cnt == 0 {
                    continue;
                }

                let mut end_cnt = *cnt;
                let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
                while end_cnt > init_counter {
                    let cnt = end_cnt - 1;
                    self.ensure_lazy_load_node(ID::new(*peer, cnt));
                    end_cnt = self
                        .unparsed_vv
                        .lock()
                        .unwrap()
                        .get(peer)
                        .copied()
                        .unwrap_or(0);
                }
            }

            self.unparsed_vv.lock().unwrap().clear();
        }
        {
            // check property 1: Counter is continuous
            let map = self.map.lock().unwrap();
            let mut last_end_id = ID::new(0, 0);
            for (&id, node) in map.iter() {
                let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
                if id.peer == last_end_id.peer {
                    assert!(id.counter == last_end_id.counter);
                } else {
                    assert_eq!(id.counter, init_counter);
                }

                last_end_id = id.inc(node.len as Counter);
            }
        }
        {
            // check property 2: A node always depends on the last ids of other nodes
            let map = self.map.lock().unwrap();
            check_always_dep_on_last_id(&map);
        }
        {
            // check property 3: Lamport is correctly calculated
            let map = self.map.lock().unwrap();
            'outer: for (_, node) in map.iter() {
                let mut this_lamport = 0;
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    this_lamport = this_lamport.max(dep_node.lamport_end());
                }

                assert_eq!(this_lamport, node.lamport);
            }
        }
        {
            // check property 4: VV for each node is correctly calculated
            let map = self.map.lock().unwrap().clone();
            'outer: for (_, node) in map.iter() {
                let actual_vv = self.ensure_vv_for(node);
                let mut expected_vv = ImVersionVector::default();
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    self.ensure_vv_for(dep_node);
                    expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
                    expected_vv.extend_to_include_last_id(dep);
                }

                assert_eq!(actual_vv, expected_vv);
            }
        }
        {
            // check property 5: Frontiers are correctly calculated
            let mut maybe_frontiers = FxHashSet::default();
            let map = self.map.lock().unwrap();
            for (_, node) in map.iter() {
                maybe_frontiers.insert(node.id_last());
            }

            for (_, node) in map.iter() {
                for dep in node.deps.iter() {
                    maybe_frontiers.remove(&dep);
                }
            }

            let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
            assert_eq!(maybe_frontiers, frontiers);
        }
    }

    pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
        for id in deps.iter() {
            if !self.vv.includes_id(id) {
                return false;
            }
        }

        if self.is_before_shallow_root(deps) {
            return false;
        }

        true
    }

    pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
        // trace!("Is on shallow history? deps={:?}", deps);
        // trace!("self.shallow_since_vv {:?}", &self.shallow_since_vv);
        // trace!("self.shallow_frontiers {:?}", &self.shallow_since_frontiers);

        if self.shallow_since_vv.is_empty() {
            return false;
        }

        if deps.is_empty() {
            return true;
        }

        if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
            return true;
        }

        if deps
            .iter()
            .any(|x| self.shallow_since_frontiers.contains(&x))
        {
            return deps != &self.shallow_since_frontiers;
        }

        false
    }

    /// Traverse the ancestors of the given id, calling the callback for each node.
    ///
    /// The ancestors are visited in reverse order (from the greatest lamport to the smallest).
    pub(crate) fn travel_ancestors(
        &self,
        id: ID,
        f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
    ) {
        struct PendingNode(AppDagNode);
        impl PartialEq for PendingNode {
            fn eq(&self, other: &Self) -> bool {
                self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
            }
        }
        impl Eq for PendingNode {}
        impl PartialOrd for PendingNode {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for PendingNode {
            fn cmp(&self, other: &Self) -> Ordering {
                self.0
                    .lamport_last()
                    .cmp(&other.0.lamport_last())
                    .then_with(|| self.0.peer.cmp(&other.0.peer))
            }
        }

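        // `BinaryHeap` is a max-heap, so ordering `PendingNode` by (lamport_last,
        // peer) pops the ancestor with the greatest lamport first, which yields
        // the reverse order promised in the doc comment.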
        let mut visited = FxHashSet::default();
        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
        pending.push(PendingNode(self.get(id).unwrap()));
        while let Some(PendingNode(node)) = pending.pop() {
            if f(&node).is_break() {
                break;
            }

            for dep in node.deps.iter() {
                let Some(dep_node) = self.get(dep) else {
                    continue;
                };
                if visited.contains(&dep_node.id_start()) {
                    continue;
                }

                visited.insert(dep_node.id_start());
                pending.push(PendingNode(dep_node));
            }
        }
    }

    pub(crate) fn update_version_on_new_local_op(
        &mut self,
        deps: &Frontiers,
        start_id: ID,
        start_lamport: Lamport,
        len: usize,
    ) {
        let last_id = start_id.inc(len as Counter - 1);
        // PERF: we can cache this last_id - this is a hot path
        self.vv.set_last(last_id);
        self.frontiers.update_frontiers_on_new_change(last_id, deps);
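        // Local ops created inside the current transaction accumulate into
        // `pending_txn_node`; the node is taken when the transaction commits as a
        // `Change` (see `update_version_on_new_change`).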
        match &mut self.pending_txn_node {
            Some(node) => {
                assert!(
                    node.peer == start_id.peer
                        && node.cnt + node.len as Counter == start_id.counter
                        && deps.len() == 1
                        && deps.as_single().unwrap().peer == start_id.peer
                );
                let inner = Arc::make_mut(&mut node.inner);
                inner.len += len;
            }
            None => {
                let node = AppDagNode {
                    inner: Arc::new(AppDagNodeInner {
                        peer: start_id.peer,
                        cnt: start_id.counter,
                        lamport: start_lamport,
                        deps: deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len,
                    }),
                };
                self.pending_txn_node = Some(node);
            }
        }
    }

    pub(crate) fn latest_vv_contains_peer(&self, peer: PeerID) -> bool {
        self.vv.contains_key(&peer) && *self.vv.get(&peer).unwrap() > 0
    }
}

fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
    for (_, node) in map.iter() {
        for dep in node.deps.iter() {
            let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
                // It's shallow
                continue;
            };
            assert_eq!(dep_node.id_start(), dep_id);
            if dep_node.contains_id(dep) {
                assert_eq!(dep_node.id_last(), dep);
            }
        }
    }
}

impl HasIndex for AppDagNode {
    type Int = Counter;
    fn get_start_index(&self) -> Self::Int {
        self.cnt
    }

    fn get_end_index(&self) -> Self::Int {
        self.cnt + self.len as Counter
    }
}

impl Sliceable for AppDagNode {
    fn slice(&self, from: usize, to: usize) -> Self {
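        // A non-prefix slice depends on the id immediately before it inside the
        // original node, while a prefix slice keeps the original deps. The cached
        // vv is shifted to the slice's start, and only a slice that reaches the
        // original end inherits `has_succ`; every other slice has a successor.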
        AppDagNodeInner {
            peer: self.peer,
            cnt: self.cnt + from as Counter,
            lamport: self.lamport + from as Lamport,
            deps: if from > 0 {
                Frontiers::from_id(self.id_start().inc(from as Counter - 1))
            } else {
                self.deps.clone()
            },
            vv: if let Some(vv) = self.vv.get() {
                let mut new = vv.clone();
                new.insert(self.peer, self.cnt + from as Counter);
                OnceCell::with_value(new)
            } else {
                OnceCell::new()
            },
            has_succ: if to == self.len { self.has_succ } else { true },
            len: to - from,
        }
        .into()
    }
}

impl HasId for AppDagNode {
    fn id_start(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.cnt,
        }
    }
}

impl HasCounter for AppDagNode {
    fn ctr_start(&self) -> Counter {
        self.cnt
    }
}

impl HasLength for AppDagNode {
    fn atom_len(&self) -> usize {
        self.len
    }

    fn content_len(&self) -> usize {
        self.len
    }
}

impl Mergable for AppDagNode {
    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
    where
        Self: Sized,
    {
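        // `other` can merge into `self` only when it directly extends it:
        // contiguous counter and lamport, no external successors on `self`, and
        // `other` depending solely on an op from `self`'s peer.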
        !self.has_succ
            && self.peer == other.peer
            && self.cnt + self.len as Counter == other.cnt
            && other.deps.len() == 1
            && self.lamport + self.len as Lamport == other.lamport
            && other.deps.as_single().unwrap().peer == self.peer
    }

    fn merge(&mut self, other: &Self, _conf: &())
    where
        Self: Sized,
    {
        assert_eq!(
            other.deps.as_single().unwrap().counter,
            self.cnt + self.len as Counter - 1
        );
        let this = Arc::make_mut(&mut self.inner);
        this.len += other.len;
        this.has_succ = other.has_succ;
    }
}

impl HasLamport for AppDagNode {
    fn lamport(&self) -> Lamport {
        self.lamport
    }
}

impl DagNode for AppDagNode {
    fn deps(&self) -> &Frontiers {
        &self.deps
    }
}

impl Dag for AppDag {
    type Node = AppDagNode;

    fn frontier(&self) -> &Frontiers {
        &self.frontiers
    }

    fn get(&self, id: ID) -> Option<Self::Node> {
        self.ensure_lazy_load_node(id);
        let binding = self.map.lock().unwrap();
        if let Some(x) = binding.range(..=id).next_back() {
            if x.1.contains_id(id) {
                // PERF: do we need to optimize clone like this?
                // by adding another layer of Arc?
                return Some(x.1.clone());
            }
        }

        if let Some(node) = &self.pending_txn_node {
            if node.peer == id.peer && node.cnt <= id.counter {
                assert!(node.cnt + node.len as Counter > id.counter);
                return Some(node.clone());
            }
        }

        None
    }

    fn vv(&self) -> &VersionVector {
        &self.vv
    }

    fn contains(&self, id: ID) -> bool {
        self.vv.includes_id(id)
    }
}

impl AppDag {
    // PERF: this may be painfully slow
    /// Get the version vector for a certain op.
    /// It's the version when the op is applied.
    pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
        self.get(id).map(|x| {
            let mut vv = self.ensure_vv_for(&x);
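            // `ensure_vv_for` yields the version right before the node's own ops;
            // bump the peer's counter so the result includes ops up to `id` itself.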
            vv.insert(id.peer, id.counter + 1);
            vv
        })
    }

    pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
        if target_node.vv.get().is_none() {
            // Iterative DFS: a node is finalized only after the vvs of all its
            // deps are computed; otherwise it's re-pushed beneath the missing deps.
            let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
            while let Some(top_node) = stack.pop() {
                let mut ans_vv = ImVersionVector::default();
                if top_node.deps == self.shallow_root_frontiers_deps {
                    for (&p, &c) in self.shallow_since_vv.iter() {
                        ans_vv.insert(p, c);
                    }
                } else {
                    let mut all_deps_processed = true;
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        if node.vv.get().is_none() {
                            // Re-push `top_node` once, then push the unprocessed dep
                            // so that the dep's vv gets computed first.
                            if all_deps_processed {
                                stack.push(top_node.clone());
                            }
                            all_deps_processed = false;
                            stack.push(node);
                            continue;
                        };
                    }

                    if !all_deps_processed {
                        continue;
                    }

                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        let dep_vv = node.vv.get().unwrap();
                        if ans_vv.is_empty() {
                            ans_vv = dep_vv.clone();
                        } else {
                            ans_vv.extend_to_include_vv(dep_vv.iter());
                        }

                        ans_vv.insert(node.peer, node.ctr_end());
                    }
                }

                // trace!("ans_vv={:?}", &ans_vv);
                top_node.vv.set(ans_vv.clone()).unwrap();
            }
        }

        target_node.vv.get().unwrap().clone()
    }

    /// Compare the causal order of two versions.
    /// If the result is `None`, the two versions are concurrent to each other.
    pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
        if a.peer == b.peer {
            return Some(a.counter.cmp(&b.counter));
        }

        let a = self.get_vv(a).unwrap();
        let b = self.get_vv(b).unwrap();
        a.partial_cmp(&b)
    }

    pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
        self.get(*id).and_then(|node| {
            assert!(id.counter >= node.cnt);
            if node.cnt + node.len as Counter > id.counter {
                Some(node.lamport + (id.counter - node.cnt) as Lamport)
            } else {
                None
            }
        })
    }

    pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
        let mut lamport = 0;
        for id in deps.iter() {
            let x = self.get_lamport(&id)?;
            lamport = lamport.max(x + 1);
        }

        Some(lamport)
    }

    /// Convert a frontiers to a version vector.
    ///
    /// Returns `None` if the frontiers version is not found in the dag.
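    ///
    /// The result is the union of each frontier id's version vector, extended to
    /// include the id itself: e.g. frontiers `[A@5, B@3]` map to a vv containing
    /// `A: 6, B: 4` plus everything those two ops causally depend on.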
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        if frontiers == &self.shallow_root_frontiers_deps {
            let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
            return Some(vv);
        }

        let mut vv: VersionVector = Default::default();
        for id in frontiers.iter() {
            let x = self.get(id)?;
            let target_vv = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(target_vv.iter());
            vv.extend_to_include_last_id(id);
        }

        Some(vv)
    }

    #[allow(unused)]
    pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
        if frontiers.is_empty() {
            return Default::default();
        }

        let mut iter = frontiers.iter();
        let mut vv = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let mut vv = self.ensure_vv_for(&x);
            vv.extend_to_include_last_id(id);
            vv
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let x = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(x.iter());
            vv.extend_to_include_last_id(id);
        }

        vv
    }

    pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
        if frontiers.is_empty() {
            return 0;
        }

        let mut iter = frontiers.iter();
        let mut lamport = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            (id.counter - x.cnt) as Lamport + x.lamport + 1
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
        }

        lamport
    }

    pub fn get_frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    /// - Ordering::Less means self is less than the target or concurrent with it
    /// - Ordering::Equal means the versions are equal
    /// - Ordering::Greater means self's version is greater than the target's
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        if &self.frontiers == other {
            Ordering::Equal
        } else if other.iter().all(|id| self.vv.includes_id(id)) {
            Ordering::Greater
        } else {
            Ordering::Less
        }
    }

    // PERF
    /// Compare two [Frontiers] causally.
    ///
    /// If one of the [Frontiers] is not included, it will return [FrontiersNotIncluded].
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
        let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
        Ok(a.partial_cmp(&b))
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct FrontiersNotIncluded;
impl Display for FrontiersNotIncluded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The given Frontiers are not included by the doc")
    }
}