// loro_internal/oplog/loro_dag.rs

use crate::change::{Change, Lamport};
use crate::dag::{Dag, DagNode};
use crate::id::{Counter, ID};
use crate::span::{HasId, HasLamport};
use crate::sync::Mutex;
use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use rustc_hash::FxHashSet;
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
use std::fmt::Display;
use std::ops::{ControlFlow, Deref};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::Arc;
use tracing::instrument;

use super::change_store::BatchDecodeInfo;
use super::ChangeStore;

/// [AppDag] maintains the causal graph of the app.
/// It makes answering questions like "what's the LCA of these two versions?" fast.
#[derive(Debug)]
pub struct AppDag {
    change_store: ChangeStore,
    /// It only contains nodes that are already parsed.
    ///
    /// - All the unparsed op ids must be included in `unparsed_vv`.
    /// - All the parsed and unparsed op ids must be included in `vv`.
    map: Mutex<BTreeMap<ID, AppDagNode>>,
    /// The latest known frontiers
    frontiers: Frontiers,
    /// The latest known version vector
    vv: VersionVector,
    /// The earliest known frontiers
    shallow_since_frontiers: Frontiers,
    /// The deps of the shallow frontiers
    shallow_root_frontiers_deps: Frontiers,
    /// The vv of `shallow_root_frontiers_deps`
    shallow_since_vv: ImVersionVector,
    /// Ops included in the version vector but not parsed yet
    ///
    /// # Invariants
    ///
    /// - `vv` >= `unparsed_vv`
    unparsed_vv: Mutex<VersionVector>,
    /// A set of id points that are deps of some parsed ops.
    /// The ops these points belong to are not parsed yet. When they are parsed,
    /// we need to make sure their dag nodes break at each of these points.
    unhandled_dep_points: Mutex<BTreeSet<ID>>,
    pending_txn_node: Option<AppDagNode>,
    import_rollback_has_journal: AtomicBool,
    import_rollback: Mutex<Option<AppDagRollback>>,
}

#[derive(Debug)]
pub(crate) struct AppDagRollback {
    frontiers: Frontiers,
    vv: VersionVector,
    unparsed_vv: VersionVector,
    shallow_since_frontiers: Frontiers,
    shallow_root_frontiers_deps: Frontiers,
    shallow_since_vv: ImVersionVector,
    pending_txn_node: Option<AppDagNode>,
    map_entries_before_mutation: BTreeMap<ID, Option<AppDagNode>>,
    unhandled_dep_point_log: Vec<UnhandledDepPointLog>,
}

#[derive(Debug)]
enum UnhandledDepPointLog {
    Added(ID),
    Removed(ID),
}

#[derive(Debug, Clone)]
pub struct AppDagNode {
    inner: Arc<AppDagNodeInner>,
}

impl Deref for AppDagNode {
    type Target = AppDagNodeInner;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl AppDagNode {
    pub fn new(inner: AppDagNodeInner) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }
}

#[derive(Debug, Clone)]
pub struct AppDagNodeInner {
    pub(crate) peer: PeerID,
    pub(crate) cnt: Counter,
    pub(crate) lamport: Lamport,
    pub(crate) deps: Frontiers,
    pub(crate) vv: OnceCell<ImVersionVector>,
    /// A flag indicating whether any other node depends on this one.
    /// The calculation of frontiers relies on the property that a node never
    /// depends on the middle of another node: deps always point at a node's last id.
    pub(crate) has_succ: bool,
    pub(crate) len: usize,
}

impl From<AppDagNodeInner> for AppDagNode {
    fn from(inner: AppDagNodeInner) -> Self {
        AppDagNode {
            inner: Arc::new(inner),
        }
    }
}

impl AppDag {
    pub(super) fn new(change_store: ChangeStore) -> Self {
        Self {
            change_store,
            map: Mutex::new(BTreeMap::new()),
            frontiers: Frontiers::default(),
            vv: VersionVector::default(),
            unparsed_vv: Mutex::new(VersionVector::default()),
            unhandled_dep_points: Mutex::new(BTreeSet::new()),
            shallow_since_frontiers: Default::default(),
            shallow_root_frontiers_deps: Default::default(),
            shallow_since_vv: Default::default(),
            pending_txn_node: None,
            import_rollback_has_journal: AtomicBool::new(false),
            import_rollback: Mutex::new(None),
        }
    }

    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    pub fn vv(&self) -> &VersionVector {
        &self.vv
    }

    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        &self.shallow_since_vv
    }

    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        &self.shallow_since_frontiers
    }

    pub(crate) fn begin_import_rollback(&mut self) {
        let old_vv_is_empty = self.vv.is_empty();
        let mut rollback = self.import_rollback.lock();
        debug_assert!(rollback.is_none());
        *rollback = Some(AppDagRollback {
            frontiers: self.frontiers.clone(),
            vv: self.vv.clone(),
            unparsed_vv: self.unparsed_vv.lock().clone(),
            shallow_since_frontiers: self.shallow_since_frontiers.clone(),
            shallow_root_frontiers_deps: self.shallow_root_frontiers_deps.clone(),
            shallow_since_vv: self.shallow_since_vv.clone(),
            pending_txn_node: self.pending_txn_node.clone(),
            map_entries_before_mutation: BTreeMap::new(),
            unhandled_dep_point_log: Vec::new(),
        });
        self.import_rollback_has_journal
            .store(!old_vv_is_empty, AtomicOrdering::Relaxed);
    }

    pub(crate) fn commit_import_rollback(&mut self) {
        self.import_rollback_has_journal
            .store(false, AtomicOrdering::Relaxed);
        *self.import_rollback.lock() = None;
    }

    pub(crate) fn rollback_import(&mut self) {
        self.import_rollback_has_journal
            .store(false, AtomicOrdering::Relaxed);
        let Some(checkpoint) = self.import_rollback.lock().take() else {
            return;
        };

        let imported_spans = self.vv.sub_iter(&checkpoint.vv).collect::<Vec<_>>();
        let mut map = self.map.lock();
        for span in imported_spans {
            let start = ID::new(span.peer, span.counter.start);
            let end = ID::new(span.peer, span.counter.end);
            let keys = map.range(start..end).map(|(id, _)| *id).collect::<Vec<_>>();
            for key in keys {
                map.remove(&key);
            }
        }

        for (id, node) in checkpoint.map_entries_before_mutation {
            if let Some(node) = node {
                map.insert(id, node);
            } else {
                map.remove(&id);
            }
        }
        drop(map);

        self.frontiers = checkpoint.frontiers;
        self.vv = checkpoint.vv.clone();
        self.shallow_since_frontiers = checkpoint.shallow_since_frontiers;
        self.shallow_root_frontiers_deps = checkpoint.shallow_root_frontiers_deps;
        self.shallow_since_vv = checkpoint.shallow_since_vv;
        *self.unparsed_vv.lock() = checkpoint.unparsed_vv;

        let mut unhandled_dep_points = self.unhandled_dep_points.lock();
        if checkpoint.vv.is_empty() {
            unhandled_dep_points.clear();
        }
        for item in checkpoint.unhandled_dep_point_log.into_iter().rev() {
            match item {
                UnhandledDepPointLog::Added(id) => {
                    unhandled_dep_points.remove(&id);
                }
                UnhandledDepPointLog::Removed(id) => {
                    unhandled_dep_points.insert(id);
                }
            }
        }
        drop(unhandled_dep_points);

        self.pending_txn_node = checkpoint.pending_txn_node;
    }

    fn record_map_entry_before_mutation(&self, map: &BTreeMap<ID, AppDagNode>, id: ID) {
        if !self
            .import_rollback_has_journal
            .load(AtomicOrdering::Relaxed)
        {
            return;
        }

        self.record_map_entry_before_replacement(id, map.get(&id).cloned());
    }

    fn record_map_entry_before_replacement(&self, id: ID, old: Option<AppDagNode>) {
        if !self
            .import_rollback_has_journal
            .load(AtomicOrdering::Relaxed)
        {
            return;
        }

        let mut rollback = self.import_rollback.lock();
        let Some(rollback) = rollback.as_mut() else {
            return;
        };

        let old_end = rollback.vv.get(&id.peer).copied().unwrap_or(0);
        if id.counter >= old_end {
            return;
        }

        rollback
            .map_entries_before_mutation
            .entry(id)
            .or_insert(old);
    }

    fn record_unhandled_dep_point_added(&self, id: ID) {
        if !self
            .import_rollback_has_journal
            .load(AtomicOrdering::Relaxed)
        {
            return;
        }

        if let Some(rollback) = self.import_rollback.lock().as_mut() {
            rollback
                .unhandled_dep_point_log
                .push(UnhandledDepPointLog::Added(id));
        }
    }

    fn record_unhandled_dep_point_removed(&self, id: ID) {
        if !self
            .import_rollback_has_journal
            .load(AtomicOrdering::Relaxed)
        {
            return;
        }

        if let Some(rollback) = self.import_rollback.lock().as_mut() {
            rollback
                .unhandled_dep_point_log
                .push(UnhandledDepPointLog::Removed(id));
        }
    }

    pub fn is_empty(&self) -> bool {
        self.vv.is_empty()
    }

    #[tracing::instrument(skip_all, name = "handle_new_change")]
    pub(super) fn handle_new_change(
        &mut self,
        change: &Change,
        from_local: bool,
        rollback_old_vv: Option<&VersionVector>,
    ) {
        let len = change.content_len();
        self.update_version_on_new_change(change, from_local);
        #[cfg(debug_assertions)]
        {
            let unhandled_dep_points = self.unhandled_dep_points.lock();
            let c = unhandled_dep_points
                .range(change.id_start()..change.id_end())
                .count();
            assert!(c == 0);
        }

        let mut inserted = false;
        if change.deps_on_self() {
            let record_old_node_before_merge = rollback_old_vv
                .and_then(|vv| vv.get(&change.id.peer).copied())
                .filter(|old_end| *old_end == change.id.counter);
            // We may not need to push a new node to the dag, because the change
            // only depends on itself and can often be merged into the last node.
            inserted =
                self.with_last_mut_of_peer(change.id.peer, record_old_node_before_merge, |last| {
                    let (_, last) = last.unwrap();
                    if last.has_succ {
                        // Don't merge the node if there are other nodes depending on it
                        return false;
                    }

                    assert_eq!(last.peer, change.id.peer, "peer id is not the same");
                    assert_eq!(
                        last.cnt + last.len as Counter,
                        change.id.counter,
                        "counter is not continuous"
                    );
                    assert_eq!(
                        last.lamport + last.len as Lamport,
                        change.lamport,
                        "lamport is not continuous"
                    );
                    let last = Arc::make_mut(&mut last.inner);
                    last.len = (change.id.counter - last.cnt) as usize + len;
                    last.has_succ = false;
                    true
                });
        }

        if !inserted {
            let node: AppDagNode = AppDagNodeInner {
                vv: OnceCell::new(),
                peer: change.id.peer,
                cnt: change.id.counter,
                lamport: change.lamport,
                deps: change.deps.clone(),
                has_succ: false,
                len,
            }
            .into();

            let mut map = self.map.lock();
            map.insert(node.id_start(), node);
            self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
        }
    }

    fn try_with_node_mut<R>(
        &self,
        map: &mut BTreeMap<ID, AppDagNode>,
        id: ID,
        f: impl FnOnce(Option<(ID, &mut AppDagNode)>) -> R,
    ) -> R {
        let x = map.range_mut(..=id).next_back();
        if let Some((node_id, node)) = x {
            if node.contains_id(id) {
                f(Some((*node_id, node)))
            } else {
                f(None)
            }
        } else {
            f(None)
        }
    }

    /// Returns `Ok(())` if the lamport of the change can be calculated; otherwise returns `Err(())`.
    pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
        for dep in change.deps.iter() {
            match self.get_lamport(&dep) {
                Some(lamport) => {
                    change.lamport = change.lamport.max(lamport + 1);
                }
                None => return Err(()),
            }
        }
        Ok(())
    }

    pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
        let Some(node) = self.get(id) else {
            return Frontiers::default();
        };

        let offset = id.counter - node.cnt;
        if offset == 0 {
            node.deps.clone()
        } else {
            ID::new(id.peer, node.cnt + offset - 1).into()
        }
    }

    pub(crate) fn with_last_mut_of_peer<R>(
        &mut self,
        peer: PeerID,
        record_if_before: Option<Counter>,
        f: impl FnOnce(Option<(ID, &mut AppDagNode)>) -> R,
    ) -> R {
        self.lazy_load_last_of_peer(peer);
        let mut binding = self.map.lock();
        let last = binding
            .range_mut(..=ID::new(peer, Counter::MAX))
            .next_back()
            .map(|(id, v)| {
                if record_if_before.is_some_and(|old_end| id.counter < old_end) {
                    self.record_map_entry_before_replacement(*id, Some(v.clone()));
                }
                (*id, v)
            });
        f(last)
    }

    fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
        if from_local {
            assert!(self.pending_txn_node.take().is_some());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.ctr_end()
            );
        } else {
            let id_last = change.id_last();
            self.frontiers
                .update_frontiers_on_new_change(id_last, &change.deps);
            assert!(self.pending_txn_node.is_none());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.id.counter
            );
            self.vv.extend_to_include_last_id(id_last);
        }
    }

    pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
        let unparsed_vv = self.unparsed_vv.lock();
        if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
            return;
        }

        let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
            panic!("unparsed vv doesn't match the change store. Peer:{peer} is not in the change store")
        };

        self.lazy_load_nodes_internal(nodes, peer, None);
    }

    fn lazy_load_nodes_internal(
        &self,
        nodes: Vec<AppDagNode>,
        peer: u64,
        map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        assert!(!nodes.is_empty());
        let mut map_guard = None;
        let map = map_input.unwrap_or_else(|| {
            map_guard = Some(self.map.lock());
            map_guard.as_mut().unwrap()
        });
        let new_dag_start_counter_for_the_peer = nodes[0].cnt;
        let nodes_cnt_end = nodes.last().unwrap().ctr_end();
        let mut unparsed_vv = self.unparsed_vv.lock();
        let end_counter = unparsed_vv[&peer];
        assert!(end_counter <= nodes_cnt_end);
        let mut deps_on_others = Vec::new();
        let mut break_point_set = self.unhandled_dep_points.lock();
        for mut node in nodes {
            if node.cnt >= end_counter {
                // skip already parsed nodes
                break;
            }

            if node.cnt + node.len as Counter > end_counter {
                node = node.slice(0, (end_counter - node.cnt) as usize);
                // This is unlikely to happen
            }

            for dep in node.deps.iter() {
                if dep.peer != peer {
                    deps_on_others.push(dep);
                }
            }

            // PERF: we can try to merge the node with the previous node
            let break_point_ends: Vec<_> = break_point_set
                .range(node.id_start()..node.id_end())
                .map(|id| (id.counter - node.cnt) as usize + 1)
                .collect();
            if break_point_ends.is_empty() {
                self.record_map_entry_before_mutation(map, node.id_start());
                map.insert(node.id_start(), node);
            } else {
                let mut slice_start = 0;
                for slice_end in break_point_ends.iter().copied() {
                    let mut slice_node = node.slice(slice_start, slice_end);
                    let inner = Arc::make_mut(&mut slice_node.inner);
                    inner.has_succ = true;
                    self.record_map_entry_before_mutation(map, slice_node.id_start());
                    map.insert(slice_node.id_start(), slice_node);
                    slice_start = slice_end;
                }

                let last_break_point = break_point_ends.last().copied().unwrap();
                if last_break_point != node.len {
                    let slice_node = node.slice(last_break_point, node.len);
                    self.record_map_entry_before_mutation(map, slice_node.id_start());
                    map.insert(slice_node.id_start(), slice_node);
                }

                for break_point in break_point_ends.into_iter() {
                    let id = node.id_start().inc(break_point as Counter - 1);
                    if break_point_set.remove(&id) {
                        self.record_unhandled_dep_point_removed(id);
                    }
                }
            }
        }

        if new_dag_start_counter_for_the_peer == 0 {
            unparsed_vv.remove(&peer);
        } else {
            unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
        }
        drop(unparsed_vv);
        drop(break_point_set);
        self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
    }

    fn handle_deps_break_points(
        &self,
        ids: impl IntoIterator<Item = ID>,
        skip_peer: PeerID,
        map: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        let mut map_guard = None;
        let map = map.unwrap_or_else(|| {
            map_guard = Some(self.map.lock());
            map_guard.as_mut().unwrap()
        });
        for id in ids {
            if id.peer == skip_peer {
                continue;
            }

            let mut handled = false;
            let ans = self.try_with_node_mut(map, id, |target| {
                // We don't need to break the dag node if it's not loaded yet
                let (target_id, target) = target?;
                if target.ctr_last() == id.counter {
                    self.record_map_entry_before_replacement(target_id, Some(target.clone()));
                    let target = Arc::make_mut(&mut target.inner);
                    handled = true;
                    target.has_succ = true;
                    None
                } else {
                    // We need to split the target node into two parts
                    // so that we can ensure the new change depends on the
                    // last id of a dag node.

                    let new_node =
                        target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
                    self.record_map_entry_before_replacement(target_id, Some(target.clone()));
                    let target = Arc::make_mut(&mut target.inner);
                    target.len -= new_node.len;
                    Some(new_node)
                }
            });

            if let Some(new_node) = ans {
                self.record_map_entry_before_mutation(map, new_node.id_start());
                map.insert(new_node.id_start(), new_node);
            } else if !handled {
                let mut unhandled_dep_points = self.unhandled_dep_points.lock();
                if unhandled_dep_points.insert(id) {
                    self.record_unhandled_dep_point_added(id);
                }
            }
        }
    }

    fn ensure_lazy_load_node(&self, id: ID) {
        if self.shallow_since_vv.includes_id(id) {
            return;
        }

        loop {
            // We need to load all the dag nodes that have the same peer and a greater
            // counter than the given `id`, because we only record the end counter of
            // the unparsed version in `unparsed_vv`.
            let unparsed_end = {
                let unparsed_vv = self.unparsed_vv.lock();
                unparsed_vv.get(&id.peer).copied().unwrap_or(0)
            };
            if unparsed_end <= id.counter {
                return;
            }

            let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
            let Some(nodes) = self
                .change_store
                .get_dag_nodes_that_contains(last_unparsed_id)
            else {
                panic!("unparsed vv doesn't match the change store. Id:{id} is not in the change store")
            };

            self.lazy_load_nodes_internal(nodes, id.peer, None);
        }
    }

    pub fn total_parsed_dag_node(&self) -> usize {
        self.map.lock().len()
    }

    pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
        assert!(self.vv.is_empty());
        *self.unparsed_vv.lock() = v.vv.clone();
        self.vv = v.vv;
        self.frontiers = v.frontiers;
        if let Some((vv, f)) = v.start_version {
            if !f.is_empty() {
                assert!(f.len() == 1);
                let id = f.as_single().unwrap();
                let node = self.get(id).unwrap();
                assert!(node.cnt == id.counter);
                self.shallow_root_frontiers_deps = node.deps.clone();
            }
            self.shallow_since_frontiers = f;
            self.shallow_since_vv = ImVersionVector::from_vv(&vv);
        }
    }

    /// This method is slow and should only be used for debugging and testing.
    ///
    /// It will check the following properties:
    ///
    /// 1. Counter is continuous
    /// 2. A node always depends on the last ids of other nodes
    /// 3. Lamport is correctly calculated
    /// 4. VV for each node is correctly calculated
    /// 5. Frontiers are correctly calculated
    #[instrument(skip(self))]
    pub fn check_dag_correctness(&self) {
        {
            // parse all nodes
            let unparsed_vv = self.unparsed_vv.lock().clone();
            for (peer, cnt) in unparsed_vv.iter() {
                if *cnt == 0 {
                    continue;
                }

                let mut end_cnt = *cnt;
                let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
                while end_cnt > init_counter {
                    let cnt = end_cnt - 1;
                    self.ensure_lazy_load_node(ID::new(*peer, cnt));
                    end_cnt = self.unparsed_vv.lock().get(peer).copied().unwrap_or(0);
                }
            }

            self.unparsed_vv.lock().clear();
        }
        {
            // check property 1: Counter is continuous
            let map = self.map.lock();
            let mut last_end_id = ID::new(0, 0);
            for (&id, node) in map.iter() {
                let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
                if id.peer == last_end_id.peer {
                    assert!(id.counter == last_end_id.counter);
                } else {
                    assert_eq!(id.counter, init_counter);
                }

                last_end_id = id.inc(node.len as Counter);
            }
        }
        {
            // check property 2: A node always depends on the last ids of other nodes
            let map = self.map.lock();
            check_always_dep_on_last_id(&map);
        }
        {
            // check property 3: Lamport is correctly calculated
            let map = self.map.lock();
            'outer: for (_, node) in map.iter() {
                let mut this_lamport = 0;
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    this_lamport = this_lamport.max(dep_node.lamport_end());
                }

                assert_eq!(this_lamport, node.lamport);
            }
        }
        {
            // check property 4: VV for each node is correctly calculated
            let map = self.map.lock().clone();
            'outer: for (_, node) in map.iter() {
                let actual_vv = self.ensure_vv_for(node);
                let mut expected_vv = ImVersionVector::default();
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    self.ensure_vv_for(dep_node);
                    expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
                    expected_vv.extend_to_include_last_id(dep);
                }

                assert_eq!(actual_vv, expected_vv);
            }
        }
        {
            // check property 5: Frontiers are correctly calculated
            let mut maybe_frontiers = FxHashSet::default();
            let map = self.map.lock();
            for (_, node) in map.iter() {
                maybe_frontiers.insert(node.id_last());
            }

            for (_, node) in map.iter() {
                for dep in node.deps.iter() {
                    maybe_frontiers.remove(&dep);
                }
            }

            let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
            assert_eq!(maybe_frontiers, frontiers);
        }
    }

    pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
        for id in deps.iter() {
            if !self.vv.includes_id(id) {
                return false;
            }
        }

        if self.is_before_shallow_root(deps) {
            return false;
        }

        true
    }

    pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
        // trace!("Is on shallow history? deps={:?}", deps);
        // trace!("self.shallow_since_vv {:?}", &self.shallow_since_vv);
        // trace!("self.shallow_frontiers {:?}", &self.shallow_since_frontiers);

        if self.shallow_since_vv.is_empty() {
            return false;
        }

        if deps.is_empty() {
            return true;
        }

        if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
            return true;
        }

        if deps
            .iter()
            .any(|x| self.shallow_since_frontiers.contains(&x))
        {
            return deps != &self.shallow_since_frontiers;
        }

        false
    }

    /// Traverse the ancestors of the given id, calling the callback for each node.
    ///
    /// It visits the ancestors in reverse order (from the greatest lamport to the smallest).
    pub(crate) fn travel_ancestors(
        &self,
        id: ID,
        f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
    ) {
        struct PendingNode(AppDagNode);
        impl PartialEq for PendingNode {
            fn eq(&self, other: &Self) -> bool {
                self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
            }
        }
        impl Eq for PendingNode {}
        impl PartialOrd for PendingNode {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for PendingNode {
            fn cmp(&self, other: &Self) -> Ordering {
                self.0
                    .lamport_last()
                    .cmp(&other.0.lamport_last())
                    .then_with(|| self.0.peer.cmp(&other.0.peer))
            }
        }

        let mut visited = FxHashSet::default();
        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
        pending.push(PendingNode(self.get(id).unwrap()));
        while let Some(PendingNode(node)) = pending.pop() {
            if f(&node).is_break() {
                break;
            }

            for dep in node.deps.iter() {
                let Some(dep_node) = self.get(dep) else {
                    continue;
                };
                if visited.contains(&dep_node.id_start()) {
                    continue;
                }

                visited.insert(dep_node.id_start());
                pending.push(PendingNode(dep_node));
            }
        }
    }

    pub(crate) fn update_version_on_new_local_op(
        &mut self,
        deps: &Frontiers,
        start_id: ID,
        start_lamport: Lamport,
        len: usize,
    ) {
        let last_id = start_id.inc(len as Counter - 1);
        // PERF: we can cache this last_id - this is a hot path
        self.vv.set_last(last_id);
        self.frontiers.update_frontiers_on_new_change(last_id, deps);
        match &mut self.pending_txn_node {
            Some(node) => {
                assert!(
                    node.peer == start_id.peer
                        && node.cnt + node.len as Counter == start_id.counter
                        && deps.len() == 1
                        && deps.as_single().unwrap().peer == start_id.peer
                );
                let inner = Arc::make_mut(&mut node.inner);
                inner.len += len;
            }
            None => {
                let node = AppDagNode {
                    inner: Arc::new(AppDagNodeInner {
                        peer: start_id.peer,
                        cnt: start_id.counter,
                        lamport: start_lamport,
                        deps: deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len,
                    }),
                };
                self.pending_txn_node = Some(node);
            }
        }
    }

    pub(crate) fn latest_vv_contains_peer(&self, peer: PeerID) -> bool {
        self.vv.contains_key(&peer) && *self.vv.get(&peer).unwrap() > 0
    }
}
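
// A minimal sketch (added for illustration; not part of the original file) of
// the pending-txn path in `update_version_on_new_local_op` above: local ops
// advance `vv` and `frontiers` immediately and accumulate in
// `pending_txn_node`, which `Dag::get` consults before the txn is committed
// as a `Change`. The setup bypasses the normal OpLog flow, so treat this as a
// sketch of the contract rather than a realistic usage.
#[cfg(test)]
mod pending_txn_sketch {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    #[test]
    fn local_ops_are_visible_before_commit() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let mut dag = AppDag::new(change_store);

        // Three local ops by peer 1, starting at counter 0 with lamport 0.
        dag.update_version_on_new_local_op(&Frontiers::default(), ID::new(1, 0), 0, 3);

        // The version vector and lamports already reflect the uncommitted txn.
        assert_eq!(dag.vv().get(&1).copied(), Some(3));
        assert_eq!(dag.get_lamport(&ID::new(1, 2)), Some(2));
        assert_eq!(
            dag.frontiers_to_next_lamport(&Frontiers::from_id(ID::new(1, 2))),
            3
        );
    }
}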

fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
    for (_, node) in map.iter() {
        for dep in node.deps.iter() {
            let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
                // It's shallow
                continue;
            };
            assert_eq!(dep_node.id_start(), dep_id);
            if dep_node.contains_id(dep) {
                assert_eq!(dep_node.id_last(), dep);
            }
        }
    }
}
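
// A minimal sketch (added for illustration; not part of the original file) of
// `travel_ancestors` above: starting from a node, the callback sees the node
// itself first and then its ancestors in decreasing lamport order. The setup
// inserts nodes into `map` directly, mirroring the `ensure_vv_for_tests`
// module at the bottom of this file.
#[cfg(test)]
mod travel_ancestors_sketch {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    #[test]
    fn visits_ancestors_in_reverse_lamport_order() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let dag = AppDag::new(change_store);

        // x (peer 1, lamport 0) <- y (peer 2, lamport 1, deps = [x])
        let x: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 1,
            cnt: 0,
            lamport: 0,
            deps: Frontiers::default(),
            has_succ: true,
            len: 1,
        }
        .into();
        let y: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 2,
            cnt: 0,
            lamport: 1,
            deps: Frontiers::from_id(ID::new(1, 0)),
            has_succ: false,
            len: 1,
        }
        .into();
        {
            let mut map = dag.map.lock();
            map.insert(x.id_start(), x);
            map.insert(y.id_start(), y);
        }

        let mut visited_peers = Vec::new();
        dag.travel_ancestors(ID::new(2, 0), &mut |node| {
            visited_peers.push(node.peer);
            ControlFlow::Continue(())
        });
        // y (lamport 1) is visited before its ancestor x (lamport 0).
        assert_eq!(visited_peers, vec![2, 1]);
    }
}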

impl HasIndex for AppDagNode {
    type Int = Counter;
    fn get_start_index(&self) -> Self::Int {
        self.cnt
    }

    fn get_end_index(&self) -> Self::Int {
        self.cnt + self.len as Counter
    }
}

impl Sliceable for AppDagNode {
    fn slice(&self, from: usize, to: usize) -> Self {
        AppDagNodeInner {
            peer: self.peer,
            cnt: self.cnt + from as Counter,
            lamport: self.lamport + from as Lamport,
            deps: if from > 0 {
                Frontiers::from_id(self.id_start().inc(from as Counter - 1))
            } else {
                self.deps.clone()
            },
            vv: if let Some(vv) = self.vv.get() {
                let mut new = vv.clone();
                new.insert(self.peer, self.cnt + from as Counter);
                OnceCell::with_value(new)
            } else {
                OnceCell::new()
            },
            has_succ: if to == self.len { self.has_succ } else { true },
            len: to - from,
        }
        .into()
    }
}
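
// A minimal sketch (added for illustration; not part of the original file) of
// the `Sliceable` contract above: slicing from a nonzero offset rewrites
// `deps` so the tail slice depends on the last id of the part before it, and
// a strict prefix gains `has_succ = true`.
#[cfg(test)]
mod slice_sketch {
    use super::*;

    #[test]
    fn slicing_rewrites_deps_to_previous_id() {
        let node: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 1,
            cnt: 0,
            lamport: 0,
            deps: Frontiers::default(),
            has_succ: false,
            len: 4,
        }
        .into();

        // Tail half [2, 4): counter and lamport shift by the offset, and the
        // deps become the id just before the slice (peer 1, counter 1).
        let tail = node.slice(2, 4);
        assert_eq!(tail.cnt, 2);
        assert_eq!(tail.lamport, 2);
        assert_eq!(tail.deps.as_single(), Some(ID::new(1, 1)));
        assert_eq!(tail.len, 2);

        // A strict prefix is marked as having a successor: the tail depends on it.
        let head = node.slice(0, 2);
        assert!(head.has_succ);
    }
}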

impl HasId for AppDagNode {
    fn id_start(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.cnt,
        }
    }
}

impl HasCounter for AppDagNode {
    fn ctr_start(&self) -> Counter {
        self.cnt
    }
}

impl HasLength for AppDagNode {
    fn atom_len(&self) -> usize {
        self.len
    }

    fn content_len(&self) -> usize {
        self.len
    }
}

impl Mergable for AppDagNode {
    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
    where
        Self: Sized,
    {
        !self.has_succ
            && self.peer == other.peer
            && self.cnt + self.len as Counter == other.cnt
            && other.deps.len() == 1
            && self.lamport + self.len as Lamport == other.lamport
            && other.deps.as_single().unwrap().peer == self.peer
    }

    fn merge(&mut self, other: &Self, _conf: &())
    where
        Self: Sized,
    {
        assert_eq!(
            other.deps.as_single().unwrap().counter,
            self.cnt + self.len as Counter - 1
        );
        let this = Arc::make_mut(&mut self.inner);
        this.len += other.len;
        this.has_succ = other.has_succ;
    }
}
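
// A minimal sketch (added for illustration; not part of the original file) of
// the `Mergable` rules above: two runs from the same peer merge only when the
// counters and lamports are continuous, the left run has no successor, and
// the right run depends solely on the left run's last id.
#[cfg(test)]
mod merge_sketch {
    use super::*;

    fn node(cnt: Counter, deps: Frontiers, len: usize) -> AppDagNode {
        AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 1,
            cnt,
            lamport: cnt as Lamport,
            deps,
            has_succ: false,
            len,
        }
        .into()
    }

    #[test]
    fn consecutive_runs_merge() {
        // [0, 2) followed by [2, 5), where the second run depends on id (1, 1).
        let mut a = node(0, Frontiers::default(), 2);
        let b = node(2, Frontiers::from_id(ID::new(1, 1)), 3);
        assert!(a.is_mergable(&b, &()));
        a.merge(&b, &());
        assert_eq!(a.cnt, 0);
        assert_eq!(a.len, 5);
    }
}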

impl HasLamport for AppDagNode {
    fn lamport(&self) -> Lamport {
        self.lamport
    }
}

impl DagNode for AppDagNode {
    fn deps(&self) -> &Frontiers {
        &self.deps
    }
}

impl Dag for AppDag {
    type Node = AppDagNode;

    fn frontier(&self) -> &Frontiers {
        &self.frontiers
    }

    fn get(&self, id: ID) -> Option<Self::Node> {
        self.ensure_lazy_load_node(id);
        let binding = self.map.lock();
        if let Some(x) = binding.range(..=id).next_back() {
            if x.1.contains_id(id) {
                // PERF: do we need to optimize clone like this?
                // by adding another layer of Arc?
                return Some(x.1.clone());
            }
        }

        if let Some(node) = &self.pending_txn_node {
            if node.peer == id.peer && node.cnt <= id.counter {
                assert!(node.cnt + node.len as Counter > id.counter);
                return Some(node.clone());
            }
        }

        None
    }

    fn vv(&self) -> &VersionVector {
        &self.vv
    }

    fn contains(&self, id: ID) -> bool {
        self.vv.includes_id(id)
    }
}
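
// A minimal sketch (added for illustration; not part of the original file) of
// the `Dag::get` contract above: `get` returns the parsed node that
// *contains* the queried id, not only a node that starts at it. The setup
// inserts a node into `map` directly, mirroring `ensure_vv_for_tests` below.
#[cfg(test)]
mod dag_get_sketch {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    #[test]
    fn get_returns_containing_node() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let dag = AppDag::new(change_store);
        let node: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 1,
            cnt: 0,
            lamport: 0,
            deps: Frontiers::default(),
            has_succ: false,
            len: 3,
        }
        .into();
        dag.map.lock().insert(node.id_start(), node);

        // The query id sits in the middle of the node's span [0, 3).
        let found = dag.get(ID::new(1, 2)).unwrap();
        assert_eq!(found.cnt, 0);
        assert!(found.contains_id(ID::new(1, 2)));
        // Ids past the span are not found.
        assert!(dag.get(ID::new(1, 3)).is_none());
    }
}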

impl AppDag {
    // PERF: this may be painfully slow
    /// Get the version vector for a certain op.
    /// It's the version right after the op is applied.
    pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
        self.get(id).map(|x| {
            let mut vv = self.ensure_vv_for(&x);
            vv.insert(id.peer, id.counter + 1);
            vv
        })
    }

    pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
        if target_node.vv.get().is_none() {
            // Iterative DFS. When the DAG contains a diamond, a dep can end
            // up on the stack multiple times (once for each ancestor that
            // re-queued the current node before its deps were ready). We
            // skip nodes whose vv has already been computed — the original
            // code eagerly called `OnceCell::set(..).unwrap()` on every pop
            // and panicked on the second visit (see loro-dev/loro#929).
            let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
            while let Some(top_node) = stack.pop() {
                if top_node.vv.get().is_some() {
                    continue;
                }

                let mut ans_vv = ImVersionVector::default();
                if top_node.deps == self.shallow_root_frontiers_deps {
                    for (&p, &c) in self.shallow_since_vv.iter() {
                        ans_vv.insert(p, c);
                    }
                } else {
                    let mut all_deps_processed = true;
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        if node.vv.get().is_none() {
                            if all_deps_processed {
                                stack.push(top_node.clone());
                            }
                            all_deps_processed = false;
                            stack.push(node);
                            continue;
                        }
                    }

                    if !all_deps_processed {
                        continue;
                    }

                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        let dep_vv = node.vv.get().unwrap();
                        if ans_vv.is_empty() {
                            ans_vv = dep_vv.clone();
                        } else {
                            ans_vv.extend_to_include_vv(dep_vv.iter());
                        }

                        ans_vv.insert(node.peer, node.ctr_end());
                    }
                }

                // Tolerate a racing set from the diamond case above: if
                // another path already initialized this cell, trust that
                // value (it was computed from the same DAG) and move on.
                let _ = top_node.vv.set(ans_vv);
            }
        }

        target_node.vv.get().unwrap().clone()
    }

    /// Compare the causal order of two versions.
    /// If `None` is returned, the two versions are concurrent with each other.
    pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
        if a.peer == b.peer {
            return Some(a.counter.cmp(&b.counter));
        }

        let a = self.get_vv(a).unwrap();
        let b = self.get_vv(b).unwrap();
        a.partial_cmp(&b)
    }

    pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
        self.get(*id).and_then(|node| {
            assert!(id.counter >= node.cnt);
            if node.cnt + node.len as Counter > id.counter {
                Some(node.lamport + (id.counter - node.cnt) as Lamport)
            } else {
                None
            }
        })
    }

    pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
        let mut lamport = 0;
        for id in deps.iter() {
            let x = self.get_lamport(&id)?;
            lamport = lamport.max(x + 1);
        }

        Some(lamport)
    }

    /// Convert frontiers to a version vector.
    ///
    /// If the frontiers version is not found in the dag, return `None`.
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        if frontiers == &self.shallow_root_frontiers_deps {
            let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
            return Some(vv);
        }

        let mut vv: VersionVector = Default::default();
        for id in frontiers.iter() {
            let x = self.get(id)?;
            let target_vv = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(target_vv.iter());
            vv.extend_to_include_last_id(id);
        }

        Some(vv)
    }

    #[allow(unused)]
    pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
        if frontiers.is_empty() {
            return Default::default();
        }

        let mut iter = frontiers.iter();
        let mut vv = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let mut vv = self.ensure_vv_for(&x);
            vv.extend_to_include_last_id(id);
            vv
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let x = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(x.iter());
            vv.extend_to_include_last_id(id);
        }

        vv
    }

    pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
        if frontiers.is_empty() {
            return 0;
        }

        let mut iter = frontiers.iter();
        let mut lamport = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            (id.counter - x.cnt) as Lamport + x.lamport + 1
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
        }

        lamport
    }

    pub fn get_frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    /// - `Ordering::Less` means self's version is less than the target's, or concurrent with it
    /// - `Ordering::Equal` means the versions are equal
    /// - `Ordering::Greater` means self's version is greater than the target's
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        if &self.frontiers == other {
            Ordering::Equal
        } else if other.iter().all(|id| self.vv.includes_id(id)) {
            Ordering::Greater
        } else {
            Ordering::Less
        }
    }

    // PERF
    /// Compare two [Frontiers] causally.
    ///
    /// If one of the [Frontiers] is not included in the dag, it will return [FrontiersNotIncluded].
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
        let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
        Ok(a.partial_cmp(&b))
    }
}
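
// A minimal sketch (added for illustration; not part of the original file,
// mirroring the setup of the `ensure_vv_for_tests` module below):
// `frontiers_to_vv` expands frontiers into the full version vector of their
// causal past, which is what `cmp_frontiers` compares.
#[cfg(test)]
mod frontiers_conversion_sketch {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    #[test]
    fn frontiers_to_vv_and_cmp_frontiers() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let dag = AppDag::new(change_store);

        // x (peer 1) <- y (peer 2): y causally depends on x.
        let x: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 1,
            cnt: 0,
            lamport: 0,
            deps: Frontiers::default(),
            has_succ: true,
            len: 1,
        }
        .into();
        let y: AppDagNode = AppDagNodeInner {
            vv: OnceCell::new(),
            peer: 2,
            cnt: 0,
            lamport: 1,
            deps: Frontiers::from_id(ID::new(1, 0)),
            has_succ: false,
            len: 1,
        }
        .into();
        {
            let mut map = dag.map.lock();
            map.insert(x.id_start(), x);
            map.insert(y.id_start(), y);
        }

        // The frontiers [y] cover x as well, so the vv includes both peers.
        let vv = dag
            .frontiers_to_vv(&Frontiers::from_id(ID::new(2, 0)))
            .unwrap();
        assert_eq!(vv.get(&1).copied(), Some(1));
        assert_eq!(vv.get(&2).copied(), Some(1));

        // [x] is in [y]'s causal past, so it compares as Less.
        let ord = dag
            .cmp_frontiers(
                &Frontiers::from_id(ID::new(1, 0)),
                &Frontiers::from_id(ID::new(2, 0)),
            )
            .unwrap();
        assert_eq!(ord, Some(Ordering::Less));
    }
}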

#[derive(Debug, PartialEq, Eq)]
pub struct FrontiersNotIncluded;
impl Display for FrontiersNotIncluded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The given Frontiers are not included by the doc")
    }
}

#[cfg(test)]
mod ensure_vv_for_tests {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    fn make_dag_node(peer: PeerID, cnt: Counter, len: usize, deps: Frontiers) -> AppDagNode {
        AppDagNodeInner {
            vv: OnceCell::new(),
            peer,
            cnt,
            lamport: cnt as Lamport,
            deps,
            has_succ: false,
            len,
        }
        .into()
    }

    /// Regression for loro-dev/loro#929: when computing the vv for a node
    /// whose DAG fan-in contains a shared ancestor reached through multiple
    /// paths, the iterative DFS used to push the shared ancestor onto the
    /// stack twice. The second visit would then call `OnceCell::set(..)
    /// .unwrap()` on an already-initialized cell and panic with
    /// "called `Result::unwrap()` on an `Err` value: ImVersionVector(..)".
    ///
    /// Topology:
    ///    x (peer 1, counter 0)
    ///    |
    ///    y (peer 2, counter 0, deps = [x])
    ///    |
    ///    z (peer 3, counter 0, deps = [x, y])
    ///
    /// When we call `ensure_vv_for(z)`, z pushes x and y. Processing y then
    /// re-pushes x (still uninitialized), so the stack ends up with two
    /// copies of x; the second pop must be a no-op after the fix.
    #[test]
    fn diamond_dep_ensure_vv_for_does_not_double_set() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let dag = AppDag::new(change_store);

        let x = make_dag_node(1, 0, 1, Frontiers::default());
        let y = make_dag_node(2, 0, 1, Frontiers::from_id(ID::new(1, 0)));
        let mut z_deps = Frontiers::default();
        z_deps.push(ID::new(1, 0));
        z_deps.push(ID::new(2, 0));
        let z = make_dag_node(3, 0, 1, z_deps);

        {
            let mut map = dag.map.lock();
            map.insert(x.id_start(), x);
            map.insert(y.id_start(), y);
            map.insert(z.id_start(), z.clone());
        }

        // Historically this panicked at the inner `vv.set(...).unwrap()`.
        // `ensure_vv_for` returns the vv *at* the node: it covers every
        // peer in the causal past, but not the node's own peer counter.
        let vv = dag.ensure_vv_for(&z);
        assert_eq!(vv.get(&1).copied(), Some(1));
        assert_eq!(vv.get(&2).copied(), Some(1));
        assert!(vv.get(&3).is_none());
    }
}