1use crate::change::{Change, Lamport};
2use crate::dag::{Dag, DagNode};
3use crate::id::{Counter, ID};
4use crate::span::{HasId, HasLamport};
5use crate::sync::Mutex;
6use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
7use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
8use once_cell::sync::OnceCell;
9use rle::{HasIndex, HasLength, Mergable, Sliceable};
10use rustc_hash::FxHashSet;
11use smallvec::SmallVec;
12use std::cmp::Ordering;
13use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
14use std::fmt::Display;
15use std::ops::{ControlFlow, Deref};
16use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
17use std::sync::Arc;
18use tracing::instrument;
19
20use super::change_store::BatchDecodeInfo;
21use super::ChangeStore;
22
#[derive(Debug)]
pub struct AppDag {
    /// Backing store; used to lazily load dag nodes not yet parsed into `map`.
    change_store: ChangeStore,
    /// Parsed dag nodes, keyed by the first `ID` each node covers.
    map: Mutex<BTreeMap<ID, AppDagNode>>,
    /// Current heads of the dag.
    frontiers: Frontiers,
    /// Version vector covering every change known to this dag.
    vv: VersionVector,
    /// Frontiers at which shallow (trimmed) history begins.
    shallow_since_frontiers: Frontiers,
    /// Deps of the shallow-root node — NOTE(review): assumed to be the deps of
    /// the single shallow-start node (see `set_version_by_fast_snapshot_import`).
    shallow_root_frontiers_deps: Frontiers,
    /// Version vector of the shallow (trimmed) history start.
    shallow_since_vv: ImVersionVector,
    /// Versions that exist in `change_store` but have not been parsed into `map` yet.
    unparsed_vv: Mutex<VersionVector>,
    /// Dependency points that land inside not-yet-parsed nodes; resolved when
    /// the containing node is lazily loaded (see `lazy_load_nodes_internal`).
    unhandled_dep_points: Mutex<BTreeSet<ID>>,
    /// Node accumulating ops of the current local transaction before commit.
    pending_txn_node: Option<AppDagNode>,
    /// Fast flag: whether mutations must be journaled for import rollback.
    import_rollback_has_journal: AtomicBool,
    /// Snapshot + journal used to undo a failed import.
    import_rollback: Mutex<Option<AppDagRollback>>,
}
57
/// Checkpoint of the [`AppDag`] state taken before an import, plus a journal of
/// mutations performed during the import, so a failed import can be undone.
#[derive(Debug)]
pub(crate) struct AppDagRollback {
    // Snapshot of the dag's version state at `begin_import_rollback` time.
    frontiers: Frontiers,
    vv: VersionVector,
    unparsed_vv: VersionVector,
    shallow_since_frontiers: Frontiers,
    shallow_root_frontiers_deps: Frontiers,
    shallow_since_vv: ImVersionVector,
    pending_txn_node: Option<AppDagNode>,
    // First pre-mutation value of each `map` entry touched during the import;
    // `None` means the key did not exist before.
    map_entries_before_mutation: BTreeMap<ID, Option<AppDagNode>>,
    // Ordered journal of unhandled-dep-point set mutations; replayed in
    // reverse by `rollback_import`.
    unhandled_dep_point_log: Vec<UnhandledDepPointLog>,
}
70
/// One mutation of the `unhandled_dep_points` set, recorded so it can be
/// inverted during import rollback.
#[derive(Debug)]
enum UnhandledDepPointLog {
    Added(ID),
    Removed(ID),
}
76
/// A run of consecutive ops from one peer, stored as a cheaply-clonable
/// shared handle (clone is an `Arc` refcount bump).
#[derive(Debug, Clone)]
pub struct AppDagNode {
    inner: Arc<AppDagNodeInner>,
}
81
impl Deref for AppDagNode {
    type Target = AppDagNodeInner;

    /// Transparently expose the shared inner payload.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
89
90impl AppDagNode {
91 pub fn new(inner: AppDagNodeInner) -> Self {
92 Self {
93 inner: Arc::new(inner),
94 }
95 }
96}
97
/// Payload of a dag node: a contiguous span of ops from a single peer.
#[derive(Debug, Clone)]
pub struct AppDagNodeInner {
    /// Peer that authored this span.
    pub(crate) peer: PeerID,
    /// First counter value covered by the span.
    pub(crate) cnt: Counter,
    /// Lamport timestamp of the first op in the span.
    pub(crate) lamport: Lamport,
    /// Causal dependencies of the first op in the span.
    pub(crate) deps: Frontiers,
    /// Lazily computed version vector up to (but excluding) this node;
    /// filled in by `AppDag::ensure_vv_for`.
    pub(crate) vv: OnceCell<ImVersionVector>,
    /// Whether another node depends on the last op of this span; a node with
    /// `has_succ` must not be extended/merged (see `Mergable` impl).
    pub(crate) has_succ: bool,
    /// Number of ops covered by the span.
    pub(crate) len: usize,
}
111
112impl From<AppDagNodeInner> for AppDagNode {
113 fn from(inner: AppDagNodeInner) -> Self {
114 AppDagNode {
115 inner: Arc::new(inner),
116 }
117 }
118}
119
120impl AppDag {
121 pub(super) fn new(change_store: ChangeStore) -> Self {
122 Self {
123 change_store,
124 map: Mutex::new(BTreeMap::new()),
125 frontiers: Frontiers::default(),
126 vv: VersionVector::default(),
127 unparsed_vv: Mutex::new(VersionVector::default()),
128 unhandled_dep_points: Mutex::new(BTreeSet::new()),
129 shallow_since_frontiers: Default::default(),
130 shallow_root_frontiers_deps: Default::default(),
131 shallow_since_vv: Default::default(),
132 pending_txn_node: None,
133 import_rollback_has_journal: AtomicBool::new(false),
134 import_rollback: Mutex::new(None),
135 }
136 }
137
    /// Current heads of the dag.
    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    /// Version vector covering every change known to the dag.
    pub fn vv(&self) -> &VersionVector {
        &self.vv
    }

    /// Version vector at which shallow (trimmed) history begins.
    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        &self.shallow_since_vv
    }

    /// Frontiers at which shallow (trimmed) history begins.
    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        &self.shallow_since_frontiers
    }
153
    /// Take a checkpoint of the dag state so a subsequent import can be undone
    /// with [`Self::rollback_import`]. Must be paired with either
    /// `commit_import_rollback` or `rollback_import`.
    pub(crate) fn begin_import_rollback(&mut self) {
        let old_vv_is_empty = self.vv.is_empty();
        let mut rollback = self.import_rollback.lock();
        // A previous checkpoint must have been committed or rolled back.
        debug_assert!(rollback.is_none());
        *rollback = Some(AppDagRollback {
            frontiers: self.frontiers.clone(),
            vv: self.vv.clone(),
            unparsed_vv: self.unparsed_vv.lock().clone(),
            shallow_since_frontiers: self.shallow_since_frontiers.clone(),
            shallow_root_frontiers_deps: self.shallow_root_frontiers_deps.clone(),
            shallow_since_vv: self.shallow_since_vv.clone(),
            pending_txn_node: self.pending_txn_node.clone(),
            map_entries_before_mutation: BTreeMap::new(),
            unhandled_dep_point_log: Vec::new(),
        });
        // When the dag was empty before the import, no per-entry journal is
        // needed: rollback can simply remove everything the import added
        // (see the `checkpoint.vv.is_empty()` branch in `rollback_import`).
        self.import_rollback_has_journal
            .store(!old_vv_is_empty, AtomicOrdering::Relaxed);
    }
172
    /// Discard the rollback checkpoint after a successful import.
    /// The flag is cleared first so journal helpers become no-ops before the
    /// checkpoint itself is dropped.
    pub(crate) fn commit_import_rollback(&mut self) {
        self.import_rollback_has_journal
            .store(false, AtomicOrdering::Relaxed);
        *self.import_rollback.lock() = None;
    }
178
    /// Undo every dag mutation performed since the matching
    /// [`Self::begin_import_rollback`] call. No-op when no checkpoint exists.
    pub(crate) fn rollback_import(&mut self) {
        self.import_rollback_has_journal
            .store(false, AtomicOrdering::Relaxed);
        let Some(checkpoint) = self.import_rollback.lock().take() else {
            return;
        };

        // 1) Remove every map entry that lies in a span imported after the
        //    checkpoint (current vv minus checkpoint vv).
        let imported_spans = self.vv.sub_iter(&checkpoint.vv).collect::<Vec<_>>();
        let mut map = self.map.lock();
        for span in imported_spans {
            let start = ID::new(span.peer, span.counter.start);
            let end = ID::new(span.peer, span.counter.end);
            let keys = map.range(start..end).map(|(id, _)| *id).collect::<Vec<_>>();
            for key in keys {
                map.remove(&key);
            }
        }

        // 2) Restore pre-existing entries that the import mutated in place
        //    (split, merged, or flagged). `None` means the key didn't exist.
        for (id, node) in checkpoint.map_entries_before_mutation {
            if let Some(node) = node {
                map.insert(id, node);
            } else {
                map.remove(&id);
            }
        }
        drop(map);

        // 3) Restore the scalar version state.
        self.frontiers = checkpoint.frontiers;
        self.vv = checkpoint.vv.clone();
        self.shallow_since_frontiers = checkpoint.shallow_since_frontiers;
        self.shallow_root_frontiers_deps = checkpoint.shallow_root_frontiers_deps;
        self.shallow_since_vv = checkpoint.shallow_since_vv;
        *self.unparsed_vv.lock() = checkpoint.unparsed_vv;

        // 4) Replay the dep-point journal in reverse, inverting each entry.
        //    If the dag was empty at checkpoint time there was no journal
        //    (see `begin_import_rollback`), so clear the whole set instead.
        let mut unhandled_dep_points = self.unhandled_dep_points.lock();
        if checkpoint.vv.is_empty() {
            unhandled_dep_points.clear();
        }
        for item in checkpoint.unhandled_dep_point_log.into_iter().rev() {
            match item {
                UnhandledDepPointLog::Added(id) => {
                    unhandled_dep_points.remove(&id);
                }
                UnhandledDepPointLog::Removed(id) => {
                    unhandled_dep_points.insert(id);
                }
            }
        }
        drop(unhandled_dep_points);

        self.pending_txn_node = checkpoint.pending_txn_node;
    }
231
232 fn record_map_entry_before_mutation(&self, map: &BTreeMap<ID, AppDagNode>, id: ID) {
233 if !self
234 .import_rollback_has_journal
235 .load(AtomicOrdering::Relaxed)
236 {
237 return;
238 }
239
240 self.record_map_entry_before_replacement(id, map.get(&id).cloned());
241 }
242
    /// Journal `old` as the pre-mutation value for `id`, keeping only the
    /// FIRST recorded value per key (`or_insert`). Entries at or beyond the
    /// checkpoint's vv end are skipped: those are removed wholesale by the
    /// span-based cleanup in `rollback_import`.
    fn record_map_entry_before_replacement(&self, id: ID, old: Option<AppDagNode>) {
        if !self
            .import_rollback_has_journal
            .load(AtomicOrdering::Relaxed)
        {
            return;
        }

        let mut rollback = self.import_rollback.lock();
        let Some(rollback) = rollback.as_mut() else {
            return;
        };

        // Only entries that already existed at checkpoint time need journaling.
        let old_end = rollback.vv.get(&id.peer).copied().unwrap_or(0);
        if id.counter >= old_end {
            return;
        }

        rollback
            .map_entries_before_mutation
            .entry(id)
            .or_insert(old);
    }
266
267 fn record_unhandled_dep_point_added(&self, id: ID) {
268 if !self
269 .import_rollback_has_journal
270 .load(AtomicOrdering::Relaxed)
271 {
272 return;
273 }
274
275 if let Some(rollback) = self.import_rollback.lock().as_mut() {
276 rollback
277 .unhandled_dep_point_log
278 .push(UnhandledDepPointLog::Added(id));
279 }
280 }
281
282 fn record_unhandled_dep_point_removed(&self, id: ID) {
283 if !self
284 .import_rollback_has_journal
285 .load(AtomicOrdering::Relaxed)
286 {
287 return;
288 }
289
290 if let Some(rollback) = self.import_rollback.lock().as_mut() {
291 rollback
292 .unhandled_dep_point_log
293 .push(UnhandledDepPointLog::Removed(id));
294 }
295 }
296
    /// `true` when no change has been recorded in this dag yet.
    pub fn is_empty(&self) -> bool {
        self.vv.is_empty()
    }
300
    /// Register a freshly applied `change` in the dag: either extend the
    /// peer's last node in place (when the change depends only on itself and
    /// the last node has no successor) or insert a new node and split any
    /// nodes its deps point into.
    ///
    /// `rollback_old_vv` carries the pre-import vv when an import-rollback
    /// journal is active, so an in-place merge can journal the old node.
    #[tracing::instrument(skip_all, name = "handle_new_change")]
    pub(super) fn handle_new_change(
        &mut self,
        change: &Change,
        from_local: bool,
        rollback_old_vv: Option<&VersionVector>,
    ) {
        let len = change.content_len();
        self.update_version_on_new_change(change, from_local);
        // Debug-only invariant: nothing may depend into the middle of the
        // span we are about to insert.
        #[cfg(debug_assertions)]
        {
            let unhandled_dep_points = self.unhandled_dep_points.lock();
            let c = unhandled_dep_points
                .range(change.id_start()..change.id_end())
                .count();
            assert!(c == 0);
        }

        let mut inserted = false;
        if change.deps_on_self() {
            // Journal the old node only when the change starts exactly at the
            // checkpoint's vv end for this peer (the node being extended
            // pre-existed the import).
            let record_old_node_before_merge = rollback_old_vv
                .and_then(|vv| vv.get(&change.id.peer).copied())
                .filter(|old_end| *old_end == change.id.counter);
            inserted =
                self.with_last_mut_of_peer(change.id.peer, record_old_node_before_merge, |last| {
                    let (_, last) = last.unwrap();
                    if last.has_succ {
                        // Another node depends on `last`'s end; can't extend it.
                        return false;
                    }

                    assert_eq!(last.peer, change.id.peer, "peer id is not the same");
                    assert_eq!(
                        last.cnt + last.len as Counter,
                        change.id.counter,
                        "counter is not continuous"
                    );
                    assert_eq!(
                        last.lamport + last.len as Lamport,
                        change.lamport,
                        "lamport is not continuous"
                    );
                    // Extend the existing node in place (copy-on-write).
                    let last = Arc::make_mut(&mut last.inner);
                    last.len = (change.id.counter - last.cnt) as usize + len;
                    last.has_succ = false;
                    true
                });
        }

        if !inserted {
            let node: AppDagNode = AppDagNodeInner {
                vv: OnceCell::new(),
                peer: change.id.peer,
                cnt: change.id.counter,
                lamport: change.lamport,
                deps: change.deps.clone(),
                has_succ: false,
                len,
            }
            .into();

            let mut map = self.map.lock();
            map.insert(node.id_start(), node);
            // Split/flag the nodes this change's deps point into.
            self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
        }
    }
368
369 fn try_with_node_mut<R>(
370 &self,
371 map: &mut BTreeMap<ID, AppDagNode>,
372 id: ID,
373 f: impl FnOnce(Option<(ID, &mut AppDagNode)>) -> R,
374 ) -> R {
375 let x = map.range_mut(..=id).next_back();
376 if let Some((node_id, node)) = x {
377 if node.contains_id(id) {
378 f(Some((*node_id, node)))
379 } else {
380 f(None)
381 }
382 } else {
383 f(None)
384 }
385 }
386
387 pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
389 for dep in change.deps.iter() {
390 match self.get_lamport(&dep) {
391 Some(lamport) => {
392 change.lamport = change.lamport.max(lamport + 1);
393 }
394 None => return Err(()),
395 }
396 }
397 Ok(())
398 }
399
400 pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
401 let Some(node) = self.get(id) else {
402 return Frontiers::default();
403 };
404
405 let offset = id.counter - node.cnt;
406 if offset == 0 {
407 node.deps.clone()
408 } else {
409 ID::new(id.peer, node.cnt + offset - 1).into()
410 }
411 }
412
    /// Run `f` with a mutable handle to the peer's last parsed node (lazily
    /// loading it from the change store first).
    ///
    /// `record_if_before`: when `Some(old_end)`, the node is journaled for
    /// import rollback if it starts before `old_end` (i.e. it pre-existed the
    /// import and is about to be mutated).
    pub(crate) fn with_last_mut_of_peer<R>(
        &mut self,
        peer: PeerID,
        record_if_before: Option<Counter>,
        f: impl FnOnce(Option<(ID, &mut AppDagNode)>) -> R,
    ) -> R {
        self.lazy_load_last_of_peer(peer);
        let mut binding = self.map.lock();
        let last = binding
            .range_mut(..=ID::new(peer, Counter::MAX))
            .next_back()
            .map(|(id, v)| {
                // Journal the old node before handing out mutable access.
                if record_if_before.is_some_and(|old_end| id.counter < old_end) {
                    self.record_map_entry_before_replacement(*id, Some(v.clone()));
                }
                (*id, v)
            });
        f(last)
    }
432
    /// Update `vv`/`frontiers` bookkeeping for a newly applied change.
    ///
    /// Local changes were already accounted for op-by-op in
    /// `update_version_on_new_local_op`, so here we only consume the pending
    /// txn node and assert consistency. Remote changes advance both the
    /// frontiers and the vv.
    fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
        if from_local {
            assert!(self.pending_txn_node.take().is_some());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.ctr_end()
            );
        } else {
            let id_last = change.id_last();
            self.frontiers
                .update_frontiers_on_new_change(id_last, &change.deps);
            assert!(self.pending_txn_node.is_none());
            // The change must extend the peer's history contiguously.
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.id.counter
            );
            self.vv.extend_to_include_last_id(id_last);
        }
    }
452
453 pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
454 let unparsed_vv = self.unparsed_vv.lock();
455 if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
456 return;
457 }
458
459 let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
460 panic!("unparsed vv don't match with change store. Peer:{peer} is not in change store")
461 };
462
463 self.lazy_load_nodes_internal(nodes, peer, None);
464 }
465
    /// Parse `nodes` (freshly decoded from the change store, all from `peer`,
    /// ascending) into `map`, splitting them at any recorded unhandled dep
    /// points, then lower the peer's `unparsed_vv` watermark and resolve deps
    /// this batch has on other peers.
    ///
    /// Locks acquired here: `map` (unless supplied), `unparsed_vv`,
    /// `unhandled_dep_points` — callers must not hold any of them.
    fn lazy_load_nodes_internal(
        &self,
        nodes: Vec<AppDagNode>,
        peer: u64,
        map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        assert!(!nodes.is_empty());
        let mut map_guard = None;
        let map = map_input.unwrap_or_else(|| {
            map_guard = Some(self.map.lock());
            map_guard.as_mut().unwrap()
        });
        let new_dag_start_counter_for_the_peer = nodes[0].cnt;
        let nodes_cnt_end = nodes.last().unwrap().ctr_end();
        let mut unparsed_vv = self.unparsed_vv.lock();
        // Only counters below the unparsed watermark belong to this load;
        // anything at/after it is already parsed (or pending txn state).
        let end_counter = unparsed_vv[&peer];
        assert!(end_counter <= nodes_cnt_end);
        let mut deps_on_others = Vec::new();
        let mut break_point_set = self.unhandled_dep_points.lock();
        for mut node in nodes {
            if node.cnt >= end_counter {
                break;
            }

            // Truncate a node straddling the watermark.
            if node.cnt + node.len as Counter > end_counter {
                node = node.slice(0, (end_counter - node.cnt) as usize);
            }

            // Cross-peer deps are resolved after the watermark is updated.
            for dep in node.deps.iter() {
                if dep.peer != peer {
                    deps_on_others.push(dep);
                }
            }

            // Offsets (exclusive ends) where previously-seen deps point into
            // this node; the node must be split there so each dep lands on a
            // node boundary.
            let break_point_ends: Vec<_> = break_point_set
                .range(node.id_start()..node.id_end())
                .map(|id| (id.counter - node.cnt) as usize + 1)
                .collect();
            if break_point_ends.is_empty() {
                self.record_map_entry_before_mutation(map, node.id_start());
                map.insert(node.id_start(), node);
            } else {
                // Insert one slice per break point; every slice except the
                // possible tail has a successor by construction.
                let mut slice_start = 0;
                for slice_end in break_point_ends.iter().copied() {
                    let mut slice_node = node.slice(slice_start, slice_end);
                    let inner = Arc::make_mut(&mut slice_node.inner);
                    inner.has_succ = true;
                    self.record_map_entry_before_mutation(map, slice_node.id_start());
                    map.insert(slice_node.id_start(), slice_node);
                    slice_start = slice_end;
                }

                // Remaining tail after the last break point, if any.
                let last_break_point = break_point_ends.last().copied().unwrap();
                if last_break_point != node.len {
                    let slice_node = node.slice(last_break_point, node.len);
                    self.record_map_entry_before_mutation(map, slice_node.id_start());
                    map.insert(slice_node.id_start(), slice_node);
                }

                // These dep points are now satisfied.
                for break_point in break_point_ends.into_iter() {
                    let id = node.id_start().inc(break_point as Counter - 1);
                    if break_point_set.remove(&id) {
                        self.record_unhandled_dep_point_removed(id);
                    }
                }
            }
        }

        // Lower (or clear) the unparsed watermark for this peer.
        if new_dag_start_counter_for_the_peer == 0 {
            unparsed_vv.remove(&peer);
        } else {
            unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
        }
        drop(unparsed_vv);
        drop(break_point_set);
        self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
    }
546
    /// For each dependency id, make sure the dep lands on a node boundary:
    /// flag the node when the dep hits its last op, split the node when the
    /// dep hits its middle, or — when the containing node isn't parsed yet —
    /// record the id as an unhandled dep point for later lazy loading.
    ///
    /// Deps on `skip_peer` are ignored (they are intra-peer and contiguous).
    fn handle_deps_break_points(
        &self,
        ids: impl IntoIterator<Item = ID>,
        skip_peer: PeerID,
        map: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        let mut map_guard = None;
        let map = map.unwrap_or_else(|| {
            map_guard = Some(self.map.lock());
            map_guard.as_mut().unwrap()
        });
        for id in ids {
            if id.peer == skip_peer {
                continue;
            }

            let mut handled = false;
            // `ans` is `Some(new_node)` when the node had to be split; the
            // right half is inserted after the closure returns.
            let ans = self.try_with_node_mut(map, id, |target| {
                let (target_id, target) = target?;
                if target.ctr_last() == id.counter {
                    // Dep hits the node's last op — just mark it.
                    self.record_map_entry_before_replacement(target_id, Some(target.clone()));
                    let target = Arc::make_mut(&mut target.inner);
                    handled = true;
                    target.has_succ = true;
                    None
                } else {
                    // Dep hits the middle — split right after `id.counter`.
                    let new_node =
                        target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
                    self.record_map_entry_before_replacement(target_id, Some(target.clone()));
                    let target = Arc::make_mut(&mut target.inner);
                    target.len -= new_node.len;
                    Some(new_node)
                }
            });

            if let Some(new_node) = ans {
                self.record_map_entry_before_mutation(map, new_node.id_start());
                map.insert(new_node.id_start(), new_node);
            } else if !handled {
                // Containing node not parsed yet; defer to lazy loading.
                let mut unhandled_dep_points = self.unhandled_dep_points.lock();
                if unhandled_dep_points.insert(id) {
                    self.record_unhandled_dep_point_added(id);
                }
            }
        }
    }
598
    /// Parse nodes from the change store until the node containing `id` is in
    /// `map`. Walks backwards batch-by-batch: each iteration loads the batch
    /// containing the current unparsed end, which lowers the watermark.
    /// Ids inside trimmed (shallow) history are skipped.
    fn ensure_lazy_load_node(&self, id: ID) {
        if self.shallow_since_vv.includes_id(id) {
            return;
        }

        loop {
            // Scope the lock: `lazy_load_nodes_internal` re-locks `unparsed_vv`.
            let unparsed_end = {
                let unparsed_vv = self.unparsed_vv.lock();
                unparsed_vv.get(&id.peer).copied().unwrap_or(0)
            };
            if unparsed_end <= id.counter {
                return;
            }

            let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
            let Some(nodes) = self
                .change_store
                .get_dag_nodes_that_contains(last_unparsed_id)
            else {
                panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
            };

            self.lazy_load_nodes_internal(nodes, id.peer, None);
        }
    }
626
    /// Number of dag nodes currently parsed into memory (diagnostics).
    pub fn total_parsed_dag_node(&self) -> usize {
        self.map.lock().len()
    }
630
    /// Initialize version state from a fast snapshot import. The dag must be
    /// empty. All history starts out unparsed; `start_version` (when present)
    /// marks the shallow history boundary and must be a single-id frontier
    /// pointing at the first op of an existing node.
    pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
        assert!(self.vv.is_empty());
        *self.unparsed_vv.lock() = v.vv.clone();
        self.vv = v.vv;
        self.frontiers = v.frontiers;
        if let Some((vv, f)) = v.start_version {
            if !f.is_empty() {
                assert!(f.len() == 1);
                let id = f.as_single().unwrap();
                // `get` lazily loads the node; it must start exactly at `id`.
                let node = self.get(id).unwrap();
                assert!(node.cnt == id.counter);
                self.shallow_root_frontiers_deps = node.deps.clone();
            }
            self.shallow_since_frontiers = f;
            self.shallow_since_vv = ImVersionVector::from_vv(&vv);
        }
    }
648
    /// Debug/test helper: force-parse the whole dag and assert its structural
    /// invariants (contiguity per peer, deps ending on node boundaries,
    /// lamport consistency, version-vector consistency, frontier correctness).
    /// Expensive — parses and clones the entire map; not for production paths.
    #[instrument(skip(self))]
    pub fn check_dag_correctness(&self) {
        {
            // Parse everything so the checks below see the full dag.
            let unparsed_vv = self.unparsed_vv.lock().clone();
            for (peer, cnt) in unparsed_vv.iter() {
                if *cnt == 0 {
                    continue;
                }

                let mut end_cnt = *cnt;
                let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
                while end_cnt > init_counter {
                    let cnt = end_cnt - 1;
                    self.ensure_lazy_load_node(ID::new(*peer, cnt));
                    end_cnt = self.unparsed_vv.lock().get(peer).copied().unwrap_or(0);
                }
            }

            self.unparsed_vv.lock().clear();
        }
        {
            // Per-peer spans must be contiguous, starting at the shallow root.
            let map = self.map.lock();
            let mut last_end_id = ID::new(0, 0);
            for (&id, node) in map.iter() {
                let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
                if id.peer == last_end_id.peer {
                    assert!(id.counter == last_end_id.counter);
                } else {
                    assert_eq!(id.counter, init_counter);
                }

                last_end_id = id.inc(node.len as Counter);
            }
        }
        {
            // Every dep must land on the last op of some node.
            let map = self.map.lock();
            check_always_dep_on_last_id(&map);
        }
        {
            // Lamport of each node equals max(dep lamport ends).
            let map = self.map.lock();
            'outer: for (_, node) in map.iter() {
                let mut this_lamport = 0;
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        // Dep lies in trimmed history; can't verify this node.
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    this_lamport = this_lamport.max(dep_node.lamport_end());
                }

                assert_eq!(this_lamport, node.lamport);
            }
        }
        {
            // Each node's cached vv equals the union of its deps' vvs.
            let map = self.map.lock().clone();
            'outer: for (_, node) in map.iter() {
                let actual_vv = self.ensure_vv_for(node);
                let mut expected_vv = ImVersionVector::default();
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    self.ensure_vv_for(dep_node);
                    expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
                    expected_vv.extend_to_include_last_id(dep);
                }

                assert_eq!(actual_vv, expected_vv);
            }
        }
        {
            // Frontiers = node ends that no other node depends on.
            let mut maybe_frontiers = FxHashSet::default();
            let map = self.map.lock();
            for (_, node) in map.iter() {
                maybe_frontiers.insert(node.id_last());
            }

            for (_, node) in map.iter() {
                for dep in node.deps.iter() {
                    maybe_frontiers.remove(&dep);
                }
            }

            let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
            assert_eq!(maybe_frontiers, frontiers);
        }
    }
754
755 pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
756 for id in deps.iter() {
757 if !self.vv.includes_id(id) {
758 return false;
759 }
760 }
761
762 if self.is_before_shallow_root(deps) {
763 return false;
764 }
765
766 true
767 }
768
    /// Whether `deps` refers (even partially) to history that was trimmed
    /// away by shallow import. Returns `false` for full-history docs.
    pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
        // Full history present: nothing is before the root.
        if self.shallow_since_vv.is_empty() {
            return false;
        }

        // Empty frontiers = the document's very beginning, which is trimmed.
        if deps.is_empty() {
            return true;
        }

        // Any id strictly inside the trimmed vv is unreachable.
        if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
            return true;
        }

        // Touching the shallow boundary is only OK when `deps` IS exactly the
        // boundary; a proper superset/mix still reaches trimmed history.
        if deps
            .iter()
            .any(|x| self.shallow_since_frontiers.contains(&x))
        {
            return deps != &self.shallow_since_frontiers;
        }

        false
    }
795
    /// Visit `id`'s node and all of its ancestors in descending
    /// (lamport_last, peer) order — a max-heap traversal — calling `f` on
    /// each node until it returns `ControlFlow::Break`.
    /// Deps missing from the dag (e.g. trimmed history) are silently skipped.
    pub(crate) fn travel_ancestors(
        &self,
        id: ID,
        f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
    ) {
        // Orders nodes by (lamport_last, peer) so the heap pops the
        // causally-latest candidate first.
        struct PendingNode(AppDagNode);
        impl PartialEq for PendingNode {
            fn eq(&self, other: &Self) -> bool {
                self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
            }
        }
        impl Eq for PendingNode {}
        impl PartialOrd for PendingNode {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for PendingNode {
            fn cmp(&self, other: &Self) -> Ordering {
                self.0
                    .lamport_last()
                    .cmp(&other.0.lamport_last())
                    .then_with(|| self.0.peer.cmp(&other.0.peer))
            }
        }

        let mut visited = FxHashSet::default();
        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
        pending.push(PendingNode(self.get(id).unwrap()));
        while let Some(PendingNode(node)) = pending.pop() {
            if f(&node).is_break() {
                break;
            }

            for dep in node.deps.iter() {
                let Some(dep_node) = self.get(dep) else {
                    continue;
                };
                // Dedup by node start so each node is enqueued at most once.
                if visited.contains(&dep_node.id_start()) {
                    continue;
                }

                visited.insert(dep_node.id_start());
                pending.push(PendingNode(dep_node));
            }
        }
    }
846
    /// Account for `len` new local ops starting at `start_id`: advance vv and
    /// frontiers immediately, and extend (or create) the pending transaction
    /// node that will become a real dag node when the txn commits
    /// (see `update_version_on_new_change`'s `from_local` branch).
    pub(crate) fn update_version_on_new_local_op(
        &mut self,
        deps: &Frontiers,
        start_id: ID,
        start_lamport: Lamport,
        len: usize,
    ) {
        let last_id = start_id.inc(len as Counter - 1);
        self.vv.set_last(last_id);
        self.frontiers.update_frontiers_on_new_change(last_id, deps);
        match &mut self.pending_txn_node {
            Some(node) => {
                // Continuation ops must be contiguous and dep only on self.
                assert!(
                    node.peer == start_id.peer
                        && node.cnt + node.len as Counter == start_id.counter
                        && deps.len() == 1
                        && deps.as_single().unwrap().peer == start_id.peer
                );
                let inner = Arc::make_mut(&mut node.inner);
                inner.len += len;
            }
            None => {
                let node = AppDagNode {
                    inner: Arc::new(AppDagNodeInner {
                        peer: start_id.peer,
                        cnt: start_id.counter,
                        lamport: start_lamport,
                        deps: deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len,
                    }),
                };
                self.pending_txn_node = Some(node);
            }
        }
    }
885
886 pub(crate) fn latest_vv_contains_peer(&self, peer: PeerID) -> bool {
887 self.vv.contains_key(&peer) && *self.vv.get(&peer).unwrap() > 0
888 }
889}
890
891fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
892 for (_, node) in map.iter() {
893 for dep in node.deps.iter() {
894 let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
895 continue;
897 };
898 assert_eq!(dep_node.id_start(), dep_id);
899 if dep_node.contains_id(dep) {
900 assert_eq!(dep_node.id_last(), dep);
901 }
902 }
903 }
904}
905
impl HasIndex for AppDagNode {
    type Int = Counter;
    /// First counter covered by this node.
    fn get_start_index(&self) -> Self::Int {
        self.cnt
    }

    /// One past the last counter covered by this node.
    fn get_end_index(&self) -> Self::Int {
        self.cnt + self.len as Counter
    }
}
916
impl Sliceable for AppDagNode {
    /// Take the sub-span `[from, to)` of this node.
    /// - A non-zero `from` makes the slice dep solely on the op just before it
    ///   (intra-node ops always depend on their predecessor).
    /// - A cached vv is adjusted so the peer's entry stops at the slice start.
    /// - Any slice that doesn't reach the original end gains `has_succ = true`
    ///   (its successor is the rest of the original node).
    fn slice(&self, from: usize, to: usize) -> Self {
        AppDagNodeInner {
            peer: self.peer,
            cnt: self.cnt + from as Counter,
            lamport: self.lamport + from as Lamport,
            deps: if from > 0 {
                Frontiers::from_id(self.id_start().inc(from as Counter - 1))
            } else {
                self.deps.clone()
            },
            vv: if let Some(vv) = self.vv.get() {
                let mut new = vv.clone();
                new.insert(self.peer, self.cnt + from as Counter);
                OnceCell::with_value(new)
            } else {
                OnceCell::new()
            },
            has_succ: if to == self.len { self.has_succ } else { true },
            len: to - from,
        }
        .into()
    }
}
941
942impl HasId for AppDagNode {
943 fn id_start(&self) -> ID {
944 ID {
945 peer: self.peer,
946 counter: self.cnt,
947 }
948 }
949}
950
impl HasCounter for AppDagNode {
    /// First counter covered by this node.
    fn ctr_start(&self) -> Counter {
        self.cnt
    }
}
956
impl HasLength for AppDagNode {
    /// Number of ops in this node (atomic length == content length here).
    fn atom_len(&self) -> usize {
        self.len
    }

    fn content_len(&self) -> usize {
        self.len
    }
}
966
impl Mergable for AppDagNode {
    /// Two nodes can merge when `other` directly continues `self`: same peer,
    /// contiguous counters and lamports, `other` depends only on `self`'s
    /// peer, and no third node depends on `self`'s end (`!has_succ`).
    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
    where
        Self: Sized,
    {
        !self.has_succ
            && self.peer == other.peer
            && self.cnt + self.len as Counter == other.cnt
            && other.deps.len() == 1
            && self.lamport + self.len as Lamport == other.lamport
            && other.deps.as_single().unwrap().peer == self.peer
    }

    /// Absorb `other` into `self` (copy-on-write on the shared inner).
    fn merge(&mut self, other: &Self, _conf: &())
    where
        Self: Sized,
    {
        // `other` must dep exactly on our last op.
        assert_eq!(
            other.deps.as_single().unwrap().counter,
            self.cnt + self.len as Counter - 1
        );
        let this = Arc::make_mut(&mut self.inner);
        this.len += other.len;
        this.has_succ = other.has_succ;
    }
}
993
impl HasLamport for AppDagNode {
    /// Lamport timestamp of the node's first op.
    fn lamport(&self) -> Lamport {
        self.lamport
    }
}
999
impl DagNode for AppDagNode {
    /// Causal dependencies of the node's first op.
    fn deps(&self) -> &Frontiers {
        &self.deps
    }
}
1005
impl Dag for AppDag {
    type Node = AppDagNode;

    fn frontier(&self) -> &Frontiers {
        &self.frontiers
    }

    /// Fetch the node containing `id`, lazily parsing it from the change
    /// store if needed. Also resolves ids inside the pending (uncommitted)
    /// local transaction node. Returns `None` for unknown ids.
    fn get(&self, id: ID) -> Option<Self::Node> {
        self.ensure_lazy_load_node(id);
        let binding = self.map.lock();
        if let Some(x) = binding.range(..=id).next_back() {
            if x.1.contains_id(id) {
                return Some(x.1.clone());
            }
        }

        // Fall back to the in-progress local transaction's node.
        if let Some(node) = &self.pending_txn_node {
            if node.peer == id.peer && node.cnt <= id.counter {
                assert!(node.cnt + node.len as Counter > id.counter);
                return Some(node.clone());
            }
        }

        None
    }

    fn vv(&self) -> &VersionVector {
        &self.vv
    }

    fn contains(&self, id: ID) -> bool {
        self.vv.includes_id(id)
    }
}
1042
1043impl AppDag {
1044 pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
1048 self.get(id).map(|x| {
1049 let mut vv = self.ensure_vv_for(&x);
1050 vv.insert(id.peer, id.counter + 1);
1051 vv
1052 })
1053 }
1054
    /// Compute (and cache in the node's `OnceCell`) the version vector that
    /// precedes `target_node`, i.e. the union of all its deps' vvs plus the
    /// deps themselves. Iterative DFS to avoid recursion on deep histories:
    /// a node whose deps aren't resolved yet is re-pushed below them.
    pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
        if target_node.vv.get().is_none() {
            let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
            while let Some(top_node) = stack.pop() {
                if top_node.vv.get().is_some() {
                    continue;
                }

                let mut ans_vv = ImVersionVector::default();
                if top_node.deps == self.shallow_root_frontiers_deps {
                    // Deps lie in trimmed history; use the shallow-start vv.
                    for (&p, &c) in self.shallow_since_vv.iter() {
                        ans_vv.insert(p, c);
                    }
                } else {
                    // First pass: make sure every dep has a cached vv. If any
                    // is missing, re-push this node (once) below its deps and
                    // retry later.
                    let mut all_deps_processed = true;
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        if node.vv.get().is_none() {
                            if all_deps_processed {
                                stack.push(top_node.clone());
                            }
                            all_deps_processed = false;
                            stack.push(node);
                            continue;
                        };
                    }

                    if !all_deps_processed {
                        continue;
                    }

                    // Second pass: union the deps' vvs and include the deps.
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        let dep_vv = node.vv.get().unwrap();
                        if ans_vv.is_empty() {
                            ans_vv = dep_vv.clone();
                        } else {
                            ans_vv.extend_to_include_vv(dep_vv.iter());
                        }

                        ans_vv.insert(node.peer, node.ctr_end());
                    }
                }

                // `set` may race with another thread having filled it; ignore.
                let _ = top_node.vv.set(ans_vv);
            }
        }

        target_node.vv.get().unwrap().clone()
    }
1114
1115 pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
1118 if a.peer == b.peer {
1119 return Some(a.counter.cmp(&b.counter));
1120 }
1121
1122 let a = self.get_vv(a).unwrap();
1123 let b = self.get_vv(b).unwrap();
1124 a.partial_cmp(&b)
1125 }
1126
1127 pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
1128 self.get(*id).and_then(|node| {
1129 assert!(id.counter >= node.cnt);
1130 if node.cnt + node.len as Counter > id.counter {
1131 Some(node.lamport + (id.counter - node.cnt) as Lamport)
1132 } else {
1133 None
1134 }
1135 })
1136 }
1137
1138 pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
1139 let mut lamport = 0;
1140 for id in deps.iter() {
1141 let x = self.get_lamport(&id)?;
1142 lamport = lamport.max(x + 1);
1143 }
1144
1145 Some(lamport)
1146 }
1147
    /// Convert frontiers to the version vector they denote.
    /// Returns `None` when some id is unknown to the dag. The shallow-root
    /// deps map directly to the shallow-start vv (their nodes are trimmed).
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        if frontiers == &self.shallow_root_frontiers_deps {
            let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
            return Some(vv);
        }

        let mut vv: VersionVector = Default::default();
        for id in frontiers.iter() {
            let x = self.get(id)?;
            let target_vv = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(target_vv.iter());
            vv.extend_to_include_last_id(id);
        }

        Some(vv)
    }
1167
    /// Like [`Self::frontiers_to_vv`] but produces an [`ImVersionVector`] and
    /// panics (`unreachable!`) on unknown ids instead of returning `None`.
    #[allow(unused)]
    pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
        if frontiers.is_empty() {
            return Default::default();
        }

        // Seed from the first id, then fold in the rest.
        let mut iter = frontiers.iter();
        let mut vv = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let mut vv = self.ensure_vv_for(&x);
            vv.extend_to_include_last_id(id);
            vv
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let x = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(x.iter());
            vv.extend_to_include_last_id(id);
        }

        vv
    }
1196
    /// Convert a version vector to frontiers: take each peer's last id,
    /// drop ids inside trimmed history, then shrink to the minimal head set.
    /// NOTE: kept in sync with [`Self::vv_to_frontiers`] (same logic, owned
    /// `VersionVector` input).
    pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                // Ids covered by trimmed history can't be heads.
                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        // Everything fell inside trimmed history → the shallow boundary.
        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }
1227
    /// Convert a version vector to frontiers. Mirror of
    /// [`Self::im_vv_to_frontiers`] for the owned `VersionVector` type —
    /// keep both implementations in sync.
    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let this = vv;
        let last_ids: Frontiers = this
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                // Ids covered by trimmed history can't be heads.
                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        // Everything fell inside trimmed history → the shallow boundary.
        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }
1258
1259 pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
1260 if frontiers.is_empty() {
1261 return 0;
1262 }
1263
1264 let mut iter = frontiers.iter();
1265 let mut lamport = {
1266 let id = iter.next().unwrap();
1267 let Some(x) = self.get(id) else {
1268 unreachable!()
1269 };
1270 assert!(id.counter >= x.cnt);
1271 (id.counter - x.cnt) as Lamport + x.lamport + 1
1272 };
1273
1274 for id in iter {
1275 let Some(x) = self.get(id) else {
1276 unreachable!()
1277 };
1278 assert!(id.counter >= x.cnt);
1279 lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
1280 }
1281
1282 lamport
1283 }
1284
    /// Current heads of the dag (alias of [`Self::frontiers`]).
    pub fn get_frontiers(&self) -> &Frontiers {
        &self.frontiers
    }
1288
    /// Compare the dag's version against `other`.
    /// `Equal` = same frontiers; `Greater` = dag includes all of `other`;
    /// `Less` otherwise — NOTE: `Less` also covers the concurrent case, since
    /// frontiers have no total order.
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        if &self.frontiers == other {
            Ordering::Equal
        } else if other.iter().all(|id| self.vv.includes_id(id)) {
            Ordering::Greater
        } else {
            Ordering::Less
        }
    }
1301
    /// Compare two frontiers via their version vectors.
    /// `Err(FrontiersNotIncluded)` when either is not fully contained in the
    /// dag; `Ok(None)` when they are concurrent.
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
        let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
        Ok(a.partial_cmp(&b))
    }
1315}
1316
/// Error returned when a frontiers comparison references history the
/// document does not contain.
#[derive(Debug, PartialEq, Eq)]
pub struct FrontiersNotIncluded;

impl Display for FrontiersNotIncluded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "The given Frontiers are not included by the doc")
    }
}
1324
#[cfg(test)]
mod ensure_vv_for_tests {
    use super::*;
    use crate::arena::SharedArena;
    use std::sync::atomic::AtomicI64;

    /// Build a bare dag node for tests (lamport mirrors the counter).
    fn make_dag_node(peer: PeerID, cnt: Counter, len: usize, deps: Frontiers) -> AppDagNode {
        AppDagNodeInner {
            vv: OnceCell::new(),
            peer,
            cnt,
            lamport: cnt as Lamport,
            deps,
            has_succ: false,
            len,
        }
        .into()
    }

    /// Diamond: z depends on both x and y while y already depends on x.
    /// `ensure_vv_for(z)` must union the dep vvs without double-counting.
    #[test]
    fn diamond_dep_ensure_vv_for_does_not_double_set() {
        let change_store = ChangeStore::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)));
        let dag = AppDag::new(change_store);

        let x = make_dag_node(1, 0, 1, Frontiers::default());
        let y = make_dag_node(2, 0, 1, Frontiers::from_id(ID::new(1, 0)));
        let mut z_deps = Frontiers::default();
        z_deps.push(ID::new(1, 0));
        z_deps.push(ID::new(2, 0));
        let z = make_dag_node(3, 0, 1, z_deps);

        // Insert nodes directly, bypassing change application.
        {
            let mut map = dag.map.lock();
            map.insert(x.id_start(), x);
            map.insert(y.id_start(), y);
            map.insert(z.id_start(), z.clone());
        }

        // z's vv covers one op each from peers 1 and 2, nothing from itself.
        let vv = dag.ensure_vv_for(&z);
        assert_eq!(vv.get(&1).copied(), Some(1));
        assert_eq!(vv.get(&2).copied(), Some(1));
        assert!(vv.get(&3).is_none());
    }
}