automerge/
change_graph.rs

1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::num::NonZeroU32;
5use std::ops::Add;
6
7use hexane::{ColumnCursor, ColumnData, DeltaCursor, StrCursor, UIntCursor};
8
9use crate::storage::BundleMetadata;
10use crate::{
11    clock::{Clock, SeqClock},
12    columnar::column_range::{DepsRange, ValueRange},
13    error::AutomergeError,
14    op_set2::{change::BuildChangeMetadata, ActorCursor, ActorIdx, MetaCursor, ValueMeta},
15    storage::{Columns, DocChangeColumns},
16    types::OpId,
17    Change, ChangeHash,
18};
19
20/// The graph of changes
21///
22/// This is a sort of adjacency list based representation, except that instead of using linked
23/// lists, we keep all the edges and nodes in two vecs and reference them by index which plays nice
24/// with the cache
25
26#[derive(Debug, PartialEq, Default, Clone)]
27pub(crate) struct ChangeGraph {
28    edges: Vec<Edge>,
29    hashes: Vec<ChangeHash>,
30    actors: Vec<ActorIdx>,
31    parents: Vec<Option<EdgeIdx>>,
32    seq: Vec<u32>,
33    max_ops: Vec<u32>,
34    num_ops: ColumnData<UIntCursor>,
35    timestamps: ColumnData<DeltaCursor>,
36    messages: ColumnData<StrCursor>,
37    extra_bytes_meta: ColumnData<MetaCursor>,
38    extra_bytes_raw: Vec<u8>,
39    heads: BTreeSet<ChangeHash>,
40    nodes_by_hash: HashMap<ChangeHash, NodeIdx>,
41    clock_cache: HashMap<NodeIdx, SeqClock>,
42    seq_index: Vec<Vec<NodeIdx>>,
43}
44
/// Spacing of the clock cache: `add_changes` caches a clock for every node whose one-based
/// position is a multiple of this value, and `cache_clock` passes twice this value as the
/// visit limit of its traversal.
const CACHE_STEP: u32 = 16;
46
/// Position of a change in the per-node vectors of the graph.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct NodeIdx(u32);

impl Add<usize> for NodeIdx {
    type Output = NodeIdx;

    /// Moves the index forward by `other` positions (`other` is narrowed to `u32` with `as`).
    fn add(self, other: usize) -> NodeIdx {
        let NodeIdx(base) = self;
        NodeIdx(base + other as u32)
    }
}
57
/// Index into the edge vector, stored one-based inside a `NonZeroU32`.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct EdgeIdx(NonZeroU32);

impl EdgeIdx {
    /// Wraps the zero-based position `value`.
    fn new(value: usize) -> Self {
        let one_based = value as u32 + 1;
        Self(NonZeroU32::new(one_based).unwrap())
    }

    /// Returns the zero-based position this index was created from.
    fn get(&self) -> usize {
        let one_based = self.0.get() as usize;
        one_based - 1
    }
}
69
70#[derive(PartialEq, Debug, Clone)]
71struct Edge {
72    // Edges are always child -> parent so we only store the target, the child is implicit
73    // as you get the edge from the child
74    target: NodeIdx,
75    next: Option<EdgeIdx>,
76}
77
78impl ChangeGraph {
79    pub(crate) fn new(num_actors: usize) -> Self {
80        Self {
81            edges: Vec::new(),
82            nodes_by_hash: HashMap::new(),
83            hashes: Vec::new(),
84            actors: Vec::new(),
85            max_ops: Vec::new(),
86            num_ops: ColumnData::new(),
87            seq: Vec::new(),
88            parents: Vec::new(),
89            messages: ColumnData::new(),
90            timestamps: ColumnData::new(),
91            extra_bytes_meta: ColumnData::new(),
92            extra_bytes_raw: Vec::new(),
93            heads: BTreeSet::new(),
94            clock_cache: HashMap::new(),
95            seq_index: vec![vec![]; num_actors],
96        }
97    }
98
99    pub(crate) fn with_capacity(changes: usize, deps: usize, num_actors: usize) -> Self {
100        Self {
101            edges: Vec::with_capacity(deps),
102            nodes_by_hash: HashMap::new(),
103            hashes: Vec::with_capacity(changes),
104            actors: Vec::with_capacity(changes),
105            max_ops: Vec::with_capacity(changes),
106            num_ops: ColumnData::new(),
107            seq: Vec::with_capacity(changes),
108            parents: Vec::with_capacity(changes),
109            messages: ColumnData::new(),
110            timestamps: ColumnData::new(),
111            extra_bytes_meta: ColumnData::new(),
112            extra_bytes_raw: Vec::new(),
113            heads: BTreeSet::new(),
114            clock_cache: HashMap::new(),
115            seq_index: vec![vec![]; num_actors],
116        }
117    }
118
119    pub(crate) fn all_actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
120        self.seq_index.iter().enumerate().map(|(i, _)| i)
121    }
122
123    pub(crate) fn actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
124        self.seq_index
125            .iter()
126            .enumerate()
127            .filter_map(|(i, v)| if !v.is_empty() { Some(i) } else { None })
128    }
129
130    pub(crate) fn unused_actors(&self) -> impl Iterator<Item = usize> + '_ {
131        self.seq_index
132            .iter()
133            .enumerate()
134            .filter_map(|(i, v)| if v.is_empty() { Some(i) } else { None })
135    }
136
137    pub(crate) fn heads(&self) -> impl Iterator<Item = ChangeHash> + '_ {
138        self.heads.iter().cloned()
139    }
140
141    pub(crate) fn head_indexes(&self) -> impl Iterator<Item = u64> + '_ {
142        self.heads
143            .iter()
144            .map(|h| self.nodes_by_hash.get(h).unwrap().0 as u64)
145    }
146
147    pub(crate) fn num_actors(&self) -> usize {
148        self.seq_index.len()
149    }
150
151    pub(crate) fn insert_actor(&mut self, idx: usize) {
152        if self.seq_index.len() != idx {
153            for actor_index in &mut self.actors {
154                if actor_index.0 >= idx as u32 {
155                    actor_index.0 += 1;
156                }
157            }
158        }
159        for clock in self.clock_cache.values_mut() {
160            clock.rewrite_with_new_actor(idx)
161        }
162        self.seq_index.insert(idx, vec![]);
163    }
164
165    pub(crate) fn remove_actor(&mut self, idx: usize) {
166        for actor_index in &mut self.actors {
167            if actor_index.0 > idx as u32 {
168                actor_index.0 -= 1;
169            }
170        }
171        if self.seq_index.get(idx).is_some() {
172            assert!(self.seq_index[idx].is_empty());
173            self.seq_index.remove(idx);
174        }
175        for clock in &mut self.clock_cache.values_mut() {
176            clock.remove_actor(idx)
177        }
178    }
179
180    pub(crate) fn len(&self) -> usize {
181        self.hashes.len()
182    }
183
184    pub(crate) fn is_empty(&self) -> bool {
185        self.hashes.is_empty()
186    }
187
188    pub(crate) fn hash_to_index(&self, hash: &ChangeHash) -> Option<usize> {
189        self.nodes_by_hash.get(hash).map(|n| n.0 as usize)
190    }
191
192    pub(crate) fn index_to_hash(&self, index: usize) -> Option<&ChangeHash> {
193        self.hashes.get(index)
194    }
195
196    pub(crate) fn max_op_for_actor(&mut self, actor_index: usize) -> u64 {
197        self.seq_index
198            .get(actor_index)
199            .and_then(|s| s.last())
200            .and_then(|index| self.max_ops.get(index.0 as usize).cloned())
201            .unwrap_or(0) as u64
202    }
203
204    pub(crate) fn seq_for_actor(&self, actor: usize) -> u64 {
205        self.seq_index
206            .get(actor)
207            .map(|v| v.len() as u64)
208            .unwrap_or(0)
209    }
210
211    fn deps_iter(&self) -> impl Iterator<Item = NodeIdx> + '_ {
212        self.node_ids().flat_map(|n| self.parents(n))
213    }
214
215    fn num_deps(&self) -> impl Iterator<Item = usize> + '_ {
216        self.node_ids().map(|n| self.parents(n).count())
217    }
218
219    fn node_ids(&self) -> impl Iterator<Item = NodeIdx> {
220        let end = self.hashes.len() as u32;
221        (0..end).map(NodeIdx)
222    }
223
224    pub(crate) fn encode(&self, out: &mut Vec<u8>) -> DocChangeColumns {
225        let actor_iter = self.actors.iter().map(as_actor);
226        let actor = ActorCursor::encode(out, actor_iter, false).into();
227
228        let seq_iter = self.seq.iter().map(as_seq);
229        let seq = DeltaCursor::encode(out, seq_iter, false).into();
230
231        let max_op_iter = self.max_ops.iter().map(as_max_op);
232        let max_op = DeltaCursor::encode(out, max_op_iter, false).into();
233
234        let time = self.timestamps.save_to_unless_empty(out).into();
235
236        let message = self.messages.save_to_unless_empty(out).into();
237
238        let num_deps_iter = self.num_deps().map(as_num_deps);
239        let num_deps = UIntCursor::encode(out, num_deps_iter, false).into();
240
241        let deps_iter = self.deps_iter().map(as_deps);
242        let deps = DeltaCursor::encode(out, deps_iter, false).into();
243
244        // FIXME - we could eliminate this column if empty but meta isnt all null
245        let meta = self.extra_bytes_meta.save_to_unless_empty(out).into();
246        let raw = (out.len()..out.len() + self.extra_bytes_raw.len()).into();
247        out.extend(&self.extra_bytes_raw);
248
249        DocChangeColumns {
250            actor,
251            seq,
252            max_op,
253            time,
254            message,
255            deps: DepsRange::new(num_deps, deps),
256            extra: ValueRange::new(meta, raw),
257            other: Columns::empty(),
258        }
259    }
260
261    pub(crate) fn opid_to_hash(&self, id: OpId) -> Option<ChangeHash> {
262        let actor_indices = self.seq_index.get(id.actor())?;
263        let counter = id.counter();
264        let index = actor_indices
265            .binary_search_by(|n| {
266                let i = n.0 as usize;
267                let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
268                let max_op = self.max_ops[i];
269                let start = max_op as u64 - num_ops + 1;
270                if counter < start {
271                    Ordering::Greater
272                } else if (max_op as u64) < counter {
273                    Ordering::Less
274                } else {
275                    Ordering::Equal
276                }
277            })
278            .ok()?;
279        let node_idx = actor_indices[index];
280        self.hashes.get(node_idx.0 as usize).cloned()
281    }
282
283    pub(crate) fn deps_for_hash(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
284        let node_idx = self.nodes_by_hash.get(hash);
285        let mut edge_idx = node_idx.and_then(|n| self.parents[n.0 as usize]);
286        std::iter::from_fn(move || {
287            let this_edge_idx = edge_idx?;
288            let edge = &self.edges[this_edge_idx.get()];
289            edge_idx = edge.next;
290            let hash = self.hashes[edge.target.0 as usize];
291            Some(hash)
292        })
293    }
294
295    pub(crate) fn has_change(&self, hash: &ChangeHash) -> bool {
296        self.nodes_by_hash.contains_key(hash)
297    }
298
299    pub(crate) fn get_bundle_metadata<I>(
300        &self,
301        hashes: I,
302    ) -> impl Iterator<Item = Result<BundleMetadata<'_>, MissingDep>>
303    where
304        I: IntoIterator<Item = ChangeHash>,
305    {
306        hashes.into_iter().map(|hash| {
307            let index = self
308                .nodes_by_hash
309                .get(&hash)
310                .cloned()
311                .ok_or(MissingDep(hash))?;
312            let i = index.0 as usize;
313            let actor = self.actors[i].into();
314            let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
315            let max_op = self.max_ops[i] as u64;
316            let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
317            let message = self.messages.get(i).flatten();
318
319            // FIXME - this needs a test
320            let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
321            let meta_range =
322                meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
323            let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
324
325            let deps = self
326                .parents(index)
327                .map(|p| self.hashes[p.0 as usize])
328                .collect::<Vec<_>>();
329
330            //num_deps += deps.len();
331            let start_op = max_op - num_ops + 1;
332            let seq = self.seq[i] as u64;
333            Ok(BundleMetadata {
334                hash,
335                actor,
336                seq,
337                start_op,
338                max_op,
339                timestamp,
340                message,
341                extra,
342                deps,
343                builder: i,
344            })
345        })
346    }
347
348    pub(crate) fn get_build_metadata<I>(
349        &self,
350        hashes: I,
351    ) -> Result<(Vec<BuildChangeMetadata<'_>>, usize), MissingDep>
352    where
353        I: IntoIterator<Item = ChangeHash>,
354    {
355        let indexes: Vec<_> = hashes
356            .into_iter()
357            .map(|hash| {
358                self.nodes_by_hash
359                    .get(&hash)
360                    .cloned()
361                    .ok_or(MissingDep(hash))
362            })
363            .collect::<Result<_, _>>()?;
364
365        Ok(self.get_build_metadata_for_indexes(indexes))
366    }
367
368    fn get_build_metadata_for_indexes<I>(&self, indexes: I) -> (Vec<BuildChangeMetadata<'_>>, usize)
369    where
370        I: IntoIterator<Item = NodeIdx>,
371    {
372        let mut num_deps = 0;
373        let changes = indexes
374            .into_iter()
375            .map(|index| {
376                let i = index.0 as usize;
377                let actor = self.actors[i].into();
378                let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
379                let max_op = self.max_ops[i] as u64;
380                let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
381                let message = self.messages.get(i).flatten();
382
383                // FIXME - this needs a test
384                let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
385                let meta_range =
386                    meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
387                let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
388
389                let deps = self.parents(index).map(|p| p.0 as u64).collect::<Vec<_>>();
390                num_deps += deps.len();
391                let start_op = max_op - num_ops + 1;
392                let seq = self.seq[i] as u64;
393                BuildChangeMetadata {
394                    actor,
395                    seq,
396                    start_op,
397                    max_op,
398                    timestamp,
399                    message,
400                    extra,
401                    deps,
402                    builder: i,
403                }
404            })
405            .collect();
406        (changes, num_deps)
407    }
408
409    fn get_build_indexes(&self, clock: SeqClock) -> Vec<NodeIdx> {
410        let mut change_indexes: Vec<NodeIdx> = Vec::new();
411        // walk the state from the given deps clock and add them into the vec
412        for (actor_index, actor_changes) in self.seq_index.iter().enumerate() {
413            if let Some(seq) = clock.get_for_actor(&actor_index) {
414                // find the change in this actors sequence of changes that corresponds to the max_op
415                // recorded for them in the clock
416                change_indexes.extend(&actor_changes[seq.get() as usize..]);
417            } else {
418                change_indexes.extend(&actor_changes[..]);
419            }
420        }
421
422        // ensure the changes are still in sorted order
423        change_indexes.sort_unstable();
424
425        change_indexes
426    }
427
428    #[inline(never)]
429    pub(crate) fn get_hashes(&self, have_deps: &[ChangeHash]) -> Vec<ChangeHash> {
430        let clock = self.seq_clock_for_heads(have_deps);
431        self.get_build_indexes(clock)
432            .into_iter()
433            .filter_map(|node| self.hashes.get(node.0 as usize))
434            .copied()
435            .collect()
436    }
437
438    pub(crate) fn get_build_metadata_clock(
439        &self,
440        have_deps: &[ChangeHash],
441    ) -> (Vec<BuildChangeMetadata<'_>>, usize) {
442        let clock = self.seq_clock_for_heads(have_deps);
443        let change_indexes = self.get_build_indexes(clock);
444        self.get_build_metadata_for_indexes(change_indexes)
445    }
446
447    pub(crate) fn get_hash_for_actor_seq(
448        &self,
449        actor: usize,
450        seq: u64,
451    ) -> Result<ChangeHash, AutomergeError> {
452        self.seq_index
453            .get(actor)
454            .and_then(|v| v.get(seq as usize - 1))
455            .and_then(|i| self.hashes.get(i.0 as usize))
456            .ok_or(AutomergeError::InvalidSeq(seq))
457            .copied()
458    }
459
460    fn update_heads(&mut self, change: &Change) {
461        for d in change.deps() {
462            self.heads.remove(d);
463        }
464        self.heads.insert(change.hash());
465    }
466
467    pub(crate) fn from_iter<
468        'a,
469        I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
470    >(
471        iter: I,
472        deps: usize,
473        num_actors: usize,
474    ) -> Result<Self, MissingDep> {
475        let mut seen = HashSet::new();
476        for (change, _) in iter.clone() {
477            for h in change.deps().iter() {
478                if !seen.contains(h) {
479                    return Err(MissingDep(*h));
480                }
481            }
482            seen.insert(change.hash());
483        }
484
485        let mut graph = ChangeGraph::with_capacity(iter.len(), deps, num_actors);
486        graph.add_changes(iter)?;
487        Ok(graph)
488    }
489
490    pub(crate) fn add_nodes<
491        'a,
492        I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
493    >(
494        &mut self,
495        iter: I,
496    ) {
497        self.actors
498            .extend(iter.clone().map(|(_, a)| ActorIdx::from(a)));
499        self.seq.extend(iter.clone().map(|(c, _)| c.seq() as u32));
500        self.max_ops
501            .extend(iter.clone().map(|(c, _)| c.max_op() as u32));
502        self.num_ops
503            .extend(iter.clone().map(|(c, _)| c.len() as u64));
504        self.timestamps
505            .extend(iter.clone().map(|(c, _)| c.timestamp()));
506        self.messages
507            .extend(iter.clone().map(|(c, _)| c.message().cloned()));
508        self.extra_bytes_meta
509            .extend(iter.clone().map(|(c, _)| ValueMeta::from(c.extra_bytes())));
510        self.parents
511            .extend(std::iter::repeat(None).take(iter.len()));
512        for (c, _) in iter {
513            self.extra_bytes_raw.extend_from_slice(c.extra_bytes());
514        }
515    }
516
517    fn add_changes<'a, I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone>(
518        &mut self,
519        iter: I,
520    ) -> Result<(), MissingDep> {
521        let node = NodeIdx(self.hashes.len() as u32);
522
523        self.add_nodes(iter.clone());
524
525        for (i, (change, actor)) in iter.enumerate() {
526            let node_idx = node + i;
527            let hash = change.hash();
528            self.hashes.push(hash);
529            debug_assert!(!self.nodes_by_hash.contains_key(&hash));
530            self.nodes_by_hash.insert(hash, node_idx);
531            self.update_heads(change);
532
533            assert!(actor < self.seq_index.len());
534            assert_eq!(self.seq_index[actor].len() + 1, change.seq() as usize);
535            self.seq_index[actor].push(node_idx);
536
537            for parent_hash in change.deps().iter() {
538                self.add_parent(node_idx, parent_hash);
539            }
540
541            if (node_idx + 1).0 % CACHE_STEP == 0 {
542                self.cache_clock(node_idx);
543            }
544        }
545        Ok(())
546    }
547
548    pub(crate) fn add_change(&mut self, change: &Change, actor: usize) -> Result<(), MissingDep> {
549        let hash = change.hash();
550
551        if self.nodes_by_hash.contains_key(&hash) {
552            return Ok(());
553        }
554
555        for h in change.deps().iter() {
556            if !self.nodes_by_hash.contains_key(h) {
557                return Err(MissingDep(*h));
558            }
559        }
560
561        self.add_changes([(change, actor)].into_iter())
562    }
563
564    fn cache_clock(&mut self, node_idx: NodeIdx) -> SeqClock {
565        let mut clock = SeqClock::new(self.num_actors());
566        let mut to_visit = BTreeSet::from([node_idx]);
567
568        self.calculate_clock_inner(&mut clock, &mut to_visit, CACHE_STEP as usize * 2);
569
570        for n in to_visit {
571            let sub = self.cache_clock(n);
572            SeqClock::merge(&mut clock, &sub);
573        }
574
575        self.clock_cache.insert(node_idx, clock.clone());
576
577        clock
578    }
579
580    fn add_parent(&mut self, child_idx: NodeIdx, parent_hash: &ChangeHash) {
581        debug_assert!(self.nodes_by_hash.contains_key(parent_hash));
582        let parent_idx = *self.nodes_by_hash.get(parent_hash).unwrap();
583        let new_edge_idx = EdgeIdx::new(self.edges.len());
584        self.edges.push(Edge {
585            target: parent_idx,
586            next: None,
587        });
588
589        let child = &mut self.parents[child_idx.0 as usize];
590        if let Some(edge_idx) = child {
591            let mut edge = &mut self.edges[edge_idx.get()];
592            while let Some(next) = edge.next {
593                edge = &mut self.edges[next.get()];
594            }
595            edge.next = Some(new_edge_idx);
596        } else {
597            *child = Some(new_edge_idx);
598        }
599    }
600
601    pub(crate) fn deps(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
602        let mut iter = self.nodes_by_hash.get(hash).map(|node| self.parents(*node));
603        std::iter::from_fn(move || {
604            let next = iter.as_mut()?.next()?;
605            self.hashes.get(next.0 as usize).copied()
606        })
607    }
608
609    fn parents(&self, node_idx: NodeIdx) -> impl Iterator<Item = NodeIdx> + '_ {
610        let mut edge_idx = self.parents[node_idx.0 as usize];
611        std::iter::from_fn(move || {
612            let this_edge_idx = edge_idx?;
613            let edge = &self.edges[this_edge_idx.get()];
614            edge_idx = edge.next;
615            Some(edge.target)
616        })
617    }
618
619    fn heads_to_nodes(&self, heads: &[ChangeHash]) -> Vec<NodeIdx> {
620        heads
621            .iter()
622            .filter_map(|h| self.nodes_by_hash.get(h))
623            .copied()
624            .collect()
625    }
626
627    pub(crate) fn clock_for_heads(&self, heads: &[ChangeHash]) -> Clock {
628        let nodes = self.heads_to_nodes(heads);
629        self.calculate_clock(nodes)
630            .iter()
631            .map(|(actor, seq)| {
632                self.seq_index
633                    .get(actor)
634                    .and_then(|v| v.get(seq?.get() as usize - 1))
635                    .and_then(|i| self.max_ops.get(i.0 as usize))
636                    .copied()
637            })
638            .collect()
639    }
640
641    pub(crate) fn seq_clock_for_heads(&self, heads: &[ChangeHash]) -> SeqClock {
642        let nodes = self.heads_to_nodes(heads);
643        self.calculate_clock(nodes)
644    }
645
646    fn clock_data_for(&self, idx: NodeIdx) -> Option<u32> {
647        Some(*self.seq.get(idx.0 as usize)?)
648    }
649
650    fn calculate_clock(&self, nodes: Vec<NodeIdx>) -> SeqClock {
651        let mut clock = SeqClock::new(self.num_actors());
652        let mut to_visit = nodes.into_iter().collect::<BTreeSet<_>>();
653
654        self.calculate_clock_inner(&mut clock, &mut to_visit, usize::MAX);
655
656        assert!(to_visit.is_empty());
657
658        clock
659    }
660
661    fn calculate_clock_inner(
662        &self,
663        clock: &mut SeqClock,
664        to_visit: &mut BTreeSet<NodeIdx>,
665        limit: usize,
666    ) {
667        let mut visited = BTreeSet::new();
668
669        while let Some(idx) = to_visit.pop_last() {
670            assert!(!visited.contains(&idx));
671            assert!(visited.len() <= self.hashes.len());
672            visited.insert(idx);
673
674            let actor = self.actors[idx.0 as usize];
675            let data = self.clock_data_for(idx);
676            clock.include(actor.into(), data);
677
678            if let Some(cached) = self.clock_cache.get(&idx) {
679                SeqClock::merge(clock, cached);
680            } else if visited.len() <= limit {
681                to_visit.extend(self.parents(idx).filter(|p| !visited.contains(p)));
682            } else {
683                break;
684            }
685        }
686    }
687
688    pub(crate) fn remove_ancestors(
689        &self,
690        changes: &mut BTreeSet<ChangeHash>,
691        heads: &[ChangeHash],
692    ) {
693        let nodes = self.heads_to_nodes(heads);
694        self.traverse_ancestors(nodes, |idx| {
695            let hash = &self.hashes[idx.0 as usize];
696            changes.remove(hash);
697            true
698        });
699    }
700
701    fn traverse_ancestors<F: FnMut(NodeIdx) -> bool>(&self, mut to_visit: Vec<NodeIdx>, mut f: F) {
702        let mut visited = BTreeSet::new();
703
704        while let Some(idx) = to_visit.pop() {
705            if visited.contains(&idx) {
706                continue;
707            } else {
708                visited.insert(idx);
709            }
710            if f(idx) {
711                to_visit.extend(self.parents(idx));
712            }
713        }
714    }
715}
716
/// Wraps a dependency count as an owned, always-present `u64` value.
fn as_num_deps(num: usize) -> Option<Cow<'static, u64>> {
    let count = num as u64;
    Some(Cow::Owned(count))
}
720
/// Widens a sequence number to an owned, always-present `i64` value.
fn as_seq(seq: &u32) -> Option<Cow<'_, i64>> {
    let widened = i64::from(*seq);
    Some(Cow::Owned(widened))
}
724
725fn as_actor(actor_index: &ActorIdx) -> Option<Cow<'_, ActorIdx>> {
726    Some(Cow::Borrowed(actor_index))
727}
728
/// Widens a `max_op` value to an owned, always-present `i64` value.
fn as_max_op(m: &u32) -> Option<Cow<'_, i64>> {
    let widened = i64::from(*m);
    Some(Cow::Owned(widened))
}
732
733fn as_deps(n: NodeIdx) -> Option<Cow<'static, i64>> {
734    Some(Cow::Owned(n.0 as i64))
735}
736
/// Error returned when a change hash that was asked for, or that a change depends on, is not
/// present in the graph. The wrapped value is the missing hash.
#[derive(Debug, thiserror::Error)]
#[error("attempted to derive a clock for a change with dependencies we don't have")]
pub struct MissingDep(ChangeHash);
740
741#[cfg(test)]
742mod tests {
743    use std::{
744        collections::BTreeMap,
745        time::{SystemTime, UNIX_EPOCH},
746    };
747
748    use crate::{
749        op_set2::{change::build_change, op_set::ResolvedAction, OpSet, TxOp},
750        types::{ObjMeta, OpId, OpType},
751        ActorId, TextEncoding,
752    };
753
754    use super::*;
755
756    #[test]
757    fn clock_by_heads() {
758        let mut builder = TestGraphBuilder::new();
759        let actor1 = builder.actor();
760        let actor2 = builder.actor();
761        let actor3 = builder.actor();
762        let change1 = builder.change(&actor1, 10, &[]);
763        let change2 = builder.change(&actor2, 20, &[change1]);
764        let change3 = builder.change(&actor3, 30, &[change1]);
765        let change4 = builder.change(&actor1, 10, &[change2, change3]);
766        let graph = builder.build();
767
768        // todo - why 4?
769        let mut expected_clock = SeqClock::new(3);
770        expected_clock.include(builder.index(&actor1), Some(2));
771        expected_clock.include(builder.index(&actor2), Some(1));
772        expected_clock.include(builder.index(&actor3), Some(1));
773
774        let clock = graph.seq_clock_for_heads(&[change4]);
775        assert_eq!(clock, expected_clock);
776    }
777
778    #[test]
779    fn remove_ancestors() {
780        let mut builder = TestGraphBuilder::new();
781        let actor1 = builder.actor();
782        let actor2 = builder.actor();
783        let actor3 = builder.actor();
784        let change1 = builder.change(&actor1, 10, &[]);
785        let change2 = builder.change(&actor2, 20, &[change1]);
786        let change3 = builder.change(&actor3, 30, &[change1]);
787        let change4 = builder.change(&actor1, 10, &[change2, change3]);
788        let graph = builder.build();
789
790        let mut changes = vec![change1, change2, change3, change4]
791            .into_iter()
792            .collect::<BTreeSet<_>>();
793        let heads = vec![change2];
794        graph.remove_ancestors(&mut changes, &heads);
795
796        let expected_changes = vec![change3, change4].into_iter().collect::<BTreeSet<_>>();
797
798        assert_eq!(changes, expected_changes);
799    }
800
801    struct TestGraphBuilder {
802        actors: Vec<ActorId>,
803        changes: Vec<Change>,
804        graph: ChangeGraph,
805        seqs_by_actor: BTreeMap<ActorId, u64>,
806    }
807
808    impl TestGraphBuilder {
809        fn new() -> Self {
810            TestGraphBuilder {
811                actors: Vec::new(),
812                changes: Vec::new(),
813                graph: ChangeGraph::new(0),
814                seqs_by_actor: BTreeMap::new(),
815            }
816        }
817
818        fn actor(&mut self) -> ActorId {
819            let actor = ActorId::random();
820            self.graph.insert_actor(self.actors.len());
821            self.actors.push(actor.clone());
822            actor
823        }
824
825        fn index(&self, actor: &ActorId) -> usize {
826            self.actors.iter().position(|a| a == actor).unwrap()
827        }
828
829        /// Create a change with `num_new_ops` and `parents` for `actor`
830        ///
831        /// The `start_op` and `seq` of the change will be computed from the
832        /// previous changes for the same actor.
833        fn change(
834            &mut self,
835            actor: &ActorId,
836            num_new_ops: usize,
837            parents: &[ChangeHash],
838        ) -> ChangeHash {
839            let osd = OpSet::from_actors(self.actors.clone(), TextEncoding::platform_default());
840
841            let start_op = parents
842                .iter()
843                .map(|c| {
844                    self.changes
845                        .iter()
846                        .find(|change| change.hash() == *c)
847                        .unwrap()
848                        .max_op()
849                })
850                .max()
851                .unwrap_or(0)
852                + 1;
853
854            let actor_idx = self.index(actor);
855            let ops = (0..num_new_ops)
856                .map(|opnum| {
857                    TxOp::map(
858                        OpId::new(start_op + opnum as u64, actor_idx),
859                        ObjMeta::root(),
860                        0,
861                        ResolvedAction::VisibleUpdate(OpType::Put("value".into())),
862                        "key".to_string(),
863                        vec![],
864                    )
865                })
866                .collect::<Vec<_>>();
867
868            let timestamp = SystemTime::now()
869                .duration_since(UNIX_EPOCH)
870                .unwrap()
871                .as_millis() as i64;
872            let seq = self.seqs_by_actor.entry(actor.clone()).or_insert(1);
873            let meta = BuildChangeMetadata {
874                actor: actor_idx,
875                builder: 0,
876                deps: parents
877                    .iter()
878                    .map(|h| self.graph.hash_to_index(h).unwrap() as u64)
879                    .collect(),
880                seq: *seq,
881                max_op: start_op + ops.len() as u64 - 1,
882                start_op,
883                timestamp,
884                message: None,
885                extra: Cow::Owned(vec![]),
886            };
887            let change = Change::new(build_change(&ops, &meta, &self.graph, &osd.actors));
888            *seq = seq.checked_add(1).unwrap();
889            let hash = change.hash();
890            self.graph.add_change(&change, actor_idx).unwrap();
891            self.changes.push(change);
892            hash
893        }
894
895        fn build(&self) -> ChangeGraph {
896            let mut graph = ChangeGraph::new(self.actors.len());
897            for change in &self.changes {
898                let actor_idx = self.index(change.actor_id());
899                graph.add_change(change, actor_idx).unwrap();
900            }
901            graph
902        }
903    }
904}