1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::num::NonZeroU32;
5use std::ops::Add;
6
7use hexane::{ColumnCursor, ColumnData, DeltaCursor, StrCursor, UIntCursor};
8
9use crate::storage::BundleMetadata;
10use crate::{
11 clock::{Clock, SeqClock},
12 columnar::column_range::{DepsRange, ValueRange},
13 error::AutomergeError,
14 op_set2::{change::BuildChangeMetadata, ActorCursor, ActorIdx, MetaCursor, ValueMeta},
15 storage::{Columns, DocChangeColumns},
16 types::OpId,
17 Change, ChangeHash,
18};
19
20#[derive(Debug, PartialEq, Default, Clone)]
27pub(crate) struct ChangeGraph {
28 edges: Vec<Edge>,
29 hashes: Vec<ChangeHash>,
30 actors: Vec<ActorIdx>,
31 parents: Vec<Option<EdgeIdx>>,
32 seq: Vec<u32>,
33 max_ops: Vec<u32>,
34 num_ops: ColumnData<UIntCursor>,
35 timestamps: ColumnData<DeltaCursor>,
36 messages: ColumnData<StrCursor>,
37 extra_bytes_meta: ColumnData<MetaCursor>,
38 extra_bytes_raw: Vec<u8>,
39 heads: BTreeSet<ChangeHash>,
40 nodes_by_hash: HashMap<ChangeHash, NodeIdx>,
41 clock_cache: HashMap<NodeIdx, SeqClock>,
42 seq_index: Vec<Vec<NodeIdx>>,
43}
44
45const CACHE_STEP: u32 = 16;
46
47#[derive(Hash, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
48struct NodeIdx(u32);
49
50impl Add<usize> for NodeIdx {
51 type Output = Self;
52
53 fn add(self, other: usize) -> Self {
54 NodeIdx(self.0 + other as u32)
55 }
56}
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
59struct EdgeIdx(NonZeroU32);
60
61impl EdgeIdx {
62 fn new(value: usize) -> Self {
63 EdgeIdx(NonZeroU32::new(value as u32 + 1).unwrap())
64 }
65 fn get(&self) -> usize {
66 self.0.get() as usize - 1
67 }
68}
69
70#[derive(PartialEq, Debug, Clone)]
71struct Edge {
72 target: NodeIdx,
75 next: Option<EdgeIdx>,
76}
77
78impl ChangeGraph {
79 pub(crate) fn new(num_actors: usize) -> Self {
80 Self {
81 edges: Vec::new(),
82 nodes_by_hash: HashMap::new(),
83 hashes: Vec::new(),
84 actors: Vec::new(),
85 max_ops: Vec::new(),
86 num_ops: ColumnData::new(),
87 seq: Vec::new(),
88 parents: Vec::new(),
89 messages: ColumnData::new(),
90 timestamps: ColumnData::new(),
91 extra_bytes_meta: ColumnData::new(),
92 extra_bytes_raw: Vec::new(),
93 heads: BTreeSet::new(),
94 clock_cache: HashMap::new(),
95 seq_index: vec![vec![]; num_actors],
96 }
97 }
98
99 pub(crate) fn with_capacity(changes: usize, deps: usize, num_actors: usize) -> Self {
100 Self {
101 edges: Vec::with_capacity(deps),
102 nodes_by_hash: HashMap::new(),
103 hashes: Vec::with_capacity(changes),
104 actors: Vec::with_capacity(changes),
105 max_ops: Vec::with_capacity(changes),
106 num_ops: ColumnData::new(),
107 seq: Vec::with_capacity(changes),
108 parents: Vec::with_capacity(changes),
109 messages: ColumnData::new(),
110 timestamps: ColumnData::new(),
111 extra_bytes_meta: ColumnData::new(),
112 extra_bytes_raw: Vec::new(),
113 heads: BTreeSet::new(),
114 clock_cache: HashMap::new(),
115 seq_index: vec![vec![]; num_actors],
116 }
117 }
118
119 pub(crate) fn all_actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
120 self.seq_index.iter().enumerate().map(|(i, _)| i)
121 }
122
123 pub(crate) fn actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
124 self.seq_index
125 .iter()
126 .enumerate()
127 .filter_map(|(i, v)| if !v.is_empty() { Some(i) } else { None })
128 }
129
130 pub(crate) fn unused_actors(&self) -> impl Iterator<Item = usize> + '_ {
131 self.seq_index
132 .iter()
133 .enumerate()
134 .filter_map(|(i, v)| if v.is_empty() { Some(i) } else { None })
135 }
136
137 pub(crate) fn heads(&self) -> impl Iterator<Item = ChangeHash> + '_ {
138 self.heads.iter().cloned()
139 }
140
141 pub(crate) fn head_indexes(&self) -> impl Iterator<Item = u64> + '_ {
142 self.heads
143 .iter()
144 .map(|h| self.nodes_by_hash.get(h).unwrap().0 as u64)
145 }
146
147 pub(crate) fn num_actors(&self) -> usize {
148 self.seq_index.len()
149 }
150
151 pub(crate) fn insert_actor(&mut self, idx: usize) {
152 if self.seq_index.len() != idx {
153 for actor_index in &mut self.actors {
154 if actor_index.0 >= idx as u32 {
155 actor_index.0 += 1;
156 }
157 }
158 }
159 for clock in self.clock_cache.values_mut() {
160 clock.rewrite_with_new_actor(idx)
161 }
162 self.seq_index.insert(idx, vec![]);
163 }
164
165 pub(crate) fn remove_actor(&mut self, idx: usize) {
166 for actor_index in &mut self.actors {
167 if actor_index.0 > idx as u32 {
168 actor_index.0 -= 1;
169 }
170 }
171 if self.seq_index.get(idx).is_some() {
172 assert!(self.seq_index[idx].is_empty());
173 self.seq_index.remove(idx);
174 }
175 for clock in &mut self.clock_cache.values_mut() {
176 clock.remove_actor(idx)
177 }
178 }
179
180 pub(crate) fn len(&self) -> usize {
181 self.hashes.len()
182 }
183
184 pub(crate) fn is_empty(&self) -> bool {
185 self.hashes.is_empty()
186 }
187
188 pub(crate) fn hash_to_index(&self, hash: &ChangeHash) -> Option<usize> {
189 self.nodes_by_hash.get(hash).map(|n| n.0 as usize)
190 }
191
192 pub(crate) fn index_to_hash(&self, index: usize) -> Option<&ChangeHash> {
193 self.hashes.get(index)
194 }
195
196 pub(crate) fn max_op_for_actor(&mut self, actor_index: usize) -> u64 {
197 self.seq_index
198 .get(actor_index)
199 .and_then(|s| s.last())
200 .and_then(|index| self.max_ops.get(index.0 as usize).cloned())
201 .unwrap_or(0) as u64
202 }
203
204 pub(crate) fn seq_for_actor(&self, actor: usize) -> u64 {
205 self.seq_index
206 .get(actor)
207 .map(|v| v.len() as u64)
208 .unwrap_or(0)
209 }
210
211 fn deps_iter(&self) -> impl Iterator<Item = NodeIdx> + '_ {
212 self.node_ids().flat_map(|n| self.parents(n))
213 }
214
215 fn num_deps(&self) -> impl Iterator<Item = usize> + '_ {
216 self.node_ids().map(|n| self.parents(n).count())
217 }
218
219 fn node_ids(&self) -> impl Iterator<Item = NodeIdx> {
220 let end = self.hashes.len() as u32;
221 (0..end).map(NodeIdx)
222 }
223
224 pub(crate) fn encode(&self, out: &mut Vec<u8>) -> DocChangeColumns {
225 let actor_iter = self.actors.iter().map(as_actor);
226 let actor = ActorCursor::encode(out, actor_iter, false).into();
227
228 let seq_iter = self.seq.iter().map(as_seq);
229 let seq = DeltaCursor::encode(out, seq_iter, false).into();
230
231 let max_op_iter = self.max_ops.iter().map(as_max_op);
232 let max_op = DeltaCursor::encode(out, max_op_iter, false).into();
233
234 let time = self.timestamps.save_to_unless_empty(out).into();
235
236 let message = self.messages.save_to_unless_empty(out).into();
237
238 let num_deps_iter = self.num_deps().map(as_num_deps);
239 let num_deps = UIntCursor::encode(out, num_deps_iter, false).into();
240
241 let deps_iter = self.deps_iter().map(as_deps);
242 let deps = DeltaCursor::encode(out, deps_iter, false).into();
243
244 let meta = self.extra_bytes_meta.save_to_unless_empty(out).into();
246 let raw = (out.len()..out.len() + self.extra_bytes_raw.len()).into();
247 out.extend(&self.extra_bytes_raw);
248
249 DocChangeColumns {
250 actor,
251 seq,
252 max_op,
253 time,
254 message,
255 deps: DepsRange::new(num_deps, deps),
256 extra: ValueRange::new(meta, raw),
257 other: Columns::empty(),
258 }
259 }
260
261 pub(crate) fn opid_to_hash(&self, id: OpId) -> Option<ChangeHash> {
262 let actor_indices = self.seq_index.get(id.actor())?;
263 let counter = id.counter();
264 let index = actor_indices
265 .binary_search_by(|n| {
266 let i = n.0 as usize;
267 let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
268 let max_op = self.max_ops[i];
269 let start = max_op as u64 - num_ops + 1;
270 if counter < start {
271 Ordering::Greater
272 } else if (max_op as u64) < counter {
273 Ordering::Less
274 } else {
275 Ordering::Equal
276 }
277 })
278 .ok()?;
279 let node_idx = actor_indices[index];
280 self.hashes.get(node_idx.0 as usize).cloned()
281 }
282
283 pub(crate) fn deps_for_hash(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
284 let node_idx = self.nodes_by_hash.get(hash);
285 let mut edge_idx = node_idx.and_then(|n| self.parents[n.0 as usize]);
286 std::iter::from_fn(move || {
287 let this_edge_idx = edge_idx?;
288 let edge = &self.edges[this_edge_idx.get()];
289 edge_idx = edge.next;
290 let hash = self.hashes[edge.target.0 as usize];
291 Some(hash)
292 })
293 }
294
295 pub(crate) fn has_change(&self, hash: &ChangeHash) -> bool {
296 self.nodes_by_hash.contains_key(hash)
297 }
298
299 pub(crate) fn get_bundle_metadata<I>(
300 &self,
301 hashes: I,
302 ) -> impl Iterator<Item = Result<BundleMetadata<'_>, MissingDep>>
303 where
304 I: IntoIterator<Item = ChangeHash>,
305 {
306 hashes.into_iter().map(|hash| {
307 let index = self
308 .nodes_by_hash
309 .get(&hash)
310 .cloned()
311 .ok_or(MissingDep(hash))?;
312 let i = index.0 as usize;
313 let actor = self.actors[i].into();
314 let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
315 let max_op = self.max_ops[i] as u64;
316 let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
317 let message = self.messages.get(i).flatten();
318
319 let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
321 let meta_range =
322 meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
323 let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
324
325 let deps = self
326 .parents(index)
327 .map(|p| self.hashes[p.0 as usize])
328 .collect::<Vec<_>>();
329
330 let start_op = max_op - num_ops + 1;
332 let seq = self.seq[i] as u64;
333 Ok(BundleMetadata {
334 hash,
335 actor,
336 seq,
337 start_op,
338 max_op,
339 timestamp,
340 message,
341 extra,
342 deps,
343 builder: i,
344 })
345 })
346 }
347
348 pub(crate) fn get_build_metadata<I>(
349 &self,
350 hashes: I,
351 ) -> Result<(Vec<BuildChangeMetadata<'_>>, usize), MissingDep>
352 where
353 I: IntoIterator<Item = ChangeHash>,
354 {
355 let indexes: Vec<_> = hashes
356 .into_iter()
357 .map(|hash| {
358 self.nodes_by_hash
359 .get(&hash)
360 .cloned()
361 .ok_or(MissingDep(hash))
362 })
363 .collect::<Result<_, _>>()?;
364
365 Ok(self.get_build_metadata_for_indexes(indexes))
366 }
367
368 fn get_build_metadata_for_indexes<I>(&self, indexes: I) -> (Vec<BuildChangeMetadata<'_>>, usize)
369 where
370 I: IntoIterator<Item = NodeIdx>,
371 {
372 let mut num_deps = 0;
373 let changes = indexes
374 .into_iter()
375 .map(|index| {
376 let i = index.0 as usize;
377 let actor = self.actors[i].into();
378 let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
379 let max_op = self.max_ops[i] as u64;
380 let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
381 let message = self.messages.get(i).flatten();
382
383 let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
385 let meta_range =
386 meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
387 let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
388
389 let deps = self.parents(index).map(|p| p.0 as u64).collect::<Vec<_>>();
390 num_deps += deps.len();
391 let start_op = max_op - num_ops + 1;
392 let seq = self.seq[i] as u64;
393 BuildChangeMetadata {
394 actor,
395 seq,
396 start_op,
397 max_op,
398 timestamp,
399 message,
400 extra,
401 deps,
402 builder: i,
403 }
404 })
405 .collect();
406 (changes, num_deps)
407 }
408
409 fn get_build_indexes(&self, clock: SeqClock) -> Vec<NodeIdx> {
410 let mut change_indexes: Vec<NodeIdx> = Vec::new();
411 for (actor_index, actor_changes) in self.seq_index.iter().enumerate() {
413 if let Some(seq) = clock.get_for_actor(&actor_index) {
414 change_indexes.extend(&actor_changes[seq.get() as usize..]);
417 } else {
418 change_indexes.extend(&actor_changes[..]);
419 }
420 }
421
422 change_indexes.sort_unstable();
424
425 change_indexes
426 }
427
428 #[inline(never)]
429 pub(crate) fn get_hashes(&self, have_deps: &[ChangeHash]) -> Vec<ChangeHash> {
430 let clock = self.seq_clock_for_heads(have_deps);
431 self.get_build_indexes(clock)
432 .into_iter()
433 .filter_map(|node| self.hashes.get(node.0 as usize))
434 .copied()
435 .collect()
436 }
437
438 pub(crate) fn get_build_metadata_clock(
439 &self,
440 have_deps: &[ChangeHash],
441 ) -> (Vec<BuildChangeMetadata<'_>>, usize) {
442 let clock = self.seq_clock_for_heads(have_deps);
443 let change_indexes = self.get_build_indexes(clock);
444 self.get_build_metadata_for_indexes(change_indexes)
445 }
446
447 pub(crate) fn get_hash_for_actor_seq(
448 &self,
449 actor: usize,
450 seq: u64,
451 ) -> Result<ChangeHash, AutomergeError> {
452 self.seq_index
453 .get(actor)
454 .and_then(|v| v.get(seq as usize - 1))
455 .and_then(|i| self.hashes.get(i.0 as usize))
456 .ok_or(AutomergeError::InvalidSeq(seq))
457 .copied()
458 }
459
460 fn update_heads(&mut self, change: &Change) {
461 for d in change.deps() {
462 self.heads.remove(d);
463 }
464 self.heads.insert(change.hash());
465 }
466
467 pub(crate) fn from_iter<
468 'a,
469 I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
470 >(
471 iter: I,
472 deps: usize,
473 num_actors: usize,
474 ) -> Result<Self, MissingDep> {
475 let mut seen = HashSet::new();
476 for (change, _) in iter.clone() {
477 for h in change.deps().iter() {
478 if !seen.contains(h) {
479 return Err(MissingDep(*h));
480 }
481 }
482 seen.insert(change.hash());
483 }
484
485 let mut graph = ChangeGraph::with_capacity(iter.len(), deps, num_actors);
486 graph.add_changes(iter)?;
487 Ok(graph)
488 }
489
490 pub(crate) fn add_nodes<
491 'a,
492 I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
493 >(
494 &mut self,
495 iter: I,
496 ) {
497 self.actors
498 .extend(iter.clone().map(|(_, a)| ActorIdx::from(a)));
499 self.seq.extend(iter.clone().map(|(c, _)| c.seq() as u32));
500 self.max_ops
501 .extend(iter.clone().map(|(c, _)| c.max_op() as u32));
502 self.num_ops
503 .extend(iter.clone().map(|(c, _)| c.len() as u64));
504 self.timestamps
505 .extend(iter.clone().map(|(c, _)| c.timestamp()));
506 self.messages
507 .extend(iter.clone().map(|(c, _)| c.message().cloned()));
508 self.extra_bytes_meta
509 .extend(iter.clone().map(|(c, _)| ValueMeta::from(c.extra_bytes())));
510 self.parents
511 .extend(std::iter::repeat(None).take(iter.len()));
512 for (c, _) in iter {
513 self.extra_bytes_raw.extend_from_slice(c.extra_bytes());
514 }
515 }
516
517 fn add_changes<'a, I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone>(
518 &mut self,
519 iter: I,
520 ) -> Result<(), MissingDep> {
521 let node = NodeIdx(self.hashes.len() as u32);
522
523 self.add_nodes(iter.clone());
524
525 for (i, (change, actor)) in iter.enumerate() {
526 let node_idx = node + i;
527 let hash = change.hash();
528 self.hashes.push(hash);
529 debug_assert!(!self.nodes_by_hash.contains_key(&hash));
530 self.nodes_by_hash.insert(hash, node_idx);
531 self.update_heads(change);
532
533 assert!(actor < self.seq_index.len());
534 assert_eq!(self.seq_index[actor].len() + 1, change.seq() as usize);
535 self.seq_index[actor].push(node_idx);
536
537 for parent_hash in change.deps().iter() {
538 self.add_parent(node_idx, parent_hash);
539 }
540
541 if (node_idx + 1).0 % CACHE_STEP == 0 {
542 self.cache_clock(node_idx);
543 }
544 }
545 Ok(())
546 }
547
548 pub(crate) fn add_change(&mut self, change: &Change, actor: usize) -> Result<(), MissingDep> {
549 let hash = change.hash();
550
551 if self.nodes_by_hash.contains_key(&hash) {
552 return Ok(());
553 }
554
555 for h in change.deps().iter() {
556 if !self.nodes_by_hash.contains_key(h) {
557 return Err(MissingDep(*h));
558 }
559 }
560
561 self.add_changes([(change, actor)].into_iter())
562 }
563
564 fn cache_clock(&mut self, node_idx: NodeIdx) -> SeqClock {
565 let mut clock = SeqClock::new(self.num_actors());
566 let mut to_visit = BTreeSet::from([node_idx]);
567
568 self.calculate_clock_inner(&mut clock, &mut to_visit, CACHE_STEP as usize * 2);
569
570 for n in to_visit {
571 let sub = self.cache_clock(n);
572 SeqClock::merge(&mut clock, &sub);
573 }
574
575 self.clock_cache.insert(node_idx, clock.clone());
576
577 clock
578 }
579
580 fn add_parent(&mut self, child_idx: NodeIdx, parent_hash: &ChangeHash) {
581 debug_assert!(self.nodes_by_hash.contains_key(parent_hash));
582 let parent_idx = *self.nodes_by_hash.get(parent_hash).unwrap();
583 let new_edge_idx = EdgeIdx::new(self.edges.len());
584 self.edges.push(Edge {
585 target: parent_idx,
586 next: None,
587 });
588
589 let child = &mut self.parents[child_idx.0 as usize];
590 if let Some(edge_idx) = child {
591 let mut edge = &mut self.edges[edge_idx.get()];
592 while let Some(next) = edge.next {
593 edge = &mut self.edges[next.get()];
594 }
595 edge.next = Some(new_edge_idx);
596 } else {
597 *child = Some(new_edge_idx);
598 }
599 }
600
601 pub(crate) fn deps(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
602 let mut iter = self.nodes_by_hash.get(hash).map(|node| self.parents(*node));
603 std::iter::from_fn(move || {
604 let next = iter.as_mut()?.next()?;
605 self.hashes.get(next.0 as usize).copied()
606 })
607 }
608
609 fn parents(&self, node_idx: NodeIdx) -> impl Iterator<Item = NodeIdx> + '_ {
610 let mut edge_idx = self.parents[node_idx.0 as usize];
611 std::iter::from_fn(move || {
612 let this_edge_idx = edge_idx?;
613 let edge = &self.edges[this_edge_idx.get()];
614 edge_idx = edge.next;
615 Some(edge.target)
616 })
617 }
618
619 fn heads_to_nodes(&self, heads: &[ChangeHash]) -> Vec<NodeIdx> {
620 heads
621 .iter()
622 .filter_map(|h| self.nodes_by_hash.get(h))
623 .copied()
624 .collect()
625 }
626
627 pub(crate) fn clock_for_heads(&self, heads: &[ChangeHash]) -> Clock {
628 let nodes = self.heads_to_nodes(heads);
629 self.calculate_clock(nodes)
630 .iter()
631 .map(|(actor, seq)| {
632 self.seq_index
633 .get(actor)
634 .and_then(|v| v.get(seq?.get() as usize - 1))
635 .and_then(|i| self.max_ops.get(i.0 as usize))
636 .copied()
637 })
638 .collect()
639 }
640
641 pub(crate) fn seq_clock_for_heads(&self, heads: &[ChangeHash]) -> SeqClock {
642 let nodes = self.heads_to_nodes(heads);
643 self.calculate_clock(nodes)
644 }
645
646 fn clock_data_for(&self, idx: NodeIdx) -> Option<u32> {
647 Some(*self.seq.get(idx.0 as usize)?)
648 }
649
650 fn calculate_clock(&self, nodes: Vec<NodeIdx>) -> SeqClock {
651 let mut clock = SeqClock::new(self.num_actors());
652 let mut to_visit = nodes.into_iter().collect::<BTreeSet<_>>();
653
654 self.calculate_clock_inner(&mut clock, &mut to_visit, usize::MAX);
655
656 assert!(to_visit.is_empty());
657
658 clock
659 }
660
661 fn calculate_clock_inner(
662 &self,
663 clock: &mut SeqClock,
664 to_visit: &mut BTreeSet<NodeIdx>,
665 limit: usize,
666 ) {
667 let mut visited = BTreeSet::new();
668
669 while let Some(idx) = to_visit.pop_last() {
670 assert!(!visited.contains(&idx));
671 assert!(visited.len() <= self.hashes.len());
672 visited.insert(idx);
673
674 let actor = self.actors[idx.0 as usize];
675 let data = self.clock_data_for(idx);
676 clock.include(actor.into(), data);
677
678 if let Some(cached) = self.clock_cache.get(&idx) {
679 SeqClock::merge(clock, cached);
680 } else if visited.len() <= limit {
681 to_visit.extend(self.parents(idx).filter(|p| !visited.contains(p)));
682 } else {
683 break;
684 }
685 }
686 }
687
688 pub(crate) fn remove_ancestors(
689 &self,
690 changes: &mut BTreeSet<ChangeHash>,
691 heads: &[ChangeHash],
692 ) {
693 let nodes = self.heads_to_nodes(heads);
694 self.traverse_ancestors(nodes, |idx| {
695 let hash = &self.hashes[idx.0 as usize];
696 changes.remove(hash);
697 true
698 });
699 }
700
701 fn traverse_ancestors<F: FnMut(NodeIdx) -> bool>(&self, mut to_visit: Vec<NodeIdx>, mut f: F) {
702 let mut visited = BTreeSet::new();
703
704 while let Some(idx) = to_visit.pop() {
705 if visited.contains(&idx) {
706 continue;
707 } else {
708 visited.insert(idx);
709 }
710 if f(idx) {
711 to_visit.extend(self.parents(idx));
712 }
713 }
714 }
715}
716
717fn as_num_deps(num: usize) -> Option<Cow<'static, u64>> {
718 Some(Cow::Owned(num as u64))
719}
720
721fn as_seq(seq: &u32) -> Option<Cow<'_, i64>> {
722 Some(Cow::Owned(*seq as i64))
723}
724
725fn as_actor(actor_index: &ActorIdx) -> Option<Cow<'_, ActorIdx>> {
726 Some(Cow::Borrowed(actor_index))
727}
728
729fn as_max_op(m: &u32) -> Option<Cow<'_, i64>> {
730 Some(Cow::Owned(*m as i64))
731}
732
733fn as_deps(n: NodeIdx) -> Option<Cow<'static, i64>> {
734 Some(Cow::Owned(n.0 as i64))
735}
736
737#[derive(Debug, thiserror::Error)]
738#[error("attempted to derive a clock for a change with dependencies we don't have")]
739pub struct MissingDep(ChangeHash);
740
741#[cfg(test)]
742mod tests {
743 use std::{
744 collections::BTreeMap,
745 time::{SystemTime, UNIX_EPOCH},
746 };
747
748 use crate::{
749 op_set2::{change::build_change, op_set::ResolvedAction, OpSet, TxOp},
750 types::{ObjMeta, OpId, OpType},
751 ActorId, TextEncoding,
752 };
753
754 use super::*;
755
756 #[test]
757 fn clock_by_heads() {
758 let mut builder = TestGraphBuilder::new();
759 let actor1 = builder.actor();
760 let actor2 = builder.actor();
761 let actor3 = builder.actor();
762 let change1 = builder.change(&actor1, 10, &[]);
763 let change2 = builder.change(&actor2, 20, &[change1]);
764 let change3 = builder.change(&actor3, 30, &[change1]);
765 let change4 = builder.change(&actor1, 10, &[change2, change3]);
766 let graph = builder.build();
767
768 let mut expected_clock = SeqClock::new(3);
770 expected_clock.include(builder.index(&actor1), Some(2));
771 expected_clock.include(builder.index(&actor2), Some(1));
772 expected_clock.include(builder.index(&actor3), Some(1));
773
774 let clock = graph.seq_clock_for_heads(&[change4]);
775 assert_eq!(clock, expected_clock);
776 }
777
778 #[test]
779 fn remove_ancestors() {
780 let mut builder = TestGraphBuilder::new();
781 let actor1 = builder.actor();
782 let actor2 = builder.actor();
783 let actor3 = builder.actor();
784 let change1 = builder.change(&actor1, 10, &[]);
785 let change2 = builder.change(&actor2, 20, &[change1]);
786 let change3 = builder.change(&actor3, 30, &[change1]);
787 let change4 = builder.change(&actor1, 10, &[change2, change3]);
788 let graph = builder.build();
789
790 let mut changes = vec![change1, change2, change3, change4]
791 .into_iter()
792 .collect::<BTreeSet<_>>();
793 let heads = vec![change2];
794 graph.remove_ancestors(&mut changes, &heads);
795
796 let expected_changes = vec![change3, change4].into_iter().collect::<BTreeSet<_>>();
797
798 assert_eq!(changes, expected_changes);
799 }
800
801 struct TestGraphBuilder {
802 actors: Vec<ActorId>,
803 changes: Vec<Change>,
804 graph: ChangeGraph,
805 seqs_by_actor: BTreeMap<ActorId, u64>,
806 }
807
808 impl TestGraphBuilder {
809 fn new() -> Self {
810 TestGraphBuilder {
811 actors: Vec::new(),
812 changes: Vec::new(),
813 graph: ChangeGraph::new(0),
814 seqs_by_actor: BTreeMap::new(),
815 }
816 }
817
818 fn actor(&mut self) -> ActorId {
819 let actor = ActorId::random();
820 self.graph.insert_actor(self.actors.len());
821 self.actors.push(actor.clone());
822 actor
823 }
824
825 fn index(&self, actor: &ActorId) -> usize {
826 self.actors.iter().position(|a| a == actor).unwrap()
827 }
828
829 fn change(
834 &mut self,
835 actor: &ActorId,
836 num_new_ops: usize,
837 parents: &[ChangeHash],
838 ) -> ChangeHash {
839 let osd = OpSet::from_actors(self.actors.clone(), TextEncoding::platform_default());
840
841 let start_op = parents
842 .iter()
843 .map(|c| {
844 self.changes
845 .iter()
846 .find(|change| change.hash() == *c)
847 .unwrap()
848 .max_op()
849 })
850 .max()
851 .unwrap_or(0)
852 + 1;
853
854 let actor_idx = self.index(actor);
855 let ops = (0..num_new_ops)
856 .map(|opnum| {
857 TxOp::map(
858 OpId::new(start_op + opnum as u64, actor_idx),
859 ObjMeta::root(),
860 0,
861 ResolvedAction::VisibleUpdate(OpType::Put("value".into())),
862 "key".to_string(),
863 vec![],
864 )
865 })
866 .collect::<Vec<_>>();
867
868 let timestamp = SystemTime::now()
869 .duration_since(UNIX_EPOCH)
870 .unwrap()
871 .as_millis() as i64;
872 let seq = self.seqs_by_actor.entry(actor.clone()).or_insert(1);
873 let meta = BuildChangeMetadata {
874 actor: actor_idx,
875 builder: 0,
876 deps: parents
877 .iter()
878 .map(|h| self.graph.hash_to_index(h).unwrap() as u64)
879 .collect(),
880 seq: *seq,
881 max_op: start_op + ops.len() as u64 - 1,
882 start_op,
883 timestamp,
884 message: None,
885 extra: Cow::Owned(vec![]),
886 };
887 let change = Change::new(build_change(&ops, &meta, &self.graph, &osd.actors));
888 *seq = seq.checked_add(1).unwrap();
889 let hash = change.hash();
890 self.graph.add_change(&change, actor_idx).unwrap();
891 self.changes.push(change);
892 hash
893 }
894
895 fn build(&self) -> ChangeGraph {
896 let mut graph = ChangeGraph::new(self.actors.len());
897 for change in &self.changes {
898 let actor_idx = self.index(change.actor_id());
899 graph.add_change(change, actor_idx).unwrap();
900 }
901 graph
902 }
903 }
904}