use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::num::NonZeroU32;
use std::ops::Add;
use hexane::{ColumnCursor, ColumnData, DeltaCursor, StrCursor, UIntCursor};
use crate::storage::BundleMetadata;
use crate::{
clock::{Clock, SeqClock},
columnar::column_range::{DepsRange, ValueRange},
error::AutomergeError,
op_set2::{change::BuildChangeMetadata, ActorCursor, ActorIdx, MetaCursor, ValueMeta},
storage::{Columns, DocChangeColumns},
types::OpId,
Change, ChangeHash,
};
#[derive(Debug, PartialEq, Default, Clone)]
pub(crate) struct ChangeGraph {
edges: Vec<Edge>,
hashes: Vec<ChangeHash>,
actors: Vec<ActorIdx>,
parents: Vec<Option<EdgeIdx>>,
seq: Vec<u32>,
max_ops: Vec<u32>,
num_ops: ColumnData<UIntCursor>,
timestamps: ColumnData<DeltaCursor>,
messages: ColumnData<StrCursor>,
extra_bytes_meta: ColumnData<MetaCursor>,
extra_bytes_raw: Vec<u8>,
heads: BTreeSet<ChangeHash>,
nodes_by_hash: HashMap<ChangeHash, NodeIdx>,
clock_cache: HashMap<NodeIdx, SeqClock>,
seq_index: Vec<Vec<NodeIdx>>,
}
const CACHE_STEP: u32 = 16;
#[derive(Hash, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NodeIdx(u32);
impl Add<usize> for NodeIdx {
type Output = Self;
fn add(self, other: usize) -> Self {
NodeIdx(self.0 + other as u32)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EdgeIdx(NonZeroU32);
impl EdgeIdx {
fn new(value: usize) -> Self {
EdgeIdx(NonZeroU32::new(value as u32 + 1).unwrap())
}
fn get(&self) -> usize {
self.0.get() as usize - 1
}
}
#[derive(PartialEq, Debug, Clone)]
struct Edge {
target: NodeIdx,
next: Option<EdgeIdx>,
}
impl ChangeGraph {
pub(crate) fn new(num_actors: usize) -> Self {
Self {
edges: Vec::new(),
nodes_by_hash: HashMap::new(),
hashes: Vec::new(),
actors: Vec::new(),
max_ops: Vec::new(),
num_ops: ColumnData::new(),
seq: Vec::new(),
parents: Vec::new(),
messages: ColumnData::new(),
timestamps: ColumnData::new(),
extra_bytes_meta: ColumnData::new(),
extra_bytes_raw: Vec::new(),
heads: BTreeSet::new(),
clock_cache: HashMap::new(),
seq_index: vec![vec![]; num_actors],
}
}
pub(crate) fn with_capacity(changes: usize, deps: usize, num_actors: usize) -> Self {
Self {
edges: Vec::with_capacity(deps),
nodes_by_hash: HashMap::new(),
hashes: Vec::with_capacity(changes),
actors: Vec::with_capacity(changes),
max_ops: Vec::with_capacity(changes),
num_ops: ColumnData::new(),
seq: Vec::with_capacity(changes),
parents: Vec::with_capacity(changes),
messages: ColumnData::new(),
timestamps: ColumnData::new(),
extra_bytes_meta: ColumnData::new(),
extra_bytes_raw: Vec::new(),
heads: BTreeSet::new(),
clock_cache: HashMap::new(),
seq_index: vec![vec![]; num_actors],
}
}
pub(crate) fn all_actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
self.seq_index.iter().enumerate().map(|(i, _)| i)
}
pub(crate) fn actor_ids(&self) -> impl Iterator<Item = usize> + '_ {
self.seq_index
.iter()
.enumerate()
.filter_map(|(i, v)| if !v.is_empty() { Some(i) } else { None })
}
pub(crate) fn unused_actors(&self) -> impl Iterator<Item = usize> + '_ {
self.seq_index
.iter()
.enumerate()
.filter_map(|(i, v)| if v.is_empty() { Some(i) } else { None })
}
pub(crate) fn heads(&self) -> impl Iterator<Item = ChangeHash> + '_ {
self.heads.iter().cloned()
}
pub(crate) fn head_indexes(&self) -> impl Iterator<Item = u64> + '_ {
self.heads
.iter()
.map(|h| self.nodes_by_hash.get(h).unwrap().0 as u64)
}
pub(crate) fn num_actors(&self) -> usize {
self.seq_index.len()
}
pub(crate) fn insert_actor(&mut self, idx: usize) {
if self.seq_index.len() != idx {
for actor_index in &mut self.actors {
if actor_index.0 >= idx as u32 {
actor_index.0 += 1;
}
}
}
for clock in self.clock_cache.values_mut() {
clock.rewrite_with_new_actor(idx)
}
self.seq_index.insert(idx, vec![]);
}
pub(crate) fn remove_actor(&mut self, idx: usize) {
for actor_index in &mut self.actors {
if actor_index.0 > idx as u32 {
actor_index.0 -= 1;
}
}
if self.seq_index.get(idx).is_some() {
assert!(self.seq_index[idx].is_empty());
self.seq_index.remove(idx);
}
for clock in &mut self.clock_cache.values_mut() {
clock.remove_actor(idx)
}
}
pub(crate) fn len(&self) -> usize {
self.hashes.len()
}
pub(crate) fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
pub(crate) fn hash_to_index(&self, hash: &ChangeHash) -> Option<usize> {
self.nodes_by_hash.get(hash).map(|n| n.0 as usize)
}
pub(crate) fn index_to_hash(&self, index: usize) -> Option<&ChangeHash> {
self.hashes.get(index)
}
pub(crate) fn max_op_for_actor(&mut self, actor_index: usize) -> u64 {
self.seq_index
.get(actor_index)
.and_then(|s| s.last())
.and_then(|index| self.max_ops.get(index.0 as usize).cloned())
.unwrap_or(0) as u64
}
pub(crate) fn seq_for_actor(&self, actor: usize) -> u64 {
self.seq_index
.get(actor)
.map(|v| v.len() as u64)
.unwrap_or(0)
}
fn deps_iter(&self) -> impl Iterator<Item = NodeIdx> + '_ {
self.node_ids().flat_map(|n| self.parents(n))
}
fn num_deps(&self) -> impl Iterator<Item = usize> + '_ {
self.node_ids().map(|n| self.parents(n).count())
}
fn node_ids(&self) -> impl Iterator<Item = NodeIdx> {
let end = self.hashes.len() as u32;
(0..end).map(NodeIdx)
}
pub(crate) fn encode(&self, out: &mut Vec<u8>) -> DocChangeColumns {
let actor_iter = self.actors.iter().map(as_actor);
let actor = ActorCursor::encode(out, actor_iter, false).into();
let seq_iter = self.seq.iter().map(as_seq);
let seq = DeltaCursor::encode(out, seq_iter, false).into();
let max_op_iter = self.max_ops.iter().map(as_max_op);
let max_op = DeltaCursor::encode(out, max_op_iter, false).into();
let time = self.timestamps.save_to_unless_empty(out).into();
let message = self.messages.save_to_unless_empty(out).into();
let num_deps_iter = self.num_deps().map(as_num_deps);
let num_deps = UIntCursor::encode(out, num_deps_iter, false).into();
let deps_iter = self.deps_iter().map(as_deps);
let deps = DeltaCursor::encode(out, deps_iter, false).into();
let meta = self.extra_bytes_meta.save_to_unless_empty(out).into();
let raw = (out.len()..out.len() + self.extra_bytes_raw.len()).into();
out.extend(&self.extra_bytes_raw);
DocChangeColumns {
actor,
seq,
max_op,
time,
message,
deps: DepsRange::new(num_deps, deps),
extra: ValueRange::new(meta, raw),
other: Columns::empty(),
}
}
pub(crate) fn opid_to_hash(&self, id: OpId) -> Option<ChangeHash> {
let actor_indices = self.seq_index.get(id.actor())?;
let counter = id.counter();
let index = actor_indices
.binary_search_by(|n| {
let i = n.0 as usize;
let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
let max_op = self.max_ops[i];
let start = max_op as u64 - num_ops + 1;
if counter < start {
Ordering::Greater
} else if (max_op as u64) < counter {
Ordering::Less
} else {
Ordering::Equal
}
})
.ok()?;
let node_idx = actor_indices[index];
self.hashes.get(node_idx.0 as usize).cloned()
}
pub(crate) fn deps_for_hash(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
let node_idx = self.nodes_by_hash.get(hash);
let mut edge_idx = node_idx.and_then(|n| self.parents[n.0 as usize]);
std::iter::from_fn(move || {
let this_edge_idx = edge_idx?;
let edge = &self.edges[this_edge_idx.get()];
edge_idx = edge.next;
let hash = self.hashes[edge.target.0 as usize];
Some(hash)
})
}
pub(crate) fn has_change(&self, hash: &ChangeHash) -> bool {
self.nodes_by_hash.contains_key(hash)
}
pub(crate) fn get_bundle_metadata<I>(
&self,
hashes: I,
) -> impl Iterator<Item = Result<BundleMetadata<'_>, MissingDep>>
where
I: IntoIterator<Item = ChangeHash>,
{
hashes.into_iter().map(|hash| {
let index = self
.nodes_by_hash
.get(&hash)
.cloned()
.ok_or(MissingDep(hash))?;
let i = index.0 as usize;
let actor = self.actors[i].into();
let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
let max_op = self.max_ops[i] as u64;
let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
let message = self.messages.get(i).flatten();
let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
let meta_range =
meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
let deps = self
.parents(index)
.map(|p| self.hashes[p.0 as usize])
.collect::<Vec<_>>();
let start_op = max_op - num_ops + 1;
let seq = self.seq[i] as u64;
Ok(BundleMetadata {
hash,
actor,
seq,
start_op,
max_op,
timestamp,
message,
extra,
deps,
builder: i,
})
})
}
pub(crate) fn get_build_metadata<I>(
&self,
hashes: I,
) -> Result<(Vec<BuildChangeMetadata<'_>>, usize), MissingDep>
where
I: IntoIterator<Item = ChangeHash>,
{
let indexes: Vec<_> = hashes
.into_iter()
.map(|hash| {
self.nodes_by_hash
.get(&hash)
.cloned()
.ok_or(MissingDep(hash))
})
.collect::<Result<_, _>>()?;
Ok(self.get_build_metadata_for_indexes(indexes))
}
fn get_build_metadata_for_indexes<I>(&self, indexes: I) -> (Vec<BuildChangeMetadata<'_>>, usize)
where
I: IntoIterator<Item = NodeIdx>,
{
let mut num_deps = 0;
let changes = indexes
.into_iter()
.map(|index| {
let i = index.0 as usize;
let actor = self.actors[i].into();
let timestamp = *self.timestamps.get(i).flatten().unwrap_or_default();
let max_op = self.max_ops[i] as u64;
let num_ops = *self.num_ops.get(i).flatten().unwrap_or_default();
let message = self.messages.get(i).flatten();
let meta = self.extra_bytes_meta.get_with_acc(i).unwrap();
let meta_range =
meta.acc.as_usize()..(meta.acc.as_usize() + meta.item.unwrap().length());
let extra = Cow::Borrowed(&self.extra_bytes_raw[meta_range]);
let deps = self.parents(index).map(|p| p.0 as u64).collect::<Vec<_>>();
num_deps += deps.len();
let start_op = max_op - num_ops + 1;
let seq = self.seq[i] as u64;
BuildChangeMetadata {
actor,
seq,
start_op,
max_op,
timestamp,
message,
extra,
deps,
builder: i,
}
})
.collect();
(changes, num_deps)
}
fn get_build_indexes(&self, clock: SeqClock) -> Vec<NodeIdx> {
let mut change_indexes: Vec<NodeIdx> = Vec::new();
for (actor_index, actor_changes) in self.seq_index.iter().enumerate() {
if let Some(seq) = clock.get_for_actor(&actor_index) {
change_indexes.extend(&actor_changes[seq.get() as usize..]);
} else {
change_indexes.extend(&actor_changes[..]);
}
}
change_indexes.sort_unstable();
change_indexes
}
#[inline(never)]
pub(crate) fn get_hashes(&self, have_deps: &[ChangeHash]) -> Vec<ChangeHash> {
let clock = self.seq_clock_for_heads(have_deps);
self.get_build_indexes(clock)
.into_iter()
.filter_map(|node| self.hashes.get(node.0 as usize))
.copied()
.collect()
}
pub(crate) fn get_build_metadata_clock(
&self,
have_deps: &[ChangeHash],
) -> (Vec<BuildChangeMetadata<'_>>, usize) {
let clock = self.seq_clock_for_heads(have_deps);
let change_indexes = self.get_build_indexes(clock);
self.get_build_metadata_for_indexes(change_indexes)
}
pub(crate) fn get_hash_for_actor_seq(
&self,
actor: usize,
seq: u64,
) -> Result<ChangeHash, AutomergeError> {
self.seq_index
.get(actor)
.and_then(|v| v.get(seq as usize - 1))
.and_then(|i| self.hashes.get(i.0 as usize))
.ok_or(AutomergeError::InvalidSeq(seq))
.copied()
}
fn update_heads(&mut self, change: &Change) {
for d in change.deps() {
self.heads.remove(d);
}
self.heads.insert(change.hash());
}
pub(crate) fn from_iter<
'a,
I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
>(
iter: I,
deps: usize,
num_actors: usize,
) -> Result<Self, MissingDep> {
let mut seen = HashSet::new();
for (change, _) in iter.clone() {
for h in change.deps().iter() {
if !seen.contains(h) {
return Err(MissingDep(*h));
}
}
seen.insert(change.hash());
}
let mut graph = ChangeGraph::with_capacity(iter.len(), deps, num_actors);
graph.add_changes(iter)?;
Ok(graph)
}
pub(crate) fn add_nodes<
'a,
I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone,
>(
&mut self,
iter: I,
) {
self.actors
.extend(iter.clone().map(|(_, a)| ActorIdx::from(a)));
self.seq.extend(iter.clone().map(|(c, _)| c.seq() as u32));
self.max_ops
.extend(iter.clone().map(|(c, _)| c.max_op() as u32));
self.num_ops
.extend(iter.clone().map(|(c, _)| c.len() as u64));
self.timestamps
.extend(iter.clone().map(|(c, _)| c.timestamp()));
self.messages
.extend(iter.clone().map(|(c, _)| c.message().cloned()));
self.extra_bytes_meta
.extend(iter.clone().map(|(c, _)| ValueMeta::from(c.extra_bytes())));
self.parents.extend(std::iter::repeat_n(None, iter.len()));
for (c, _) in iter {
self.extra_bytes_raw.extend_from_slice(c.extra_bytes());
}
}
fn add_changes<'a, I: Iterator<Item = (&'a Change, usize)> + ExactSizeIterator + Clone>(
&mut self,
iter: I,
) -> Result<(), MissingDep> {
let node = NodeIdx(self.hashes.len() as u32);
self.add_nodes(iter.clone());
for (i, (change, actor)) in iter.enumerate() {
let node_idx = node + i;
let hash = change.hash();
self.hashes.push(hash);
debug_assert!(!self.nodes_by_hash.contains_key(&hash));
self.nodes_by_hash.insert(hash, node_idx);
self.update_heads(change);
assert!(actor < self.seq_index.len());
assert_eq!(self.seq_index[actor].len() + 1, change.seq() as usize);
self.seq_index[actor].push(node_idx);
for parent_hash in change.deps().iter() {
self.add_parent(node_idx, parent_hash);
}
if (node_idx + 1).0 % CACHE_STEP == 0 {
self.cache_clock(node_idx);
}
}
Ok(())
}
pub(crate) fn add_change(&mut self, change: &Change, actor: usize) -> Result<(), MissingDep> {
let hash = change.hash();
if self.nodes_by_hash.contains_key(&hash) {
return Ok(());
}
for h in change.deps().iter() {
if !self.nodes_by_hash.contains_key(h) {
return Err(MissingDep(*h));
}
}
self.add_changes([(change, actor)].into_iter())
}
fn cache_clock(&mut self, node_idx: NodeIdx) -> SeqClock {
let mut clock = SeqClock::new(self.num_actors());
let mut to_visit = BTreeSet::from([node_idx]);
self.calculate_clock_inner(&mut clock, &mut to_visit, CACHE_STEP as usize * 2);
for n in to_visit {
let sub = self.cache_clock(n);
SeqClock::merge(&mut clock, &sub);
}
self.clock_cache.insert(node_idx, clock.clone());
clock
}
fn add_parent(&mut self, child_idx: NodeIdx, parent_hash: &ChangeHash) {
debug_assert!(self.nodes_by_hash.contains_key(parent_hash));
let parent_idx = *self.nodes_by_hash.get(parent_hash).unwrap();
let new_edge_idx = EdgeIdx::new(self.edges.len());
self.edges.push(Edge {
target: parent_idx,
next: None,
});
let child = &mut self.parents[child_idx.0 as usize];
if let Some(edge_idx) = child {
let mut edge = &mut self.edges[edge_idx.get()];
while let Some(next) = edge.next {
edge = &mut self.edges[next.get()];
}
edge.next = Some(new_edge_idx);
} else {
*child = Some(new_edge_idx);
}
}
pub(crate) fn deps(&self, hash: &ChangeHash) -> impl Iterator<Item = ChangeHash> + '_ {
let mut iter = self.nodes_by_hash.get(hash).map(|node| self.parents(*node));
std::iter::from_fn(move || {
let next = iter.as_mut()?.next()?;
self.hashes.get(next.0 as usize).copied()
})
}
fn parents(&self, node_idx: NodeIdx) -> impl Iterator<Item = NodeIdx> + '_ {
let mut edge_idx = self.parents[node_idx.0 as usize];
std::iter::from_fn(move || {
let this_edge_idx = edge_idx?;
let edge = &self.edges[this_edge_idx.get()];
edge_idx = edge.next;
Some(edge.target)
})
}
fn heads_to_nodes(&self, heads: &[ChangeHash]) -> Vec<NodeIdx> {
heads
.iter()
.filter_map(|h| self.nodes_by_hash.get(h))
.copied()
.collect()
}
pub(crate) fn clock_for_heads(&self, heads: &[ChangeHash]) -> Clock {
let nodes = self.heads_to_nodes(heads);
self.calculate_clock(nodes)
.iter()
.map(|(actor, seq)| {
self.seq_index
.get(actor)
.and_then(|v| v.get(seq?.get() as usize - 1))
.and_then(|i| self.max_ops.get(i.0 as usize))
.copied()
})
.collect()
}
pub(crate) fn seq_clock_for_heads(&self, heads: &[ChangeHash]) -> SeqClock {
let nodes = self.heads_to_nodes(heads);
self.calculate_clock(nodes)
}
fn clock_data_for(&self, idx: NodeIdx) -> Option<u32> {
Some(*self.seq.get(idx.0 as usize)?)
}
fn calculate_clock(&self, nodes: Vec<NodeIdx>) -> SeqClock {
let mut clock = SeqClock::new(self.num_actors());
let mut to_visit = nodes.into_iter().collect::<BTreeSet<_>>();
self.calculate_clock_inner(&mut clock, &mut to_visit, usize::MAX);
assert!(to_visit.is_empty());
clock
}
fn calculate_clock_inner(
&self,
clock: &mut SeqClock,
to_visit: &mut BTreeSet<NodeIdx>,
limit: usize,
) {
let mut visited = BTreeSet::new();
while let Some(idx) = to_visit.pop_last() {
assert!(!visited.contains(&idx));
assert!(visited.len() <= self.hashes.len());
visited.insert(idx);
let actor = self.actors[idx.0 as usize];
let data = self.clock_data_for(idx);
clock.include(actor.into(), data);
if let Some(cached) = self.clock_cache.get(&idx) {
SeqClock::merge(clock, cached);
} else {
to_visit.extend(self.parents(idx).filter(|p| !visited.contains(p)));
if visited.len() > limit {
break;
}
}
}
}
pub(crate) fn remove_ancestors(
&self,
changes: &mut BTreeSet<ChangeHash>,
heads: &[ChangeHash],
) {
let nodes = self.heads_to_nodes(heads);
self.traverse_ancestors(nodes, |idx| {
let hash = &self.hashes[idx.0 as usize];
changes.remove(hash);
true
});
}
fn traverse_ancestors<F: FnMut(NodeIdx) -> bool>(&self, mut to_visit: Vec<NodeIdx>, mut f: F) {
let mut visited = BTreeSet::new();
while let Some(idx) = to_visit.pop() {
if visited.contains(&idx) {
continue;
} else {
visited.insert(idx);
}
if f(idx) {
to_visit.extend(self.parents(idx));
}
}
}
}
fn as_num_deps(num: usize) -> Option<Cow<'static, u64>> {
Some(Cow::Owned(num as u64))
}
fn as_seq(seq: &u32) -> Option<Cow<'_, i64>> {
Some(Cow::Owned(*seq as i64))
}
fn as_actor(actor_index: &ActorIdx) -> Option<Cow<'_, ActorIdx>> {
Some(Cow::Borrowed(actor_index))
}
fn as_max_op(m: &u32) -> Option<Cow<'_, i64>> {
Some(Cow::Owned(*m as i64))
}
fn as_deps(n: NodeIdx) -> Option<Cow<'static, i64>> {
Some(Cow::Owned(n.0 as i64))
}
#[derive(Debug, thiserror::Error)]
#[error("attempted to derive a clock for a change with dependencies we don't have")]
pub struct MissingDep(ChangeHash);
#[cfg(test)]
mod tests {
use std::{
collections::BTreeMap,
time::{SystemTime, UNIX_EPOCH},
};
use crate::{
op_set2::{change::build_change, op_set::ResolvedAction, OpSet, TxOp},
types::{ObjMeta, OpId, OpType},
ActorId, TextEncoding,
};
use super::*;
#[test]
fn clock_by_heads() {
let mut builder = TestGraphBuilder::new();
let actor1 = builder.actor();
let actor2 = builder.actor();
let actor3 = builder.actor();
let change1 = builder.change(&actor1, 10, &[]);
let change2 = builder.change(&actor2, 20, &[change1]);
let change3 = builder.change(&actor3, 30, &[change1]);
let change4 = builder.change(&actor1, 10, &[change2, change3]);
let graph = builder.build();
let mut expected_clock = SeqClock::new(3);
expected_clock.include(builder.index(&actor1), Some(2));
expected_clock.include(builder.index(&actor2), Some(1));
expected_clock.include(builder.index(&actor3), Some(1));
let clock = graph.seq_clock_for_heads(&[change4]);
assert_eq!(clock, expected_clock);
}
#[test]
fn remove_ancestors() {
let mut builder = TestGraphBuilder::new();
let actor1 = builder.actor();
let actor2 = builder.actor();
let actor3 = builder.actor();
let change1 = builder.change(&actor1, 10, &[]);
let change2 = builder.change(&actor2, 20, &[change1]);
let change3 = builder.change(&actor3, 30, &[change1]);
let change4 = builder.change(&actor1, 10, &[change2, change3]);
let graph = builder.build();
let mut changes = vec![change1, change2, change3, change4]
.into_iter()
.collect::<BTreeSet<_>>();
let heads = vec![change2];
graph.remove_ancestors(&mut changes, &heads);
let expected_changes = vec![change3, change4].into_iter().collect::<BTreeSet<_>>();
assert_eq!(changes, expected_changes);
}
struct TestGraphBuilder {
actors: Vec<ActorId>,
changes: Vec<Change>,
graph: ChangeGraph,
seqs_by_actor: BTreeMap<ActorId, u64>,
}
impl TestGraphBuilder {
fn new() -> Self {
TestGraphBuilder {
actors: Vec::new(),
changes: Vec::new(),
graph: ChangeGraph::new(0),
seqs_by_actor: BTreeMap::new(),
}
}
fn actor(&mut self) -> ActorId {
let actor = ActorId::random();
self.graph.insert_actor(self.actors.len());
self.actors.push(actor.clone());
actor
}
fn index(&self, actor: &ActorId) -> usize {
self.actors.iter().position(|a| a == actor).unwrap()
}
fn change(
&mut self,
actor: &ActorId,
num_new_ops: usize,
parents: &[ChangeHash],
) -> ChangeHash {
let osd = OpSet::from_actors(self.actors.clone(), TextEncoding::platform_default());
let start_op = parents
.iter()
.map(|c| {
self.changes
.iter()
.find(|change| change.hash() == *c)
.unwrap()
.max_op()
})
.max()
.unwrap_or(0)
+ 1;
let actor_idx = self.index(actor);
let ops = (0..num_new_ops)
.map(|opnum| {
TxOp::map(
OpId::new(start_op + opnum as u64, actor_idx),
ObjMeta::root(),
0,
ResolvedAction::VisibleUpdate(OpType::Put("value".into())),
"key".to_string(),
vec![],
)
})
.collect::<Vec<_>>();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let seq = self.seqs_by_actor.entry(actor.clone()).or_insert(1);
let meta = BuildChangeMetadata {
actor: actor_idx,
builder: 0,
deps: parents
.iter()
.map(|h| self.graph.hash_to_index(h).unwrap() as u64)
.collect(),
seq: *seq,
max_op: start_op + ops.len() as u64 - 1,
start_op,
timestamp,
message: None,
extra: Cow::Owned(vec![]),
};
let change = Change::new(build_change(&ops, &meta, &self.graph, &osd.actors));
*seq = seq.checked_add(1).unwrap();
let hash = change.hash();
self.graph.add_change(&change, actor_idx).unwrap();
self.changes.push(change);
hash
}
fn build(&self) -> ChangeGraph {
let mut graph = ChangeGraph::new(self.actors.len());
for change in &self.changes {
let actor_idx = self.index(change.actor_id());
graph.add_change(change, actor_idx).unwrap();
}
graph
}
}
}