1use crate::change::{Change, Lamport};
2use crate::dag::{Dag, DagNode};
3use crate::id::{Counter, ID};
4use crate::span::{HasId, HasLamport};
5use crate::sync::Mutex;
6use crate::version::{shrink_frontiers, Frontiers, ImVersionVector, VersionVector};
7use fxhash::FxHashSet;
8use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID};
9use once_cell::sync::OnceCell;
10use rle::{HasIndex, HasLength, Mergable, Sliceable};
11use smallvec::SmallVec;
12use std::cmp::Ordering;
13use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
14use std::fmt::Display;
15use std::ops::{ControlFlow, Deref};
16use std::sync::Arc;
17use tracing::instrument;
18
19use super::change_store::BatchDecodeInfo;
20use super::ChangeStore;
21
22#[derive(Debug)]
25pub struct AppDag {
26 change_store: ChangeStore,
27 map: Mutex<BTreeMap<ID, AppDagNode>>,
32 frontiers: Frontiers,
34 vv: VersionVector,
36 shallow_since_frontiers: Frontiers,
38 shallow_root_frontiers_deps: Frontiers,
40 shallow_since_vv: ImVersionVector,
42 unparsed_vv: Mutex<VersionVector>,
48 unhandled_dep_points: Mutex<BTreeSet<ID>>,
52 pending_txn_node: Option<AppDagNode>,
53}
54
55#[derive(Debug, Clone)]
56pub struct AppDagNode {
57 inner: Arc<AppDagNodeInner>,
58}
59
60impl Deref for AppDagNode {
61 type Target = AppDagNodeInner;
62
63 fn deref(&self) -> &Self::Target {
64 &self.inner
65 }
66}
67
68impl AppDagNode {
69 pub fn new(inner: AppDagNodeInner) -> Self {
70 Self {
71 inner: Arc::new(inner),
72 }
73 }
74}
75
76#[derive(Debug, Clone)]
77pub struct AppDagNodeInner {
78 pub(crate) peer: PeerID,
79 pub(crate) cnt: Counter,
80 pub(crate) lamport: Lamport,
81 pub(crate) deps: Frontiers,
82 pub(crate) vv: OnceCell<ImVersionVector>,
83 pub(crate) has_succ: bool,
87 pub(crate) len: usize,
88}
89
90impl From<AppDagNodeInner> for AppDagNode {
91 fn from(inner: AppDagNodeInner) -> Self {
92 AppDagNode {
93 inner: Arc::new(inner),
94 }
95 }
96}
97
98impl AppDag {
99 pub(super) fn new(change_store: ChangeStore) -> Self {
100 Self {
101 change_store,
102 map: Mutex::new(BTreeMap::new()),
103 frontiers: Frontiers::default(),
104 vv: VersionVector::default(),
105 unparsed_vv: Mutex::new(VersionVector::default()),
106 unhandled_dep_points: Mutex::new(BTreeSet::new()),
107 shallow_since_frontiers: Default::default(),
108 shallow_root_frontiers_deps: Default::default(),
109 shallow_since_vv: Default::default(),
110 pending_txn_node: None,
111 }
112 }
113
114 pub fn frontiers(&self) -> &Frontiers {
115 &self.frontiers
116 }
117
118 pub fn vv(&self) -> &VersionVector {
119 &self.vv
120 }
121
122 pub fn shallow_since_vv(&self) -> &ImVersionVector {
123 &self.shallow_since_vv
124 }
125
126 pub fn shallow_since_frontiers(&self) -> &Frontiers {
127 &self.shallow_since_frontiers
128 }
129
130 pub fn is_empty(&self) -> bool {
131 self.vv.is_empty()
132 }
133
134 #[tracing::instrument(skip_all, name = "handle_new_change")]
135 pub(super) fn handle_new_change(&mut self, change: &Change, from_local: bool) {
136 let len = change.content_len();
137 self.update_version_on_new_change(change, from_local);
138 #[cfg(debug_assertions)]
139 {
140 let unhandled_dep_points = self.unhandled_dep_points.lock().unwrap();
141 let c = unhandled_dep_points
142 .range(change.id_start()..change.id_end())
143 .count();
144 assert!(c == 0);
145 }
146
147 let mut inserted = false;
148 if change.deps_on_self() {
149 inserted = self.with_last_mut_of_peer(change.id.peer, |last| {
151 let last = last.unwrap();
152 if last.has_succ {
153 return false;
155 }
156
157 assert_eq!(last.peer, change.id.peer, "peer id is not the same");
158 assert_eq!(
159 last.cnt + last.len as Counter,
160 change.id.counter,
161 "counter is not continuous"
162 );
163 assert_eq!(
164 last.lamport + last.len as Lamport,
165 change.lamport,
166 "lamport is not continuous"
167 );
168 let last = Arc::make_mut(&mut last.inner);
169 last.len = (change.id.counter - last.cnt) as usize + len;
170 last.has_succ = false;
171 true
172 });
173 }
174
175 if !inserted {
176 let node: AppDagNode = AppDagNodeInner {
177 vv: OnceCell::new(),
178 peer: change.id.peer,
179 cnt: change.id.counter,
180 lamport: change.lamport,
181 deps: change.deps.clone(),
182 has_succ: false,
183 len,
184 }
185 .into();
186
187 let mut map = self.map.lock().unwrap();
188 map.insert(node.id_start(), node);
189 self.handle_deps_break_points(change.deps.iter(), change.id.peer, Some(&mut map));
190 }
191 }
192
193 fn try_with_node_mut<R>(
194 &self,
195 map: &mut BTreeMap<ID, AppDagNode>,
196 id: ID,
197 f: impl FnOnce(Option<&mut AppDagNode>) -> R,
198 ) -> R {
199 let x = map.range_mut(..=id).next_back();
200 if let Some((_, node)) = x {
201 if node.contains_id(id) {
202 f(Some(node))
203 } else {
204 f(None)
205 }
206 } else {
207 f(None)
208 }
209 }
210
211 pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
213 for dep in change.deps.iter() {
214 match self.get_lamport(&dep) {
215 Some(lamport) => {
216 change.lamport = change.lamport.max(lamport + 1);
217 }
218 None => return Err(()),
219 }
220 }
221 Ok(())
222 }
223
224 pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
225 let Some(node) = self.get(id) else {
226 return Frontiers::default();
227 };
228
229 let offset = id.counter - node.cnt;
230 if offset == 0 {
231 node.deps.clone()
232 } else {
233 ID::new(id.peer, node.cnt + offset - 1).into()
234 }
235 }
236
237 pub(crate) fn with_last_mut_of_peer<R>(
238 &mut self,
239 peer: PeerID,
240 f: impl FnOnce(Option<&mut AppDagNode>) -> R,
241 ) -> R {
242 self.lazy_load_last_of_peer(peer);
243 let mut binding = self.map.lock().unwrap();
244 let last = binding
245 .range_mut(..=ID::new(peer, Counter::MAX))
246 .next_back()
247 .map(|(_, v)| v);
248 f(last)
249 }
250
251 fn update_version_on_new_change(&mut self, change: &Change, from_local: bool) {
252 if from_local {
253 assert!(self.pending_txn_node.take().is_some());
254 assert_eq!(
255 self.vv.get(&change.id.peer).copied().unwrap_or(0),
256 change.ctr_end()
257 );
258 } else {
259 let id_last = change.id_last();
260 self.frontiers
261 .update_frontiers_on_new_change(id_last, &change.deps);
262 assert!(self.pending_txn_node.is_none());
263 assert_eq!(
264 self.vv.get(&change.id.peer).copied().unwrap_or(0),
265 change.id.counter
266 );
267 self.vv.extend_to_include_last_id(id_last);
268 }
269 }
270
271 pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
272 let unparsed_vv = self.unparsed_vv.lock().unwrap();
273 if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
274 return;
275 }
276
277 let Some(nodes) = self.change_store.get_last_dag_nodes_for_peer(peer) else {
278 panic!("unparsed vv don't match with change store. Peer:{peer} is not in change store")
279 };
280
281 self.lazy_load_nodes_internal(nodes, peer, None);
282 }
283
284 fn lazy_load_nodes_internal(
285 &self,
286 nodes: Vec<AppDagNode>,
287 peer: u64,
288 map_input: Option<&mut BTreeMap<ID, AppDagNode>>,
289 ) {
290 assert!(!nodes.is_empty());
291 let mut map_guard = None;
292 let map = map_input.unwrap_or_else(|| {
293 map_guard = Some(self.map.lock().unwrap());
294 map_guard.as_mut().unwrap()
295 });
296 let new_dag_start_counter_for_the_peer = nodes[0].cnt;
297 let nodes_cnt_end = nodes.last().unwrap().ctr_end();
298 let mut unparsed_vv = self.unparsed_vv.lock().unwrap();
299 let end_counter = unparsed_vv[&peer];
300 assert!(end_counter <= nodes_cnt_end);
301 let mut deps_on_others = Vec::new();
302 let mut break_point_set = self.unhandled_dep_points.lock().unwrap();
303 for mut node in nodes {
304 if node.cnt >= end_counter {
305 break;
307 }
308
309 if node.cnt + node.len as Counter > end_counter {
310 node = node.slice(0, (end_counter - node.cnt) as usize);
311 }
313
314 for dep in node.deps.iter() {
315 if dep.peer != peer {
316 deps_on_others.push(dep);
317 }
318 }
319
320 let break_point_ends: Vec<_> = break_point_set
322 .range(node.id_start()..node.id_end())
323 .map(|id| (id.counter - node.cnt) as usize + 1)
324 .collect();
325 if break_point_ends.is_empty() {
326 map.insert(node.id_start(), node);
327 } else {
328 let mut slice_start = 0;
329 for slice_end in break_point_ends.iter().copied() {
330 let mut slice_node = node.slice(slice_start, slice_end);
331 let inner = Arc::make_mut(&mut slice_node.inner);
332 inner.has_succ = true;
333 map.insert(slice_node.id_start(), slice_node);
334 slice_start = slice_end;
335 }
336
337 let last_break_point = break_point_ends.last().copied().unwrap();
338 if last_break_point != node.len {
339 let slice_node = node.slice(last_break_point, node.len);
340 map.insert(slice_node.id_start(), slice_node);
341 }
342
343 for break_point in break_point_ends.into_iter() {
344 break_point_set.remove(&node.id_start().inc(break_point as Counter - 1));
345 }
346 }
347 }
348
349 if new_dag_start_counter_for_the_peer == 0 {
350 unparsed_vv.remove(&peer);
351 } else {
352 unparsed_vv.insert(peer, new_dag_start_counter_for_the_peer);
353 }
354 drop(unparsed_vv);
355 drop(break_point_set);
356 self.handle_deps_break_points(deps_on_others.iter().copied(), peer, Some(map));
357 }
358
359 fn handle_deps_break_points(
360 &self,
361 ids: impl IntoIterator<Item = ID>,
362 skip_peer: PeerID,
363 map: Option<&mut BTreeMap<ID, AppDagNode>>,
364 ) {
365 let mut map_guard = None;
366 let map = map.unwrap_or_else(|| {
367 map_guard = Some(self.map.lock().unwrap());
368 map_guard.as_mut().unwrap()
369 });
370 for id in ids {
371 if id.peer == skip_peer {
372 continue;
373 }
374
375 let mut handled = false;
376 let ans = self.try_with_node_mut(map, id, |target| {
377 let target = target?;
379 if target.ctr_last() == id.counter {
380 let target = Arc::make_mut(&mut target.inner);
381 handled = true;
382 target.has_succ = true;
383 None
384 } else {
385 let new_node =
390 target.slice(id.counter as usize - target.cnt as usize + 1, target.len);
391 let target = Arc::make_mut(&mut target.inner);
392 target.len -= new_node.len;
393 Some(new_node)
394 }
395 });
396
397 if let Some(new_node) = ans {
398 map.insert(new_node.id_start(), new_node);
399 } else if !handled {
400 self.unhandled_dep_points.lock().unwrap().insert(id);
401 }
402 }
403 }
404
405 fn ensure_lazy_load_node(&self, id: ID) {
406 if self.shallow_since_vv.includes_id(id) {
407 return;
408 }
409
410 loop {
411 let unparsed_end = {
414 let unparsed_vv = self.unparsed_vv.lock().unwrap();
415 unparsed_vv.get(&id.peer).copied().unwrap_or(0)
416 };
417 if unparsed_end <= id.counter {
418 return;
419 }
420
421 let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
422 let Some(nodes) = self
423 .change_store
424 .get_dag_nodes_that_contains(last_unparsed_id)
425 else {
426 panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
427 };
428
429 self.lazy_load_nodes_internal(nodes, id.peer, None);
430 }
431 }
432
433 pub fn total_parsed_dag_node(&self) -> usize {
434 self.map.lock().unwrap().len()
435 }
436
437 pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
438 assert!(self.vv.is_empty());
439 *self.unparsed_vv.lock().unwrap() = v.vv.clone();
440 self.vv = v.vv;
441 self.frontiers = v.frontiers;
442 if let Some((vv, f)) = v.start_version {
443 if !f.is_empty() {
444 assert!(f.len() == 1);
445 let id = f.as_single().unwrap();
446 let node = self.get(id).unwrap();
447 assert!(node.cnt == id.counter);
448 self.shallow_root_frontiers_deps = node.deps.clone();
449 }
450 self.shallow_since_frontiers = f;
451 self.shallow_since_vv = ImVersionVector::from_vv(&vv);
452 }
453 }
454
455 #[instrument(skip(self))]
465 pub fn check_dag_correctness(&self) {
466 {
467 let unparsed_vv = self.unparsed_vv.lock().unwrap().clone();
469 for (peer, cnt) in unparsed_vv.iter() {
470 if *cnt == 0 {
471 continue;
472 }
473
474 let mut end_cnt = *cnt;
475 let init_counter = self.shallow_since_vv.get(peer).copied().unwrap_or(0);
476 while end_cnt > init_counter {
477 let cnt = end_cnt - 1;
478 self.ensure_lazy_load_node(ID::new(*peer, cnt));
479 end_cnt = self
480 .unparsed_vv
481 .lock()
482 .unwrap()
483 .get(peer)
484 .copied()
485 .unwrap_or(0);
486 }
487 }
488
489 self.unparsed_vv.lock().unwrap().clear();
490 }
491 {
492 let map = self.map.lock().unwrap();
494 let mut last_end_id = ID::new(0, 0);
495 for (&id, node) in map.iter() {
496 let init_counter = self.shallow_since_vv.get(&id.peer).copied().unwrap_or(0);
497 if id.peer == last_end_id.peer {
498 assert!(id.counter == last_end_id.counter);
499 } else {
500 assert_eq!(id.counter, init_counter);
501 }
502
503 last_end_id = id.inc(node.len as Counter);
504 }
505 }
506 {
507 let map = self.map.lock().unwrap();
509 check_always_dep_on_last_id(&map);
510 }
511 {
512 let map = self.map.lock().unwrap();
514 'outer: for (_, node) in map.iter() {
515 let mut this_lamport = 0;
516 for dep in node.deps.iter() {
517 if self.shallow_since_vv.includes_id(dep) {
518 continue 'outer;
519 }
520
521 let (_, dep_node) = map.range(..=dep).next_back().unwrap();
522 this_lamport = this_lamport.max(dep_node.lamport_end());
523 }
524
525 assert_eq!(this_lamport, node.lamport);
526 }
527 }
528 {
529 let map = self.map.lock().unwrap().clone();
531 'outer: for (_, node) in map.iter() {
532 let actual_vv = self.ensure_vv_for(node);
533 let mut expected_vv = ImVersionVector::default();
534 for dep in node.deps.iter() {
535 if self.shallow_since_vv.includes_id(dep) {
536 continue 'outer;
537 }
538
539 let (_, dep_node) = map.range(..=dep).next_back().unwrap();
540 self.ensure_vv_for(dep_node);
541 expected_vv.extend_to_include_vv(dep_node.vv.get().unwrap().iter());
542 expected_vv.extend_to_include_last_id(dep);
543 }
544
545 assert_eq!(actual_vv, expected_vv);
546 }
547 }
548 {
549 let mut maybe_frontiers = FxHashSet::default();
551 let map = self.map.lock().unwrap();
552 for (_, node) in map.iter() {
553 maybe_frontiers.insert(node.id_last());
554 }
555
556 for (_, node) in map.iter() {
557 for dep in node.deps.iter() {
558 maybe_frontiers.remove(&dep);
559 }
560 }
561
562 let frontiers = self.frontiers.iter().collect::<FxHashSet<_>>();
563 assert_eq!(maybe_frontiers, frontiers);
564 }
565 }
566
567 pub(crate) fn can_export_shallow_snapshot_on(&self, deps: &Frontiers) -> bool {
568 for id in deps.iter() {
569 if !self.vv.includes_id(id) {
570 return false;
571 }
572 }
573
574 if self.is_before_shallow_root(deps) {
575 return false;
576 }
577
578 true
579 }
580
581 pub(crate) fn is_before_shallow_root(&self, deps: &Frontiers) -> bool {
582 if self.shallow_since_vv.is_empty() {
587 return false;
588 }
589
590 if deps.is_empty() {
591 return true;
592 }
593
594 if deps.iter().any(|x| self.shallow_since_vv.includes_id(x)) {
595 return true;
596 }
597
598 if deps
599 .iter()
600 .any(|x| self.shallow_since_frontiers.contains(&x))
601 {
602 return deps != &self.shallow_since_frontiers;
603 }
604
605 false
606 }
607
608 pub(crate) fn travel_ancestors(
612 &self,
613 id: ID,
614 f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
615 ) {
616 struct PendingNode(AppDagNode);
617 impl PartialEq for PendingNode {
618 fn eq(&self, other: &Self) -> bool {
619 self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
620 }
621 }
622 impl Eq for PendingNode {}
623 impl PartialOrd for PendingNode {
624 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
625 Some(self.cmp(other))
626 }
627 }
628 impl Ord for PendingNode {
629 fn cmp(&self, other: &Self) -> Ordering {
630 self.0
631 .lamport_last()
632 .cmp(&other.0.lamport_last())
633 .then_with(|| self.0.peer.cmp(&other.0.peer))
634 }
635 }
636
637 let mut visited = FxHashSet::default();
638 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
639 pending.push(PendingNode(self.get(id).unwrap()));
640 while let Some(PendingNode(node)) = pending.pop() {
641 if f(&node).is_break() {
642 break;
643 }
644
645 for dep in node.deps.iter() {
646 let Some(dep_node) = self.get(dep) else {
647 continue;
648 };
649 if visited.contains(&dep_node.id_start()) {
650 continue;
651 }
652
653 visited.insert(dep_node.id_start());
654 pending.push(PendingNode(dep_node));
655 }
656 }
657 }
658
659 pub(crate) fn update_version_on_new_local_op(
660 &mut self,
661 deps: &Frontiers,
662 start_id: ID,
663 start_lamport: Lamport,
664 len: usize,
665 ) {
666 let last_id = start_id.inc(len as Counter - 1);
667 self.vv.set_last(last_id);
669 self.frontiers.update_frontiers_on_new_change(last_id, deps);
670 match &mut self.pending_txn_node {
671 Some(node) => {
672 assert!(
673 node.peer == start_id.peer
674 && node.cnt + node.len as Counter == start_id.counter
675 && deps.len() == 1
676 && deps.as_single().unwrap().peer == start_id.peer
677 );
678 let inner = Arc::make_mut(&mut node.inner);
679 inner.len += len;
680 }
681 None => {
682 let node = AppDagNode {
683 inner: Arc::new(AppDagNodeInner {
684 peer: start_id.peer,
685 cnt: start_id.counter,
686 lamport: start_lamport,
687 deps: deps.clone(),
688 vv: OnceCell::new(),
689 has_succ: false,
690 len,
691 }),
692 };
693 self.pending_txn_node = Some(node);
694 }
695 }
696 }
697
698 pub(crate) fn latest_vv_contains_peer(&self, peer: PeerID) -> bool {
699 self.vv.contains_key(&peer) && *self.vv.get(&peer).unwrap() > 0
700 }
701}
702
703fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
704 for (_, node) in map.iter() {
705 for dep in node.deps.iter() {
706 let Some((&dep_id, dep_node)) = map.range(..=dep).next_back() else {
707 continue;
709 };
710 assert_eq!(dep_node.id_start(), dep_id);
711 if dep_node.contains_id(dep) {
712 assert_eq!(dep_node.id_last(), dep);
713 }
714 }
715 }
716}
717
718impl HasIndex for AppDagNode {
719 type Int = Counter;
720 fn get_start_index(&self) -> Self::Int {
721 self.cnt
722 }
723
724 fn get_end_index(&self) -> Self::Int {
725 self.cnt + self.len as Counter
726 }
727}
728
729impl Sliceable for AppDagNode {
730 fn slice(&self, from: usize, to: usize) -> Self {
731 AppDagNodeInner {
732 peer: self.peer,
733 cnt: self.cnt + from as Counter,
734 lamport: self.lamport + from as Lamport,
735 deps: if from > 0 {
736 Frontiers::from_id(self.id_start().inc(from as Counter - 1))
737 } else {
738 self.deps.clone()
739 },
740 vv: if let Some(vv) = self.vv.get() {
741 let mut new = vv.clone();
742 new.insert(self.peer, self.cnt + from as Counter);
743 OnceCell::with_value(new)
744 } else {
745 OnceCell::new()
746 },
747 has_succ: if to == self.len { self.has_succ } else { true },
748 len: to - from,
749 }
750 .into()
751 }
752}
753
754impl HasId for AppDagNode {
755 fn id_start(&self) -> ID {
756 ID {
757 peer: self.peer,
758 counter: self.cnt,
759 }
760 }
761}
762
763impl HasCounter for AppDagNode {
764 fn ctr_start(&self) -> Counter {
765 self.cnt
766 }
767}
768
769impl HasLength for AppDagNode {
770 fn atom_len(&self) -> usize {
771 self.len
772 }
773
774 fn content_len(&self) -> usize {
775 self.len
776 }
777}
778
779impl Mergable for AppDagNode {
780 fn is_mergable(&self, other: &Self, _conf: &()) -> bool
781 where
782 Self: Sized,
783 {
784 !self.has_succ
785 && self.peer == other.peer
786 && self.cnt + self.len as Counter == other.cnt
787 && other.deps.len() == 1
788 && self.lamport + self.len as Lamport == other.lamport
789 && other.deps.as_single().unwrap().peer == self.peer
790 }
791
792 fn merge(&mut self, other: &Self, _conf: &())
793 where
794 Self: Sized,
795 {
796 assert_eq!(
797 other.deps.as_single().unwrap().counter,
798 self.cnt + self.len as Counter - 1
799 );
800 let this = Arc::make_mut(&mut self.inner);
801 this.len += other.len;
802 this.has_succ = other.has_succ;
803 }
804}
805
806impl HasLamport for AppDagNode {
807 fn lamport(&self) -> Lamport {
808 self.lamport
809 }
810}
811
812impl DagNode for AppDagNode {
813 fn deps(&self) -> &Frontiers {
814 &self.deps
815 }
816}
817
818impl Dag for AppDag {
819 type Node = AppDagNode;
820
821 fn frontier(&self) -> &Frontiers {
822 &self.frontiers
823 }
824
825 fn get(&self, id: ID) -> Option<Self::Node> {
826 self.ensure_lazy_load_node(id);
827 let binding = self.map.lock().unwrap();
828 if let Some(x) = binding.range(..=id).next_back() {
829 if x.1.contains_id(id) {
830 return Some(x.1.clone());
833 }
834 }
835
836 if let Some(node) = &self.pending_txn_node {
837 if node.peer == id.peer && node.cnt <= id.counter {
838 assert!(node.cnt + node.len as Counter > id.counter);
839 return Some(node.clone());
840 }
841 }
842
843 None
844 }
845
846 fn vv(&self) -> &VersionVector {
847 &self.vv
848 }
849
850 fn contains(&self, id: ID) -> bool {
851 self.vv.includes_id(id)
852 }
853}
854
855impl AppDag {
856 pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
860 self.get(id).map(|x| {
861 let mut vv = self.ensure_vv_for(&x);
862 vv.insert(id.peer, id.counter + 1);
863 vv
864 })
865 }
866
867 pub(crate) fn ensure_vv_for(&self, target_node: &AppDagNode) -> ImVersionVector {
868 if target_node.vv.get().is_none() {
869 let mut stack: SmallVec<[AppDagNode; 4]> = smallvec::smallvec![target_node.clone()];
871 while let Some(top_node) = stack.pop() {
872 let mut ans_vv = ImVersionVector::default();
873 if top_node.deps == self.shallow_root_frontiers_deps {
880 for (&p, &c) in self.shallow_since_vv.iter() {
881 ans_vv.insert(p, c);
882 }
883 } else {
884 let mut all_deps_processed = true;
885 for id in top_node.deps.iter() {
886 let node = self.get(id).expect("deps should be in the dag");
887 if node.vv.get().is_none() {
888 if all_deps_processed {
890 stack.push(top_node.clone());
891 }
892 all_deps_processed = false;
893 stack.push(node);
894 continue;
895 };
896 }
897
898 if !all_deps_processed {
899 continue;
900 }
901
902 for id in top_node.deps.iter() {
903 let node = self.get(id).expect("deps should be in the dag");
904 let dep_vv = node.vv.get().unwrap();
905 if ans_vv.is_empty() {
906 ans_vv = dep_vv.clone();
907 } else {
908 ans_vv.extend_to_include_vv(dep_vv.iter());
909 }
910
911 ans_vv.insert(node.peer, node.ctr_end());
912 }
913 }
914
915 top_node.vv.set(ans_vv.clone()).unwrap();
917 }
918 }
919
920 target_node.vv.get().unwrap().clone()
921 }
922
923 pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
926 if a.peer == b.peer {
927 return Some(a.counter.cmp(&b.counter));
928 }
929
930 let a = self.get_vv(a).unwrap();
931 let b = self.get_vv(b).unwrap();
932 a.partial_cmp(&b)
933 }
934
935 pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
936 self.get(*id).and_then(|node| {
937 assert!(id.counter >= node.cnt);
938 if node.cnt + node.len as Counter > id.counter {
939 Some(node.lamport + (id.counter - node.cnt) as Lamport)
940 } else {
941 None
942 }
943 })
944 }
945
946 pub fn get_change_lamport_from_deps(&self, deps: &Frontiers) -> Option<Lamport> {
947 let mut lamport = 0;
948 for id in deps.iter() {
949 let x = self.get_lamport(&id)?;
950 lamport = lamport.max(x + 1);
951 }
952
953 Some(lamport)
954 }
955
956 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
960 if frontiers == &self.shallow_root_frontiers_deps {
961 let vv = VersionVector::from_im_vv(&self.shallow_since_vv);
962 return Some(vv);
963 }
964
965 let mut vv: VersionVector = Default::default();
966 for id in frontiers.iter() {
967 let x = self.get(id)?;
968 let target_vv = self.ensure_vv_for(&x);
969 vv.extend_to_include_vv(target_vv.iter());
970 vv.extend_to_include_last_id(id);
971 }
972
973 Some(vv)
974 }
975
976 #[allow(unused)]
977 pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
978 if frontiers.is_empty() {
979 return Default::default();
980 }
981
982 let mut iter = frontiers.iter();
983 let mut vv = {
984 let id = iter.next().unwrap();
985 let Some(x) = self.get(id) else {
986 unreachable!()
987 };
988 let mut vv = self.ensure_vv_for(&x);
989 vv.extend_to_include_last_id(id);
990 vv
991 };
992
993 for id in iter {
994 let Some(x) = self.get(id) else {
995 unreachable!()
996 };
997 let x = self.ensure_vv_for(&x);
998 vv.extend_to_include_vv(x.iter());
999 vv.extend_to_include_last_id(id);
1000 }
1001
1002 vv
1003 }
1004
1005 pub fn im_vv_to_frontiers(&self, vv: &ImVersionVector) -> Frontiers {
1006 if vv.is_empty() {
1007 return Default::default();
1008 }
1009
1010 let this = vv;
1011 let last_ids: Frontiers = this
1012 .iter()
1013 .filter_map(|(client_id, cnt)| {
1014 if *cnt == 0 {
1015 return None;
1016 }
1017
1018 if self
1019 .shallow_since_vv
1020 .includes_id(ID::new(*client_id, *cnt - 1))
1021 {
1022 return None;
1023 }
1024
1025 Some(ID::new(*client_id, cnt - 1))
1026 })
1027 .collect();
1028
1029 if last_ids.is_empty() {
1030 return self.shallow_since_frontiers.clone();
1031 }
1032
1033 shrink_frontiers(&last_ids, self).unwrap()
1034 }
1035
1036 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1037 if vv.is_empty() {
1038 return Default::default();
1039 }
1040
1041 let this = vv;
1042 let last_ids: Frontiers = this
1043 .iter()
1044 .filter_map(|(client_id, cnt)| {
1045 if *cnt == 0 {
1046 return None;
1047 }
1048
1049 if self
1050 .shallow_since_vv
1051 .includes_id(ID::new(*client_id, *cnt - 1))
1052 {
1053 return None;
1054 }
1055
1056 Some(ID::new(*client_id, cnt - 1))
1057 })
1058 .collect();
1059
1060 if last_ids.is_empty() {
1061 return self.shallow_since_frontiers.clone();
1062 }
1063
1064 shrink_frontiers(&last_ids, self).unwrap()
1065 }
1066
1067 pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
1068 if frontiers.is_empty() {
1069 return 0;
1070 }
1071
1072 let mut iter = frontiers.iter();
1073 let mut lamport = {
1074 let id = iter.next().unwrap();
1075 let Some(x) = self.get(id) else {
1076 unreachable!()
1077 };
1078 assert!(id.counter >= x.cnt);
1079 (id.counter - x.cnt) as Lamport + x.lamport + 1
1080 };
1081
1082 for id in iter {
1083 let Some(x) = self.get(id) else {
1084 unreachable!()
1085 };
1086 assert!(id.counter >= x.cnt);
1087 lamport = lamport.max((id.counter - x.cnt) as Lamport + x.lamport + 1);
1088 }
1089
1090 lamport
1091 }
1092
1093 pub fn get_frontiers(&self) -> &Frontiers {
1094 &self.frontiers
1095 }
1096
1097 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1101 if &self.frontiers == other {
1102 Ordering::Equal
1103 } else if other.iter().all(|id| self.vv.includes_id(id)) {
1104 Ordering::Greater
1105 } else {
1106 Ordering::Less
1107 }
1108 }
1109
1110 pub fn cmp_frontiers(
1115 &self,
1116 a: &Frontiers,
1117 b: &Frontiers,
1118 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1119 let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
1120 let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
1121 Ok(a.partial_cmp(&b))
1122 }
1123}
1124
1125#[derive(Debug, PartialEq, Eq)]
1126pub struct FrontiersNotIncluded;
1127impl Display for FrontiersNotIncluded {
1128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1129 f.write_str("The given Frontiers are not included by the doc")
1130 }
1131}