use crate::change::{Change, Lamport};
use crate::dag::{Dag, DagNode};
use crate::id::{Counter, ID};
use crate::span::{HasId, HasLamport};
use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
use fxhash::FxHashSet;
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
use std::fmt::Display;
use std::ops::{ControlFlow, Deref};
use std::sync::{Arc, Mutex};
use tracing::{instrument, trace};

use super::change_store::BatchDecodeInfo;
use super::ChangeStore;

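/// [AppDag] maintains the causal DAG of all known changes. Nodes are parsed
/// lazily from the backing [ChangeStore], so the in-memory `map` may cover
/// only a suffix of the history at any point in time.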
#[derive(Debug)]
pub struct AppDag {
    change_store: ChangeStore,
    /// Parsed DAG nodes, keyed by their start id. Ops that are not parsed yet
    /// are covered by `unparsed_vv` and loaded from `change_store` on demand.
    map: Mutex<BTreeMap<ID, AppDagNode>>,
    /// The latest known frontiers.
    frontiers: Frontiers,
    /// The latest known version vector.
    vv: VersionVector,
    /// The earliest known frontiers (the start of the shallow history).
    shallow_since_frontiers: Frontiers,
    /// The deps of the shallow root node.
    shallow_root_frontiers_deps: Frontiers,
    /// The earliest known version vector.
    shallow_since_vv: ImVersionVector,
    /// Ops that are included in `vv` but not parsed into `map` yet.
    unparsed_vv: Mutex<VersionVector>,
    /// Dep points that target not-yet-parsed ops. When those ops are parsed,
    /// the containing nodes must be split at these points.
    unhandled_dep_points: Mutex<BTreeSet<ID>>,
    /// The node created by the currently pending local transaction, if any.
    pending_txn_node: Option<AppDagNode>,
}

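/// A cheaply cloneable handle to an [AppDagNodeInner], using copy-on-write
/// via [Arc::make_mut] when mutation is needed.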
#[derive(Debug, Clone)]
pub struct AppDagNode {
    inner: Arc<AppDagNodeInner>,
}

impl Deref for AppDagNode {
    type Target = AppDagNodeInner;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl AppDagNode {
    pub fn new(inner: AppDagNodeInner) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }
}

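/// A run of consecutive ops from one peer that share the same deps: the node
/// starts at `(peer, cnt)`, spans `len` counters, and `lamport` is the
/// Lamport timestamp of its first op.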
#[derive(Debug, Clone)]
pub struct AppDagNodeInner {
    pub(crate) peer: PeerID,
    pub(crate) cnt: Counter,
    pub(crate) lamport: Lamport,
    pub(crate) deps: Frontiers,
    /// The lazily computed version vector at the start of this node
    /// (not including the node's own ops).
    pub(crate) vv: OnceCell<ImVersionVector>,
    /// Whether any parsed node depends on this node. A node with successors
    /// must not be extended or merged in place.
    pub(crate) has_succ: bool,
    pub(crate) len: usize,
}

impl From<AppDagNodeInner> for AppDagNode {
    fn from(inner: AppDagNodeInner) -> Self {
        AppDagNode {
            inner: Arc::new(inner),
        }
    }
}

impl AppDag {
    pub(super) fn new(change_store: ChangeStore) -> Self {
        Self {
            change_store,
            map: Mutex::new(BTreeMap::new()),
            frontiers: Frontiers::default(),
            vv: VersionVector::default(),
            unparsed_vv: Mutex::new(VersionVector::default()),
            unhandled_dep_points: Mutex::new(BTreeSet::new()),
            shallow_since_frontiers: Default::default(),
            shallow_root_frontiers_deps: Default::default(),
            shallow_since_vv: Default::default(),
            pending_txn_node: None,
        }
    }

    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

    pub fn vv(&self) -> &VersionVector {
        &self.vv
    }

    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        &self.shallow_since_vv
    }

    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        &self.shallow_since_frontiers
    }

    pub fn is_empty(&self) -> bool {
        self.vv.is_empty()
    }

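    /// Update the version info and insert the change into the DAG, merging it
    /// into the previous node of the same peer when possible.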
    #[tracing::instrument(skip_all, name = "handle_new_change")]
    pub(super) fn handle_new_change(&mut self, change: &Change, from_local: bool) {
        let len = change.content_len();
        self.update_version_on_new_change(change, from_local);
        #[cfg(debug_assertions)]
        {
            let unhandled_dep_points = self.unhandled_dep_points.try_lock().unwrap();
            let c = unhandled_dep_points
                .range(change.id_start()..change.id_end())
                .count();
            assert!(c == 0);
        }

        let mut inserted = false;
        if change.deps_on_self() {
            inserted = self.with_last_mut_of_peer(change.id.peer, |last| {
                let last = last.unwrap();
                if last.has_succ {
                    // We cannot extend a node that already has successors;
                    // a new node is needed instead.
                    return false;
                }

                assert_eq!(last.peer, change.id.peer, "peer id is not the same");
                assert_eq!(
                    last.cnt + last.len as Counter,
                    change.id.counter,
                    "counter is not continuous"
                );
                assert_eq!(
                    last.lamport + last.len as Lamport,
                    change.lamport,
                    "lamport is not continuous"
                );
                let last = Arc::make_mut(&mut last.inner);
                last.len = (change.id.counter - last.cnt) as usize + len;
                last.has_succ = false;
                true
            });
        }

        if !inserted {
            let node: AppDagNode = AppDagNodeInner {
                vv: OnceCell::new(),
                peer: change.id.peer,
                cnt: change.id.counter,
                lamport: change.lamport,
                deps: change.deps.clone(),
                has_succ: false,
                len,
            }
            .into();

            let mut map = self.map.try_lock().unwrap();
            map.insert(node.id_start(), node);
            self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
        }
    }

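    /// Look up the node in `map` that contains `id` and call `f` with it
    /// (or with `None` if no parsed node contains `id`).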
    fn try_with_node_mut<R>(
        &self,
        map: &mut BTreeMap<ID, AppDagNode>,
        id: ID,
        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
    ) -> R {
        let x = map.range_mut(..=id).next_back();
        if let Some((_, node)) = x {
            if node.contains_id(id) {
                f(Some(node))
            } else {
                f(None)
            }
        } else {
            f(None)
        }
    }

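    /// Calculate the Lamport timestamp of a change from its deps.
    /// Returns `Err(())` if any dep is not in the DAG yet.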
    pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
        for dep in change.deps.iter() {
            match self.get_lamport(&dep) {
                Some(lamport) => {
                    change.lamport = change.lamport.max(lamport + 1);
                }
                None => return Err(()),
            }
        }
        Ok(())
    }

    pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
        let Some(node) = self.get(id) else {
            return Frontiers::default();
        };

        let offset = id.counter - node.cnt;
        if offset == 0 {
            node.deps.clone()
        } else {
            ID::new(id.peer, node.cnt + offset - 1).into()
        }
    }

    pub(crate) fn with_last_mut_of_peer<R>(
        &mut self,
        peer: PeerID,
        f: impl FnOnce(Option<&mut AppDagNode>) -> R,
    ) -> R {
        self.lazy_load_last_of_peer(peer);
        let mut binding = self.map.try_lock().unwrap();
        let last = binding
            .range_mut(..=ID::new(peer, Counter::MAX))
            .next_back()
            .map(|(_, v)| v);
        f(last)
    }

    fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
        if from_local {
            // The version was already updated by `update_version_on_new_local_op`.
            assert!(self.pending_txn_node.take().is_some());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.ctr_end()
            );
        } else {
            let id_last = change.id_last();
            self.frontiers
                .update_frontiers_on_new_change(id_last, &change.deps);
            assert!(self.pending_txn_node.is_none());
            assert_eq!(
                self.vv.get(&change.id.peer).copied().unwrap_or(0),
                change.id.counter
            );
            self.vv.extend_to_include_last_id(id_last);
        }
    }

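    /// Ensure the latest nodes of the given peer are parsed into `map`,
    /// loading them from the change store if needed.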
    pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
        let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
        if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
            return;
        }

        let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
            panic!("unparsed vv doesn't match with change store. Peer:{peer} is not in change store")
        };

        self.lazy_load_nodes_internal(nodes, peer, None);
    }

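    /// Insert nodes loaded from the change store into `map`, splitting them
    /// at the recorded unhandled dep points, and shrink `unparsed_vv`
    /// accordingly.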
    fn lazy_load_nodes_internal(
        &self,
        nodes: Vec<AppDagNode>,
        peer: u64,
        map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        assert!(!nodes.is_empty());
        let mut map_guard = None;
        let map = map_input.unwrap_or_else(|| {
            map_guard = Some(self.map.try_lock().unwrap());
            map_guard.as_mut().unwrap()
        });
        let new_dag_start_counter_for_the_peer = nodes[0].cnt;
        let nodes_cnt_end = nodes.last().unwrap().ctr_end();
        let mut unparsed_vv = self.unparsed_vv.try_lock().unwrap();
        let end_counter = unparsed_vv[&peer];
        assert!(end_counter <= nodes_cnt_end);
        let mut deps_on_others = Vec::new();
        let mut break_point_set = self.unhandled_dep_points.try_lock().unwrap();
        for mut node in nodes {
            if node.cnt >= end_counter {
                // This node is already parsed; skip the rest.
                break;
            }

            if node.cnt + node.len as Counter > end_counter {
                // Only the part before `end_counter` is unparsed; slice it.
                node = node.slice(0, (end_counter - node.cnt) as usize);
            }

            for dep in node.deps.iter() {
                if dep.peer != peer {
                    deps_on_others.push(dep);
                }
            }

            // Split the node at the dep points recorded by already-parsed nodes.
            let break_point_ends: Vec<_> = break_point_set
                .range(node.id_start()..node.id_end())
                .map(|id| (id.counter - node.cnt) as usize + 1)
                .collect();
            if break_point_ends.is_empty() {
                map.insert(node.id_start(), node);
            } else {
                let mut slice_start = 0;
                for slice_end in break_point_ends.iter().copied() {
                    let mut slice_node = node.slice(slice_start, slice_end);
                    let inner = Arc::make_mut(&mut slice_node.inner);
                    inner.has_succ = true;
                    map.insert(slice_node.id_start(), slice_node);
                    slice_start = slice_end;
                }

                let last_break_point = break_point_ends.last().copied().unwrap();
                if last_break_point != node.len {
                    let slice_node = node.slice(last_break_point, node.len);
                    map.insert(slice_node.id_start(), slice_node);
                }

                for break_point in break_point_ends.into_iter() {
                    break_point_set.remove(&node.id_start().inc(break_point as Counter - 1));
                }
            }
        }

        if new_dag_start_counter_for_the_peer == 0 {
            unparsed_vv.remove(&peer);
        } else {
            unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
        }
        drop(unparsed_vv);
        drop(break_point_set);
        self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
    }

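    /// For every dep id, mark the containing node as having a successor and
    /// split it so that the dep points at the node's last id. Deps whose
    /// targets are not parsed yet are recorded in `unhandled_dep_points`.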
    fn handle_deps_break_points(
        &self,
        ids: impl IntoIterator<Item = ID>,
        skip_peer: PeerID,
        map: Option<&mut BTreeMap<ID, AppDagNode>>,
    ) {
        let mut map_guard = None;
        let map = map.unwrap_or_else(|| {
            map_guard = Some(self.map.try_lock().unwrap());
            map_guard.as_mut().unwrap()
        });
        for id in ids {
            if id.peer == skip_peer {
                continue;
            }

            let mut handled = false;
            let ans = self.try_with_node_mut(map, id, |target| {
                let target = target?;
                if target.ctr_last() == id.counter {
                    let target = Arc::make_mut(&mut target.inner);
                    handled = true;
                    target.has_succ = true;
                    None
                } else {
                    // The dep points into the middle of the node, so the node
                    // needs to be split; the right half is returned and
                    // inserted below.
                    let new_node =
                        target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
                    let target = Arc::make_mut(&mut target.inner);
                    target.len -= new_node.len;
                    Some(new_node)
                }
            });

            if let Some(new_node) = ans {
                map.insert(new_node.id_start(), new_node);
            } else if !handled {
                self.unhandled_dep_points.try_lock().unwrap().insert(id);
            }
        }
    }

    fn ensure_lazy_load_node(&self, id: ID) {
        if self.shallow_since_vv.includes_id(id) {
            return;
        }

        loop {
            // Parse backwards from the end of the unparsed range until the
            // range no longer covers `id`.
            let unparsed_end = {
                let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
                unparsed_vv.get(&id.peer).copied().unwrap_or(0)
            };
            if unparsed_end <= id.counter {
                return;
            }

            let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
            let Some(nodes) = self
                .change_store
                .get_dag_nodes_that_contains(last_unparsed_id)
            else {
                panic!("unparsed vv doesn't match with change store. Id:{id} is not in change store")
            };

            self.lazy_load_nodes_internal(nodes, id.peer, None);
        }
    }

    pub fn total_parsed_dag_node(&self) -> usize {
        self.map.try_lock().unwrap().len()
    }

    pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
        assert!(self.vv.is_empty());
        *self.unparsed_vv.try_lock().unwrap() = v.vv.clone();
        self.vv = v.vv;
        self.frontiers = v.frontiers;
        if let Some((vv, f)) = v.start_version {
            if !f.is_empty() {
                assert!(f.len() == 1);
                let id = f.as_single().unwrap();
                let node = self.get(id).unwrap();
                assert!(node.cnt == id.counter);
                self.shallow_root_frontiers_deps = node.deps.clone();
            }
            self.shallow_since_frontiers = f;
            self.shallow_since_vv = ImVersionVector::from_vv(&vv);
        }
    }

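    /// Check the invariants of the DAG (for debugging and tests): it parses
    /// all remaining nodes first, then verifies per-peer continuity, dep
    /// targets, Lamport timestamps, version vectors, and the frontiers.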
    #[instrument(skip(self))]
    pub fn check_dag_correctness(&self) {
        {
            // Parse all the unparsed nodes.
            let unparsed_vv = self.unparsed_vv.try_lock().unwrap().clone();
            for (peer, cnt) in unparsed_vv.iter() {
                if *cnt == 0 {
                    continue;
                }

                let mut end_cnt = *cnt;
                let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
                while end_cnt > init_counter {
                    let cnt = end_cnt - 1;
                    self.ensure_lazy_load_node(ID::new(*peer, cnt));
                    end_cnt = self
                        .unparsed_vv
                        .try_lock()
                        .unwrap()
                        .get(peer)
                        .copied()
                        .unwrap_or(0);
                }
            }

            self.unparsed_vv.try_lock().unwrap().clear();
        }
        {
            // Check that the nodes of each peer are contiguous.
            let map = self.map.try_lock().unwrap();
            let mut last_end_id = ID::new(0, 0);
            for (&id, node) in map.iter() {
                let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
                if id.peer == last_end_id.peer {
                    assert!(id.counter == last_end_id.counter);
                } else {
                    assert_eq!(id.counter, init_counter);
                }

                last_end_id = id.inc(node.len as Counter);
            }
        }
        {
            // Check that every dep points at the last id of its node.
            let map = self.map.try_lock().unwrap();
            check_always_dep_on_last_id(&map);
        }
        {
            // Check that each node's Lamport equals the max Lamport end of its deps.
            let map = self.map.try_lock().unwrap();
            'outer: for (_, node) in map.iter() {
                let mut this_lamport = 0;
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    this_lamport = this_lamport.max(dep_node.lamport_end());
                }

                assert_eq!(this_lamport, node.lamport);
            }
        }
        {
            // Check that each node's version vector matches its deps.
            // Clone the map so that `ensure_vv_for` can lock `self.map` internally.
            let map = self.map.try_lock().unwrap().clone();
            'outer: for (_, node) in map.iter() {
                let actual_vv = self.ensure_vv_for(node);
                let mut expected_vv = ImVersionVector::default();
                for dep in node.deps.iter() {
                    if self.shallow_since_vv.includes_id(dep) {
                        continue 'outer;
                    }

                    let (_, dep_node) = map.range(..=dep).next_back().unwrap();
                    self.ensure_vv_for(dep_node);
                    expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
                    expected_vv.extend_to_include_last_id(dep);
                }

                assert_eq!(actual_vv, expected_vv);
            }
        }
        {
            // Check the frontiers: the last ids that no other node depends on.
            let mut maybe_frontiers = FxHashSet::default();
            let map = self.map.try_lock().unwrap();
            for (_, node) in map.iter() {
                maybe_frontiers.insert(node.id_last());
            }

            for (_, node) in map.iter() {
                for dep in node.deps.iter() {
                    maybe_frontiers.remove(&dep);
                }
            }

            let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
            assert_eq!(maybe_frontiers, frontiers);
        }
    }

    pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
        for id in deps.iter() {
            if !self.vv.includes_id(id) {
                return false;
            }
        }

        if self.is_before_shallow_root(deps) {
            return false;
        }

        true
    }

    pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
        trace!("Is on shallow history? deps={:?}", deps);
        trace!("self.shallow_since_vv {:?}", &self.shallow_since_vv);
        trace!("self.shallow_frontiers {:?}", &self.shallow_since_frontiers);

        if self.shallow_since_vv.is_empty() {
            return false;
        }

        if deps.is_empty() {
            return true;
        }

        if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
            return true;
        }

        if deps
            .iter()
            .any(|x| self.shallow_since_frontiers.contains(&x))
        {
            return deps != &self.shallow_since_frontiers;
        }

        false
    }

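    /// Visit the ancestors of `id` in descending `lamport_last` order,
    /// starting from the node that contains `id`. Traversal stops early when
    /// `f` returns [ControlFlow::Break].
    ///
    /// An illustrative sketch (not a doc-test; `dag` and `id` are assumed):
    ///
    /// ```ignore
    /// dag.travel_ancestors(id, &mut |node| {
    ///     println!("visiting {:?}", node.id_start());
    ///     ControlFlow::Continue(())
    /// });
    /// ```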
    pub(crate) fn travel_ancestors(
        &self,
        id: ID,
        f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
    ) {
        struct PendingNode(AppDagNode);
        impl PartialEq for PendingNode {
            fn eq(&self, other: &Self) -> bool {
                self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
            }
        }
        impl Eq for PendingNode {}
        impl PartialOrd for PendingNode {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for PendingNode {
            fn cmp(&self, other: &Self) -> Ordering {
                self.0
                    .lamport_last()
                    .cmp(&other.0.lamport_last())
                    .then_with(|| self.0.peer.cmp(&other.0.peer))
            }
        }

        let mut visited = FxHashSet::default();
        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
        pending.push(PendingNode(self.get(id).unwrap()));
        while let Some(PendingNode(node)) = pending.pop() {
            if f(&node).is_break() {
                break;
            }

            for dep in node.deps.iter() {
                let Some(dep_node) = self.get(dep) else {
                    continue;
                };
                if visited.contains(&dep_node.id_start()) {
                    continue;
                }

                visited.insert(dep_node.id_start());
                pending.push(PendingNode(dep_node));
            }
        }
    }

    pub(crate) fn update_version_on_new_local_op(
        &mut self,
        deps: &Frontiers,
        start_id: ID,
        start_lamport: Lamport,
        len: usize,
    ) {
        let last_id = start_id.inc(len as Counter - 1);
        self.vv.set_last(last_id);
        self.frontiers.update_frontiers_on_new_change(last_id, deps);
        match &mut self.pending_txn_node {
            Some(node) => {
                // Extend the pending node; the new ops must continue it directly.
                assert!(
                    node.peer == start_id.peer
                        && node.cnt + node.len as Counter == start_id.counter
                        && deps.len() == 1
                        && deps.as_single().unwrap().peer == start_id.peer
                );
                let inner = Arc::make_mut(&mut node.inner);
                inner.len += len;
            }
            None => {
                let node = AppDagNode {
                    inner: Arc::new(AppDagNodeInner {
                        peer: start_id.peer,
                        cnt: start_id.counter,
                        lamport: start_lamport,
                        deps: deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len,
                    }),
                };
                self.pending_txn_node = Some(node);
            }
        }
    }
}

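/// Assert that whenever a dep targets an op inside a parsed node, it targets
/// that node's last id (i.e., nodes are split at every dep point).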
fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
    for (_, node) in map.iter() {
        for dep in node.deps.iter() {
            let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
                continue;
            };
            assert_eq!(dep_node.id_start(), dep_id);
            if dep_node.contains_id(dep) {
                assert_eq!(dep_node.id_last(), dep);
            }
        }
    }
}

impl HasIndex for AppDagNode {
    type Int = Counter;
    fn get_start_index(&self) -> Self::Int {
        self.cnt
    }

    fn get_end_index(&self) -> Self::Int {
        self.cnt + self.len as Counter
    }
}

impl Sliceable for AppDagNode {
    fn slice(&self, from: usize, to: usize) -> Self {
        AppDagNodeInner {
            peer: self.peer,
            cnt: self.cnt + from as Counter,
            lamport: self.lamport + from as Lamport,
            // A mid-node slice depends on the op right before it.
            deps: if from > 0 {
                Frontiers::from_id(self.id_start().inc(from as Counter - 1))
            } else {
                self.deps.clone()
            },
            vv: if let Some(vv) = self.vv.get() {
                let mut new = vv.clone();
                new.insert(self.peer, self.cnt + from as Counter);
                OnceCell::with_value(new)
            } else {
                OnceCell::new()
            },
            has_succ: if to == self.len { self.has_succ } else { true },
            len: to - from,
        }
        .into()
    }
}

impl HasId for AppDagNode {
    fn id_start(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.cnt,
        }
    }
}

impl HasCounter for AppDagNode {
    fn ctr_start(&self) -> Counter {
        self.cnt
    }
}

impl HasLength for AppDagNode {
    fn atom_len(&self) -> usize {
        self.len
    }

    fn content_len(&self) -> usize {
        self.len
    }
}

impl Mergable for AppDagNode {
    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
    where
        Self: Sized,
    {
        !self.has_succ
            && self.peer == other.peer
            && self.cnt + self.len as Counter == other.cnt
            && other.deps.len() == 1
            && self.lamport + self.len as Lamport == other.lamport
            && other.deps.as_single().unwrap().peer == self.peer
    }

    fn merge(&mut self, other: &Self, _conf: &())
    where
        Self: Sized,
    {
        assert_eq!(
            other.deps.as_single().unwrap().counter,
            self.cnt + self.len as Counter - 1
        );
        let this = Arc::make_mut(&mut self.inner);
        this.len += other.len;
        this.has_succ = other.has_succ;
    }
}

impl HasLamport for AppDagNode {
    fn lamport(&self) -> Lamport {
        self.lamport
    }
}

impl DagNode for AppDagNode {
    fn deps(&self) -> &Frontiers {
        &self.deps
    }
}

impl Dag for AppDag {
    type Node = AppDagNode;

    fn frontier(&self) -> &Frontiers {
        &self.frontiers
    }

    fn get(&self, id: ID) -> Option<Self::Node> {
        self.ensure_lazy_load_node(id);
        let binding = self.map.try_lock().unwrap();
        if let Some(x) = binding.range(..=id).next_back() {
            if x.1.contains_id(id) {
                return Some(x.1.clone());
            }
        }

        // Fall back to the node of the pending local transaction, if any.
        if let Some(node) = &self.pending_txn_node {
            if node.peer == id.peer && node.cnt <= id.counter {
                assert!(node.cnt + node.len as Counter > id.counter);
                return Some(node.clone());
            }
        }

        None
    }

    fn vv(&self) -> &VersionVector {
        &self.vv
    }

    fn contains(&self, id: ID) -> bool {
        self.vv.includes_id(id)
    }
}

impl AppDag {
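    /// Get the version vector at `id`, including `id` itself
    /// (i.e., `vv[id.peer] == id.counter + 1`).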
    pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
        self.get(id).map(|x| {
            let mut vv = self.ensure_vv_for(&x);
            vv.insert(id.peer, id.counter + 1);
            vv
        })
    }

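    /// Compute (and cache) the version vector at the start of `target_node`,
    /// i.e., excluding the node's own ops. It uses an explicit stack instead
    /// of recursion so that deep histories cannot overflow the call stack.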
    pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
        if target_node.vv.get().is_none() {
            let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
            while let Some(top_node) = stack.pop() {
                let mut ans_vv = ImVersionVector::default();
                if top_node.deps == self.shallow_root_frontiers_deps {
                    // The node depends on the shallow root, whose version
                    // vector is already known: `shallow_since_vv`.
                    for (&p, &c) in self.shallow_since_vv.iter() {
                        ans_vv.insert(p, c);
                    }
                } else {
                    // First pass: make sure every dep has its vv computed.
                    // If some don't, re-push this node and process those deps first.
                    let mut all_deps_processed = true;
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        if node.vv.get().is_none() {
                            if all_deps_processed {
                                stack.push(top_node.clone());
                            }
                            all_deps_processed = false;
                            stack.push(node);
                            continue;
                        };
                    }

                    if !all_deps_processed {
                        continue;
                    }

                    // Second pass: union the deps' version vectors.
                    for id in top_node.deps.iter() {
                        let node = self.get(id).expect("deps should be in the dag");
                        let dep_vv = node.vv.get().unwrap();
                        if ans_vv.is_empty() {
                            ans_vv = dep_vv.clone();
                        } else {
                            ans_vv.extend_to_include_vv(dep_vv.iter());
                        }

                        ans_vv.insert(node.peer, node.ctr_end());
                    }
                }

                top_node.vv.set(ans_vv.clone()).unwrap();
            }
        }

        target_node.vv.get().unwrap().clone()
    }

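    /// Compare the causal order of two ops. Ops from the same peer are
    /// ordered by counter; otherwise their version vectors are compared, and
    /// `None` means the two ops are concurrent.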
    pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
        if a.peer == b.peer {
            return Some(a.counter.cmp(&b.counter));
        }

        let a = self.get_vv(a).unwrap();
        let b = self.get_vv(b).unwrap();
        a.partial_cmp(&b)
    }

    pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
        self.get(*id).and_then(|node| {
            assert!(id.counter >= node.cnt);
            if node.cnt + node.len as Counter > id.counter {
                Some(node.lamport + (id.counter - node.cnt) as Lamport)
            } else {
                None
            }
        })
    }

    pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
        let mut lamport = 0;
        for id in deps.iter() {
            let x = self.get_lamport(&id)?;
            lamport = lamport.max(x + 1);
        }

        Some(lamport)
    }

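    /// Convert `frontiers` to a version vector. Returns `None` if any id in
    /// the frontiers is not contained in the DAG.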
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        if frontiers == &self.shallow_root_frontiers_deps {
            let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
            return Some(vv);
        }

        let mut vv: VersionVector = Default::default();
        for id in frontiers.iter() {
            let x = self.get(id)?;
            let target_vv = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(target_vv.iter());
            vv.extend_to_include_last_id(id);
        }

        Some(vv)
    }

    #[allow(unused)]
    pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
        if frontiers.is_empty() {
            return Default::default();
        }

        let mut iter = frontiers.iter();
        let mut vv = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let mut vv = self.ensure_vv_for(&x);
            vv.extend_to_include_last_id(id);
            vv
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            let x = self.ensure_vv_for(&x);
            vv.extend_to_include_vv(x.iter());
            vv.extend_to_include_last_id(id);
        }

        vv
    }

    pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let last_ids: Frontiers = vv
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                // Ids inside the shallow history are unavailable; skip them.
                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        if vv.is_empty() {
            return Default::default();
        }

        let last_ids: Frontiers = vv
            .iter()
            .filter_map(|(client_id, cnt)| {
                if *cnt == 0 {
                    return None;
                }

                // Ids inside the shallow history are unavailable; skip them.
                if self
                    .shallow_since_vv
                    .includes_id(ID::new(*client_id, *cnt - 1))
                {
                    return None;
                }

                Some(ID::new(*client_id, cnt - 1))
            })
            .collect();

        if last_ids.is_empty() {
            return self.shallow_since_frontiers.clone();
        }

        shrink_frontiers(&last_ids, self).unwrap()
    }

    pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
        if frontiers.is_empty() {
            return 0;
        }

        let mut iter = frontiers.iter();
        let mut lamport = {
            let id = iter.next().unwrap();
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            (id.counter - x.cnt) as Lamport + x.lamport + 1
        };

        for id in iter {
            let Some(x) = self.get(id) else {
                unreachable!()
            };
            assert!(id.counter >= x.cnt);
            lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
        }

        lamport
    }

    pub fn get_frontiers(&self) -> &Frontiers {
        &self.frontiers
    }

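    /// Compare the doc's current frontiers with `other`: `Equal` if they are
    /// identical, `Greater` if `other` is already included in our version,
    /// and `Less` otherwise.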
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        if &self.frontiers == other {
            Ordering::Equal
        } else if other.iter().all(|id| self.vv.includes_id(id)) {
            Ordering::Greater
        } else {
            Ordering::Less
        }
    }

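    /// Compare two frontiers by converting them to version vectors.
    ///
    /// Returns [FrontiersNotIncluded] if either frontier is not fully
    /// contained in this doc; `Ok(None)` means the two are concurrent.
    ///
    /// A minimal usage sketch (illustrative only, not a doc-test; `dag`, `a`,
    /// and `b` are assumed to exist):
    ///
    /// ```ignore
    /// match dag.cmp_frontiers(&a, &b) {
    ///     Ok(Some(Ordering::Less)) => { /* a happened before b */ }
    ///     Ok(None) => { /* a and b are concurrent */ }
    ///     Err(FrontiersNotIncluded) => { /* a or b is unknown to this doc */ }
    ///     _ => {}
    /// }
    /// ```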
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
        let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
        Ok(a.partial_cmp(&b))
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct FrontiersNotIncluded;
impl Display for FrontiersNotIncluded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The given Frontiers are not included by the doc")
    }
}