loro_internal/oplog/
loro_dag.rs

1use crate::change::{Change, Lamport};
2use crate::dag::{Dag, DagNode};
3use crate::id::{Counter, ID};
4use crate::span::{HasId, HasLamport};
5use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
6use fxhash::FxHashSet;
7use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
8use once_cell::sync::OnceCell;
9use rle::{HasIndex, HasLength, Mergable, Sliceable};
10use smallvec::SmallVec;
11use std::cmp::Ordering;
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
13use std::fmt::Display;
14use std::ops::{ControlFlow, Deref};
15use std::sync::{Arc, Mutex};
16use tracing::{instrument, trace};
17
18use super::change_store::BatchDecodeInfo;
19use super::ChangeStore;
20
21/// [AppDag] maintains the causal graph of the app.
22/// It's faster to answer the question like what's the LCA version
23#[derive(Debug)]
24pub struct AppDag {
25    change_store: ChangeStore,
26    /// It only contains nodes that are already parsed.
27    ///
28    /// - All the unparsed op ids must be included in `unparsed_vv`.
29    /// - All the parsed and unparsed op ids must be included in `vv`.
30    map: Mutex<BTreeMap<ID, AppDagNode>>,
31    /// The latest known frontiers
32    frontiers: Frontiers,
33    /// The latest known version vectorG
34    vv: VersionVector,
35    /// The earliest known frontiers
36    shallow_since_frontiers: Frontiers,
37    /// The deps of the shallow frontiers
38    shallow_root_frontiers_deps: Frontiers,
39    /// The vv of shallow_frontiers_deps
40    shallow_since_vv: ImVersionVector,
41    /// Ops included in the version vector but not parsed yet
42    ///
43    /// # Invariants
44    ///
45    /// - `vv` >= `unparsed_vv`
46    unparsed_vv: Mutex<VersionVector>,
47    /// It's a set of points which are deps of some parsed ops.
48    /// But the ops in this set are not parsed yet. When they are parsed,
49    /// we need to make sure it breaks at the given point.
50    unhandled_dep_points: Mutex<BTreeSet<ID>>,
51    pending_txn_node: Option<AppDagNode>,
52}
53
54#[derive(Debug, Clone)]
55pub struct AppDagNode {
56    inner: Arc<AppDagNodeInner>,
57}
58
59impl Deref for AppDagNode {
60    type Target = AppDagNodeInner;
61
62    fn deref(&self) -> &Self::Target {
63        &self.inner
64    }
65}
66
67impl AppDagNode {
68    pub fn new(inner: AppDagNodeInner) -> Self {
69        Self {
70            inner: Arc::new(inner),
71        }
72    }
73}
74
75#[derive(Debug, Clone)]
76pub struct AppDagNodeInner {
77    pub(crate) peer: PeerID,
78    pub(crate) cnt: Counter,
79    pub(crate) lamport: Lamport,
80    pub(crate) deps: Frontiers,
81    pub(crate) vv: OnceCell<ImVersionVector>,
82    /// A flag indicating whether any other nodes from a different peer depend on this node.
83    /// The calculation of frontiers is based on the property that a node does not depend
84    /// on the middle of other nodes.
85    pub(crate) has_succ: bool,
86    pub(crate) len: usize,
87}
88
89impl From<AppDagNodeInner> for AppDagNode {
90    fn from(inner: AppDagNodeInner) -> Self {
91        AppDagNode {
92            inner: Arc::new(inner),
93        }
94    }
95}
96
97impl AppDag {
98    pub(super) fn new(change_store: ChangeStore) -> Self {
99        Self {
100            change_store,
101            map: Mutex::new(BTreeMap::new()),
102            frontiers: Frontiers::default(),
103            vv: VersionVector::default(),
104            unparsed_vv: Mutex::new(VersionVector::default()),
105            unhandled_dep_points: Mutex::new(BTreeSet::new()),
106            shallow_since_frontiers: Default::default(),
107            shallow_root_frontiers_deps: Default::default(),
108            shallow_since_vv: Default::default(),
109            pending_txn_node: None,
110        }
111    }
112
113    pub fn frontiers(&self) -> &Frontiers {
114        &self.frontiers
115    }
116
117    pub fn vv(&self) -> &VersionVector {
118        &self.vv
119    }
120
121    pub fn shallow_since_vv(&self) -> &ImVersionVector {
122        &self.shallow_since_vv
123    }
124
125    pub fn shallow_since_frontiers(&self) -> &Frontiers {
126        &self.shallow_since_frontiers
127    }
128
129    pub fn is_empty(&self) -> bool {
130        self.vv.is_empty()
131    }
132
133    #[tracing::instrument(skip_all, name = "handle_new_change")]
134    pub(super) fn handle_new_change(&mut self, change: &Change, from_local: bool) {
135        let len = change.content_len();
136        self.update_version_on_new_change(change, from_local);
137        #[cfg(debug_assertions)]
138        {
139            let unhandled_dep_points = self.unhandled_dep_points.try_lock().unwrap();
140            let c = unhandled_dep_points
141                .range(change.id_start()..change.id_end())
142                .count();
143            assert!(c == 0);
144        }
145
146        let mut inserted = false;
147        if change.deps_on_self() {
148            // We may not need to push new element to dag because it only depends on itself
149            inserted = self.with_last_mut_of_peer(change.id.peer, |last| {
150                let last = last.unwrap();
151                if last.has_succ {
152                    // Don't merge the node if there are other nodes depending on it
153                    return false;
154                }
155
156                assert_eq!(last.peer, change.id.peer, "peer id is not the same");
157                assert_eq!(
158                    last.cnt + last.len as Counter,
159                    change.id.counter,
160                    "counter is not continuous"
161                );
162                assert_eq!(
163                    last.lamport + last.len as Lamport,
164                    change.lamport,
165                    "lamport is not continuous"
166                );
167                let last = Arc::make_mut(&mut last.inner);
168                last.len = (change.id.counter - last.cnt) as usize + len;
169                last.has_succ = false;
170                true
171            });
172        }
173
174        if !inserted {
175            let node: AppDagNode = AppDagNodeInner {
176                vv: OnceCell::new(),
177                peer: change.id.peer,
178                cnt: change.id.counter,
179                lamport: change.lamport,
180                deps: change.deps.clone(),
181                has_succ: false,
182                len,
183            }
184            .into();
185
186            let mut map = self.map.try_lock().unwrap();
187            map.insert(node.id_start(), node);
188            self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
189        }
190    }
191
192    fn try_with_node_mut<R>(
193        &self,
194        map: &mut BTreeMap<ID, AppDagNode>,
195        id: ID,
196        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
197    ) -> R {
198        let x = map.range_mut(..=id).next_back();
199        if let Some((_, node)) = x {
200            if node.contains_id(id) {
201                f(Some(node))
202            } else {
203                f(None)
204            }
205        } else {
206            f(None)
207        }
208    }
209
210    /// If the lamport of change can be calculated, return Ok, otherwise, Err
211    pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
212        for dep in change.deps.iter() {
213            match self.get_lamport(&dep) {
214                Some(lamport) => {
215                    change.lamport = change.lamport.max(lamport + 1);
216                }
217                None => return Err(()),
218            }
219        }
220        Ok(())
221    }
222
223    pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
224        let Some(node) = self.get(id) else {
225            return Frontiers::default();
226        };
227
228        let offset = id.counter - node.cnt;
229        if offset == 0 {
230            node.deps.clone()
231        } else {
232            ID::new(id.peer, node.cnt + offset - 1).into()
233        }
234    }
235
236    pub(crate) fn with_last_mut_of_peer<R>(
237        &mut self,
238        peer: PeerID,
239        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
240    ) -> R {
241        self.lazy_load_last_of_peer(peer);
242        let mut binding = self.map.try_lock().unwrap();
243        let last = binding
244            .range_mut(..=ID::new(peer, Counter::MAX))
245            .next_back()
246            .map(|(_, v)| v);
247        f(last)
248    }
249
250    fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
251        if from_local {
252            assert!(self.pending_txn_node.take().is_some());
253            assert_eq!(
254                self.vv.get(&change.id.peer).copied().unwrap_or(0),
255                change.ctr_end()
256            );
257        } else {
258            let id_last = change.id_last();
259            self.frontiers
260                .update_frontiers_on_new_change(id_last, &change.deps);
261            assert!(self.pending_txn_node.is_none());
262            assert_eq!(
263                self.vv.get(&change.id.peer).copied().unwrap_or(0),
264                change.id.counter
265            );
266            self.vv.extend_to_include_last_id(id_last);
267        }
268    }
269
270    pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
271        let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
272        if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
273            return;
274        }
275
276        let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
277            panic!("unparsed vv don't match with change store. Peer:{peer} is not in change store")
278        };
279
280        self.lazy_load_nodes_internal(nodes, peer, None);
281    }
282
283    fn lazy_load_nodes_internal(
284        &self,
285        nodes: Vec<AppDagNode>,
286        peer: u64,
287        map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
288    ) {
289        assert!(!nodes.is_empty());
290        let mut map_guard = None;
291        let map = map_input.unwrap_or_else(|| {
292            map_guard = Some(self.map.try_lock().unwrap());
293            map_guard.as_mut().unwrap()
294        });
295        let new_dag_start_counter_for_the_peer = nodes[0].cnt;
296        let nodes_cnt_end = nodes.last().unwrap().ctr_end();
297        let mut unparsed_vv = self.unparsed_vv.try_lock().unwrap();
298        let end_counter = unparsed_vv[&peer];
299        assert!(end_counter <= nodes_cnt_end);
300        let mut deps_on_others = Vec::new();
301        let mut break_point_set = self.unhandled_dep_points.try_lock().unwrap();
302        for mut node in nodes {
303            if node.cnt >= end_counter {
304                // skip already parsed nodes
305                break;
306            }
307
308            if node.cnt + node.len as Counter > end_counter {
309                node = node.slice(0, (end_counter - node.cnt) as usize);
310                // This is unlikely to happen
311            }
312
313            for dep in node.deps.iter() {
314                if dep.peer != peer {
315                    deps_on_others.push(dep);
316                }
317            }
318
319            // PERF: we can try to merge the node with the previous node
320            let break_point_ends: Vec<_> = break_point_set
321                .range(node.id_start()..node.id_end())
322                .map(|id| (id.counter - node.cnt) as usize + 1)
323                .collect();
324            if break_point_ends.is_empty() {
325                map.insert(node.id_start(), node);
326            } else {
327                let mut slice_start = 0;
328                for slice_end in break_point_ends.iter().copied() {
329                    let mut slice_node = node.slice(slice_start, slice_end);
330                    let inner = Arc::make_mut(&mut slice_node.inner);
331                    inner.has_succ = true;
332                    map.insert(slice_node.id_start(), slice_node);
333                    slice_start = slice_end;
334                }
335
336                let last_break_point = break_point_ends.last().copied().unwrap();
337                if last_break_point != node.len {
338                    let slice_node = node.slice(last_break_point, node.len);
339                    map.insert(slice_node.id_start(), slice_node);
340                }
341
342                for break_point in break_point_ends.into_iter() {
343                    break_point_set.remove(&node.id_start().inc(break_point as Counter - 1));
344                }
345            }
346        }
347
348        if new_dag_start_counter_for_the_peer == 0 {
349            unparsed_vv.remove(&peer);
350        } else {
351            unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
352        }
353        drop(unparsed_vv);
354        drop(break_point_set);
355        self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
356    }
357
358    fn handle_deps_break_points(
359        &self,
360        ids: impl IntoIterator<Item = ID>,
361        skip_peer: PeerID,
362        map: Option<&mut BTreeMap<ID, AppDagNode>>,
363    ) {
364        let mut map_guard = None;
365        let map = map.unwrap_or_else(|| {
366            map_guard = Some(self.map.try_lock().unwrap());
367            map_guard.as_mut().unwrap()
368        });
369        for id in ids {
370            if id.peer == skip_peer {
371                continue;
372            }
373
374            let mut handled = false;
375            let ans = self.try_with_node_mut(map, id, |target| {
376                // We don't need to break the dag node if it's not loaded yet
377                let target = target?;
378                if target.ctr_last() == id.counter {
379                    let target = Arc::make_mut(&mut target.inner);
380                    handled = true;
381                    target.has_succ = true;
382                    None
383                } else {
384                    // We need to split the target node into two part
385                    // so that we can ensure the new change depends on the
386                    // last id of a dag node.
387
388                    let new_node =
389                        target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
390                    let target = Arc::make_mut(&mut target.inner);
391                    target.len -= new_node.len;
392                    Some(new_node)
393                }
394            });
395
396            if let Some(new_node) = ans {
397                map.insert(new_node.id_start(), new_node);
398            } else if !handled {
399                self.unhandled_dep_points.try_lock().unwrap().insert(id);
400            }
401        }
402    }
403
404    fn ensure_lazy_load_node(&self, id: ID) {
405        if self.shallow_since_vv.includes_id(id) {
406            return;
407        }
408
409        loop {
410            // We need to load all the dag nodes that has the same peer and greater counter than the given `id`
411            // Because we only record the end counter of the unparsed version on `unparsed_vv`
412            let unparsed_end = {
413                let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
414                unparsed_vv.get(&id.peer).copied().unwrap_or(0)
415            };
416            if unparsed_end <= id.counter {
417                return;
418            }
419
420            let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
421            let Some(nodes) = self
422                .change_store
423                .get_dag_nodes_that_contains(last_unparsed_id)
424            else {
425                panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
426            };
427
428            self.lazy_load_nodes_internal(nodes, id.peer, None);
429        }
430    }
431
432    pub fn total_parsed_dag_node(&self) -> usize {
433        self.map.try_lock().unwrap().len()
434    }
435
436    pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
437        assert!(self.vv.is_empty());
438        *self.unparsed_vv.try_lock().unwrap() = v.vv.clone();
439        self.vv = v.vv;
440        self.frontiers = v.frontiers;
441        if let Some((vv, f)) = v.start_version {
442            if !f.is_empty() {
443                assert!(f.len() == 1);
444                let id = f.as_single().unwrap();
445                let node = self.get(id).unwrap();
446                assert!(node.cnt == id.counter);
447                self.shallow_root_frontiers_deps = node.deps.clone();
448            }
449            self.shallow_since_frontiers = f;
450            self.shallow_since_vv = ImVersionVector::from_vv(&vv);
451        }
452    }
453
454    /// This method is slow and should only be used for debugging and testing.
455    ///
456    /// It will check the following properties:
457    ///
458    /// 1. Counter is continuous
459    /// 2. A node always depends of the last ids of other nodes
460    /// 3. Lamport is correctly calculated
461    /// 4. VV for each node is correctly calculated
462    /// 5. Frontiers are correctly calculated
463    #[instrument(skip(self))]
464    pub fn check_dag_correctness(&self) {
465        {
466            // parse all nodes
467            let unparsed_vv = self.unparsed_vv.try_lock().unwrap().clone();
468            for (peer, cnt) in unparsed_vv.iter() {
469                if *cnt == 0 {
470                    continue;
471                }
472
473                let mut end_cnt = *cnt;
474                let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
475                while end_cnt > init_counter {
476                    let cnt = end_cnt - 1;
477                    self.ensure_lazy_load_node(ID::new(*peer, cnt));
478                    end_cnt = self
479                        .unparsed_vv
480                        .try_lock()
481                        .unwrap()
482                        .get(peer)
483                        .copied()
484                        .unwrap_or(0);
485                }
486            }
487
488            self.unparsed_vv.try_lock().unwrap().clear();
489        }
490        {
491            // check property 1: Counter is continuous
492            let map = self.map.try_lock().unwrap();
493            let mut last_end_id = ID::new(0, 0);
494            for (&id, node) in map.iter() {
495                let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
496                if id.peer == last_end_id.peer {
497                    assert!(id.counter == last_end_id.counter);
498                } else {
499                    assert_eq!(id.counter, init_counter);
500                }
501
502                last_end_id = id.inc(node.len as Counter);
503            }
504        }
505        {
506            // check property 2: A node always depends of the last ids of other nodes
507            let map = self.map.try_lock().unwrap();
508            check_always_dep_on_last_id(&map);
509        }
510        {
511            // check property 3: Lamport is correctly calculated
512            let map = self.map.try_lock().unwrap();
513            'outer: for (_, node) in map.iter() {
514                let mut this_lamport = 0;
515                for dep in node.deps.iter() {
516                    if self.shallow_since_vv.includes_id(dep) {
517                        continue 'outer;
518                    }
519
520                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
521                    this_lamport = this_lamport.max(dep_node.lamport_end());
522                }
523
524                assert_eq!(this_lamport, node.lamport);
525            }
526        }
527        {
528            // check property 4: VV for each node is correctly calculated
529            let map = self.map.try_lock().unwrap().clone();
530            'outer: for (_, node) in map.iter() {
531                let actual_vv = self.ensure_vv_for(node);
532                let mut expected_vv = ImVersionVector::default();
533                for dep in node.deps.iter() {
534                    if self.shallow_since_vv.includes_id(dep) {
535                        continue 'outer;
536                    }
537
538                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
539                    self.ensure_vv_for(dep_node);
540                    expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
541                    expected_vv.extend_to_include_last_id(dep);
542                }
543
544                assert_eq!(actual_vv, expected_vv);
545            }
546        }
547        {
548            // check property 5: Frontiers are correctly calculated
549            let mut maybe_frontiers = FxHashSet::default();
550            let map = self.map.try_lock().unwrap();
551            for (_, node) in map.iter() {
552                maybe_frontiers.insert(node.id_last());
553            }
554
555            for (_, node) in map.iter() {
556                for dep in node.deps.iter() {
557                    maybe_frontiers.remove(&dep);
558                }
559            }
560
561            let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
562            assert_eq!(maybe_frontiers, frontiers);
563        }
564    }
565
566    pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
567        for id in deps.iter() {
568            if !self.vv.includes_id(id) {
569                return false;
570            }
571        }
572
573        if self.is_before_shallow_root(deps) {
574            return false;
575        }
576
577        true
578    }
579
580    pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
581        trace!("Is on shallow history? deps={:?}", deps);
582        trace!("self.shallow_since_vv {:?}", &self.shallow_since_vv);
583        trace!("self.shallow_frontiers {:?}", &self.shallow_since_frontiers);
584
585        if self.shallow_since_vv.is_empty() {
586            return false;
587        }
588
589        if deps.is_empty() {
590            return true;
591        }
592
593        if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
594            return true;
595        }
596
597        if deps
598            .iter()
599            .any(|x| self.shallow_since_frontiers.contains(&x))
600        {
601            return deps != &self.shallow_since_frontiers;
602        }
603
604        false
605    }
606
607    /// Travel the ancestors of the given id, and call the callback for each node
608    ///
609    /// It will travel the ancestors in the reverse order (from the greatest lamport to the smallest)
610    pub(crate) fn travel_ancestors(
611        &self,
612        id: ID,
613        f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
614    ) {
615        struct PendingNode(AppDagNode);
616        impl PartialEq for PendingNode {
617            fn eq(&self, other: &Self) -> bool {
618                self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
619            }
620        }
621        impl Eq for PendingNode {}
622        impl PartialOrd for PendingNode {
623            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
624                Some(self.cmp(other))
625            }
626        }
627        impl Ord for PendingNode {
628            fn cmp(&self, other: &Self) -> Ordering {
629                self.0
630                    .lamport_last()
631                    .cmp(&other.0.lamport_last())
632                    .then_with(|| self.0.peer.cmp(&other.0.peer))
633            }
634        }
635
636        let mut visited = FxHashSet::default();
637        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
638        pending.push(PendingNode(self.get(id).unwrap()));
639        while let Some(PendingNode(node)) = pending.pop() {
640            if f(&node).is_break() {
641                break;
642            }
643
644            for dep in node.deps.iter() {
645                let Some(dep_node) = self.get(dep) else {
646                    continue;
647                };
648                if visited.contains(&dep_node.id_start()) {
649                    continue;
650                }
651
652                visited.insert(dep_node.id_start());
653                pending.push(PendingNode(dep_node));
654            }
655        }
656    }
657
658    pub(crate) fn update_version_on_new_local_op(
659        &mut self,
660        deps: &Frontiers,
661        start_id: ID,
662        start_lamport: Lamport,
663        len: usize,
664    ) {
665        let last_id = start_id.inc(len as Counter - 1);
666        // PERF: we can cache this last_id - this is a hot path
667        self.vv.set_last(last_id);
668        self.frontiers.update_frontiers_on_new_change(last_id, deps);
669        match &mut self.pending_txn_node {
670            Some(node) => {
671                assert!(
672                    node.peer == start_id.peer
673                        && node.cnt + node.len as Counter == start_id.counter
674                        && deps.len() == 1
675                        && deps.as_single().unwrap().peer == start_id.peer
676                );
677                let inner = Arc::make_mut(&mut node.inner);
678                inner.len += len;
679            }
680            None => {
681                let node = AppDagNode {
682                    inner: Arc::new(AppDagNodeInner {
683                        peer: start_id.peer,
684                        cnt: start_id.counter,
685                        lamport: start_lamport,
686                        deps: deps.clone(),
687                        vv: OnceCell::new(),
688                        has_succ: false,
689                        len,
690                    }),
691                };
692                self.pending_txn_node = Some(node);
693            }
694        }
695    }
696}
697
698fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
699    for (_, node) in map.iter() {
700        for dep in node.deps.iter() {
701            let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
702                // It's shallow
703                continue;
704            };
705            assert_eq!(dep_node.id_start(), dep_id);
706            if dep_node.contains_id(dep) {
707                assert_eq!(dep_node.id_last(), dep);
708            }
709        }
710    }
711}
712
713impl HasIndex for AppDagNode {
714    type Int = Counter;
715    fn get_start_index(&self) -> Self::Int {
716        self.cnt
717    }
718
719    fn get_end_index(&self) -> Self::Int {
720        self.cnt + self.len as Counter
721    }
722}
723
724impl Sliceable for AppDagNode {
725    fn slice(&self, from: usize, to: usize) -> Self {
726        AppDagNodeInner {
727            peer: self.peer,
728            cnt: self.cnt + from as Counter,
729            lamport: self.lamport + from as Lamport,
730            deps: if from > 0 {
731                Frontiers::from_id(self.id_start().inc(from as Counter - 1))
732            } else {
733                self.deps.clone()
734            },
735            vv: if let Some(vv) = self.vv.get() {
736                let mut new = vv.clone();
737                new.insert(self.peer, self.cnt + from as Counter);
738                OnceCell::with_value(new)
739            } else {
740                OnceCell::new()
741            },
742            has_succ: if to == self.len { self.has_succ } else { true },
743            len: to - from,
744        }
745        .into()
746    }
747}
748
749impl HasId for AppDagNode {
750    fn id_start(&self) -> ID {
751        ID {
752            peer: self.peer,
753            counter: self.cnt,
754        }
755    }
756}
757
758impl HasCounter for AppDagNode {
759    fn ctr_start(&self) -> Counter {
760        self.cnt
761    }
762}
763
764impl HasLength for AppDagNode {
765    fn atom_len(&self) -> usize {
766        self.len
767    }
768
769    fn content_len(&self) -> usize {
770        self.len
771    }
772}
773
774impl Mergable for AppDagNode {
775    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
776    where
777        Self: Sized,
778    {
779        !self.has_succ
780            && self.peer == other.peer
781            && self.cnt + self.len as Counter == other.cnt
782            && other.deps.len() == 1
783            && self.lamport + self.len as Lamport == other.lamport
784            && other.deps.as_single().unwrap().peer == self.peer
785    }
786
787    fn merge(&mut self, other: &Self, _conf: &())
788    where
789        Self: Sized,
790    {
791        assert_eq!(
792            other.deps.as_single().unwrap().counter,
793            self.cnt + self.len as Counter - 1
794        );
795        let this = Arc::make_mut(&mut self.inner);
796        this.len += other.len;
797        this.has_succ = other.has_succ;
798    }
799}
800
801impl HasLamport for AppDagNode {
802    fn lamport(&self) -> Lamport {
803        self.lamport
804    }
805}
806
807impl DagNode for AppDagNode {
808    fn deps(&self) -> &Frontiers {
809        &self.deps
810    }
811}
812
813impl Dag for AppDag {
814    type Node = AppDagNode;
815
816    fn frontier(&self) -> &Frontiers {
817        &self.frontiers
818    }
819
820    fn get(&self, id: ID) -> Option<Self::Node> {
821        self.ensure_lazy_load_node(id);
822        let binding = self.map.try_lock().unwrap();
823        if let Some(x) = binding.range(..=id).next_back() {
824            if x.1.contains_id(id) {
825                // PERF: do we need to optimize clone like this?
826                // by adding another layer of Arc?
827                return Some(x.1.clone());
828            }
829        }
830
831        if let Some(node) = &self.pending_txn_node {
832            if node.peer == id.peer && node.cnt <= id.counter {
833                assert!(node.cnt + node.len as Counter > id.counter);
834                return Some(node.clone());
835            }
836        }
837
838        None
839    }
840
841    fn vv(&self) -> &VersionVector {
842        &self.vv
843    }
844
845    fn contains(&self, id: ID) -> bool {
846        self.vv.includes_id(id)
847    }
848}
849
850impl AppDag {
851    // PERF: this may be painfully slow
852    /// get the version vector for a certain op.
853    /// It's the version when the op is applied
854    pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
855        self.get(id).map(|x| {
856            let mut vv = self.ensure_vv_for(&x);
857            vv.insert(id.peer, id.counter + 1);
858            vv
859        })
860    }
861
862    pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
863        if target_node.vv.get().is_none() {
864            // (node, has_processed_children)
865            let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
866            while let Some(top_node) = stack.pop() {
867                let mut ans_vv = ImVersionVector::default();
868                // trace!("node={:?} {:?}", &top_node, has_all_deps_met);
869                // trace!("deps={:?}", &top_node.deps);
870                // trace!("this.shallow_f_deps={:?}", &self.shallow_frontiers_deps);
871                // trace!("this.vv={:?}", &self.vv);
872                // trace!("this.unparsed_vv={:?}", &self.unparsed_vv);
873                // trace!("this.shallow_since_vv={:?}", &self.shallow_since_vv);
874                if top_node.deps == self.shallow_root_frontiers_deps {
875                    for (&p, &c) in self.shallow_since_vv.iter() {
876                        ans_vv.insert(p, c);
877                    }
878                } else {
879                    let mut all_deps_processed = true;
880                    for id in top_node.deps.iter() {
881                        let node = self.get(id).expect("deps should be in the dag");
882                        if node.vv.get().is_none() {
883                            // assert!(!has_all_deps_met);
884                            if all_deps_processed {
885                                stack.push(top_node.clone());
886                            }
887                            all_deps_processed = false;
888                            stack.push(node);
889                            continue;
890                        };
891                    }
892
893                    if !all_deps_processed {
894                        continue;
895                    }
896
897                    for id in top_node.deps.iter() {
898                        let node = self.get(id).expect("deps should be in the dag");
899                        let dep_vv = node.vv.get().unwrap();
900                        if ans_vv.is_empty() {
901                            ans_vv = dep_vv.clone();
902                        } else {
903                            ans_vv.extend_to_include_vv(dep_vv.iter());
904                        }
905
906                        ans_vv.insert(node.peer, node.ctr_end());
907                    }
908                }
909
910                // trace!("ans_vv={:?}", &ans_vv);
911                top_node.vv.set(ans_vv.clone()).unwrap();
912            }
913        }
914
915        target_node.vv.get().unwrap().clone()
916    }
917
918    /// Compare the causal order of two versions.
919    /// If None, two versions are concurrent to each other
920    pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
921        if a.peer == b.peer {
922            return Some(a.counter.cmp(&b.counter));
923        }
924
925        let a = self.get_vv(a).unwrap();
926        let b = self.get_vv(b).unwrap();
927        a.partial_cmp(&b)
928    }
929
930    pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
931        self.get(*id).and_then(|node| {
932            assert!(id.counter >= node.cnt);
933            if node.cnt + node.len as Counter > id.counter {
934                Some(node.lamport + (id.counter - node.cnt) as Lamport)
935            } else {
936                None
937            }
938        })
939    }
940
941    pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
942        let mut lamport = 0;
943        for id in deps.iter() {
944            let x = self.get_lamport(&id)?;
945            lamport = lamport.max(x + 1);
946        }
947
948        Some(lamport)
949    }
950
951    /// Convert a frontiers to a version vector
952    ///
953    /// If the frontiers version is not found in the dag, return None
954    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
955        if frontiers == &self.shallow_root_frontiers_deps {
956            let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
957            return Some(vv);
958        }
959
960        let mut vv: VersionVector = Default::default();
961        for id in frontiers.iter() {
962            let x = self.get(id)?;
963            let target_vv = self.ensure_vv_for(&x);
964            vv.extend_to_include_vv(target_vv.iter());
965            vv.extend_to_include_last_id(id);
966        }
967
968        Some(vv)
969    }
970
971    #[allow(unused)]
972    pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
973        if frontiers.is_empty() {
974            return Default::default();
975        }
976
977        let mut iter = frontiers.iter();
978        let mut vv = {
979            let id = iter.next().unwrap();
980            let Some(x) = self.get(id) else {
981                unreachable!()
982            };
983            let mut vv = self.ensure_vv_for(&x);
984            vv.extend_to_include_last_id(id);
985            vv
986        };
987
988        for id in iter {
989            let Some(x) = self.get(id) else {
990                unreachable!()
991            };
992            let x = self.ensure_vv_for(&x);
993            vv.extend_to_include_vv(x.iter());
994            vv.extend_to_include_last_id(id);
995        }
996
997        vv
998    }
999
1000    pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
1001        if vv.is_empty() {
1002            return Default::default();
1003        }
1004
1005        let this = vv;
1006        let last_ids: Frontiers = this
1007            .iter()
1008            .filter_map(|(client_id, cnt)| {
1009                if *cnt == 0 {
1010                    return None;
1011                }
1012
1013                if self
1014                    .shallow_since_vv
1015                    .includes_id(ID::new(*client_id, *cnt - 1))
1016                {
1017                    return None;
1018                }
1019
1020                Some(ID::new(*client_id, cnt - 1))
1021            })
1022            .collect();
1023
1024        if last_ids.is_empty() {
1025            return self.shallow_since_frontiers.clone();
1026        }
1027
1028        shrink_frontiers(&last_ids, self).unwrap()
1029    }
1030
1031    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1032        if vv.is_empty() {
1033            return Default::default();
1034        }
1035
1036        let this = vv;
1037        let last_ids: Frontiers = this
1038            .iter()
1039            .filter_map(|(client_id, cnt)| {
1040                if *cnt == 0 {
1041                    return None;
1042                }
1043
1044                if self
1045                    .shallow_since_vv
1046                    .includes_id(ID::new(*client_id, *cnt - 1))
1047                {
1048                    return None;
1049                }
1050
1051                Some(ID::new(*client_id, cnt - 1))
1052            })
1053            .collect();
1054
1055        if last_ids.is_empty() {
1056            return self.shallow_since_frontiers.clone();
1057        }
1058
1059        shrink_frontiers(&last_ids, self).unwrap()
1060    }
1061
1062    pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
1063        if frontiers.is_empty() {
1064            return 0;
1065        }
1066
1067        let mut iter = frontiers.iter();
1068        let mut lamport = {
1069            let id = iter.next().unwrap();
1070            let Some(x) = self.get(id) else {
1071                unreachable!()
1072            };
1073            assert!(id.counter >= x.cnt);
1074            (id.counter - x.cnt) as Lamport + x.lamport + 1
1075        };
1076
1077        for id in iter {
1078            let Some(x) = self.get(id) else {
1079                unreachable!()
1080            };
1081            assert!(id.counter >= x.cnt);
1082            lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
1083        }
1084
1085        lamport
1086    }
1087
1088    pub fn get_frontiers(&self) -> &Frontiers {
1089        &self.frontiers
1090    }
1091
1092    /// - Ordering::Less means self is less than target or parallel
1093    /// - Ordering::Equal means versions equal
1094    /// - Ordering::Greater means self's version is greater than target
1095    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1096        if &self.frontiers == other {
1097            Ordering::Equal
1098        } else if other.iter().all(|id| self.vv.includes_id(id)) {
1099            Ordering::Greater
1100        } else {
1101            Ordering::Less
1102        }
1103    }
1104
1105    // PERF
1106    /// Compare two [Frontiers] causally.
1107    ///
1108    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1109    pub fn cmp_frontiers(
1110        &self,
1111        a: &Frontiers,
1112        b: &Frontiers,
1113    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1114        let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
1115        let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
1116        Ok(a.partial_cmp(&b))
1117    }
1118}
1119
1120#[derive(Debug, PartialEq, Eq)]
1121pub struct FrontiersNotIncluded;
1122impl Display for FrontiersNotIncluded {
1123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1124        f.write_str("The given Frontiers are not included by the doc")
1125    }
1126}