use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::ops::RangeBounds;
use crate::clock::ClockData;
use crate::clocks::Clocks;
use crate::columnar::Key as EncodedKey;
use crate::exid::ExId;
use crate::keys::Keys;
use crate::op_observer::OpObserver;
use crate::op_set::OpSet;
use crate::parents::Parents;
use crate::storage::{self, load, CompressConfig};
use crate::transaction::{
self, CommitOptions, Failure, Observed, Success, Transaction, TransactionInner, UnObserved,
};
use crate::types::{
ActorId, ChangeHash, Clock, ElemId, Export, Exportable, Key, ObjId, Op, OpId, OpType,
ScalarValue, Value,
};
use crate::{
query, AutomergeError, Change, KeysAt, ListRange, ListRangeAt, MapRange, MapRangeAt, ObjType,
Prop, Values,
};
use serde::Serialize;
#[cfg(test)]
mod tests;
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Actor {
Unused(ActorId),
Cached(usize),
}
#[derive(Debug, Clone)]
pub struct Automerge {
pub(crate) queue: Vec<Change>,
pub(crate) history: Vec<Change>,
pub(crate) history_index: HashMap<ChangeHash, usize>,
pub(crate) clocks: HashMap<ChangeHash, Clock>,
pub(crate) states: HashMap<usize, Vec<usize>>,
pub(crate) deps: HashSet<ChangeHash>,
pub(crate) saved: Vec<ChangeHash>,
pub(crate) ops: OpSet,
pub(crate) actor: Actor,
pub(crate) max_op: u64,
}
impl Automerge {
pub fn new() -> Self {
Automerge {
queue: vec![],
history: vec![],
history_index: HashMap::new(),
clocks: HashMap::new(),
states: HashMap::new(),
ops: Default::default(),
deps: Default::default(),
saved: Default::default(),
actor: Actor::Unused(ActorId::random()),
max_op: 0,
}
}
pub fn with_actor(mut self, actor: ActorId) -> Self {
self.actor = Actor::Unused(actor);
self
}
pub fn set_actor(&mut self, actor: ActorId) -> &mut Self {
self.actor = Actor::Unused(actor);
self
}
pub fn get_actor(&self) -> &ActorId {
match &self.actor {
Actor::Unused(actor) => actor,
Actor::Cached(index) => self.ops.m.actors.get(*index),
}
}
pub(crate) fn get_actor_index(&mut self) -> usize {
match &mut self.actor {
Actor::Unused(actor) => {
let index = self
.ops
.m
.actors
.cache(std::mem::replace(actor, ActorId::from(&[][..])));
self.actor = Actor::Cached(index);
index
}
Actor::Cached(index) => *index,
}
}
pub fn transaction(&mut self) -> Transaction<'_, UnObserved> {
Transaction {
inner: Some(self.transaction_inner()),
doc: self,
observation: Some(UnObserved),
}
}
pub fn transaction_with_observer<Obs: OpObserver>(
&mut self,
op_observer: Obs,
) -> Transaction<'_, Observed<Obs>> {
Transaction {
inner: Some(self.transaction_inner()),
doc: self,
observation: Some(Observed::new(op_observer)),
}
}
pub(crate) fn transaction_inner(&mut self) -> TransactionInner {
let actor = self.get_actor_index();
let seq = self.states.get(&actor).map_or(0, |v| v.len()) as u64 + 1;
let mut deps = self.get_heads();
if seq > 1 {
let last_hash = self.get_hash(actor, seq - 1).unwrap();
if !deps.contains(&last_hash) {
deps.push(last_hash);
}
}
TransactionInner {
actor,
seq,
start_op: NonZeroU64::new(self.max_op + 1).unwrap(),
time: 0,
message: None,
operations: vec![],
deps,
}
}
pub fn transact<F, O, E>(&mut self, f: F) -> transaction::Result<O, (), E>
where
F: FnOnce(&mut Transaction<'_, UnObserved>) -> Result<O, E>,
{
self.transact_with_impl(None::<&dyn Fn(&O) -> CommitOptions>, f)
}
pub fn transact_with<F, O, E, C>(&mut self, c: C, f: F) -> transaction::Result<O, (), E>
where
F: FnOnce(&mut Transaction<'_, UnObserved>) -> Result<O, E>,
C: FnOnce(&O) -> CommitOptions,
{
self.transact_with_impl(Some(c), f)
}
fn transact_with_impl<F, O, E, C>(
&mut self,
c: Option<C>,
f: F,
) -> transaction::Result<O, (), E>
where
F: FnOnce(&mut Transaction<'_, UnObserved>) -> Result<O, E>,
C: FnOnce(&O) -> CommitOptions,
{
let mut tx = self.transaction();
let result = f(&mut tx);
match result {
Ok(result) => {
let hash = if let Some(c) = c {
let commit_options = c(&result);
tx.commit_with(commit_options)
} else {
tx.commit()
};
Ok(Success {
result,
hash,
op_observer: (),
})
}
Err(error) => Err(Failure {
error,
cancelled: tx.rollback(),
}),
}
}
pub fn transact_observed<F, O, E, Obs>(&mut self, f: F) -> transaction::Result<O, Obs, E>
where
F: FnOnce(&mut Transaction<'_, Observed<Obs>>) -> Result<O, E>,
Obs: OpObserver + Default,
{
self.transact_observed_with_impl(None::<&dyn Fn(&O) -> CommitOptions>, f)
}
pub fn transact_observed_with<F, O, E, C, Obs>(
&mut self,
c: C,
f: F,
) -> transaction::Result<O, Obs, E>
where
F: FnOnce(&mut Transaction<'_, Observed<Obs>>) -> Result<O, E>,
C: FnOnce(&O) -> CommitOptions,
Obs: OpObserver + Default,
{
self.transact_observed_with_impl(Some(c), f)
}
fn transact_observed_with_impl<F, O, Obs, E, C>(
&mut self,
c: Option<C>,
f: F,
) -> transaction::Result<O, Obs, E>
where
F: FnOnce(&mut Transaction<'_, Observed<Obs>>) -> Result<O, E>,
C: FnOnce(&O) -> CommitOptions,
Obs: OpObserver + Default,
{
let observer = Obs::default();
let mut tx = self.transaction_with_observer(observer);
let result = f(&mut tx);
match result {
Ok(result) => {
let (obs, hash) = if let Some(c) = c {
let commit_options = c(&result);
tx.commit_with(commit_options)
} else {
tx.commit()
};
Ok(Success {
result,
hash,
op_observer: obs,
})
}
Err(error) => Err(Failure {
error,
cancelled: tx.rollback(),
}),
}
}
pub fn fork(&self) -> Self {
let mut f = self.clone();
f.set_actor(ActorId::random());
f
}
pub fn fork_at(&self, heads: &[ChangeHash]) -> Result<Self, AutomergeError> {
let mut seen = heads.iter().cloned().collect::<HashSet<_>>();
let mut heads = heads.to_vec();
let mut changes = vec![];
while let Some(hash) = heads.pop() {
if let Some(idx) = self.history_index.get(&hash) {
let change = &self.history[*idx];
for dep in change.deps() {
if !seen.contains(dep) {
heads.push(*dep);
}
}
changes.push(change);
seen.insert(hash);
} else {
return Err(AutomergeError::InvalidHash(hash));
}
}
let mut f = Self::new();
f.set_actor(ActorId::random());
f.apply_changes(changes.into_iter().rev().cloned())?;
Ok(f)
}
pub fn parents<O: AsRef<ExId>>(&self, obj: O) -> Result<Parents<'_>, AutomergeError> {
let obj_id = self.exid_to_obj(obj.as_ref())?;
Ok(self.ops.parents(obj_id))
}
pub fn path_to_object<O: AsRef<ExId>>(
&self,
obj: O,
) -> Result<Vec<(ExId, Prop)>, AutomergeError> {
let mut path = self.parents(obj.as_ref().clone())?.collect::<Vec<_>>();
path.reverse();
Ok(path)
}
pub fn keys<O: AsRef<ExId>>(&self, obj: O) -> Keys<'_, '_> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
let iter_keys = self.ops.keys(obj);
Keys::new(self, iter_keys)
} else {
Keys::new(self, None)
}
}
pub fn keys_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> KeysAt<'_, '_> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return KeysAt::new(self, self.ops.keys_at(obj, clock));
}
}
KeysAt::new(self, None)
}
pub fn map_range<O: AsRef<ExId>, R: RangeBounds<String>>(
&self,
obj: O,
range: R,
) -> MapRange<'_, R> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
MapRange::new(self, self.ops.map_range(obj, range))
} else {
MapRange::new(self, None)
}
}
pub fn map_range_at<O: AsRef<ExId>, R: RangeBounds<String>>(
&self,
obj: O,
range: R,
heads: &[ChangeHash],
) -> MapRangeAt<'_, R> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
let iter_range = self.ops.map_range_at(obj, range, clock);
return MapRangeAt::new(self, iter_range);
}
}
MapRangeAt::new(self, None)
}
pub fn list_range<O: AsRef<ExId>, R: RangeBounds<usize>>(
&self,
obj: O,
range: R,
) -> ListRange<'_, R> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
ListRange::new(self, self.ops.list_range(obj, range))
} else {
ListRange::new(self, None)
}
}
pub fn list_range_at<O: AsRef<ExId>, R: RangeBounds<usize>>(
&self,
obj: O,
range: R,
heads: &[ChangeHash],
) -> ListRangeAt<'_, R> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
let iter_range = self.ops.list_range_at(obj, range, clock);
return ListRangeAt::new(self, iter_range);
}
}
ListRangeAt::new(self, None)
}
pub fn values<O: AsRef<ExId>>(&self, obj: O) -> Values<'_> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
match self.ops.object_type(&obj) {
Some(t) if t.is_sequence() => Values::new(self, self.ops.list_range(obj, ..)),
Some(_) => Values::new(self, self.ops.map_range(obj, ..)),
None => Values::empty(self),
}
} else {
Values::empty(self)
}
}
pub fn values_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> Values<'_> {
if let Ok(obj) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return match self.ops.object_type(&obj) {
Some(ObjType::Map) | Some(ObjType::Table) => {
let iter_range = self.ops.map_range_at(obj, .., clock);
Values::new(self, iter_range)
}
Some(ObjType::List) | Some(ObjType::Text) => {
let iter_range = self.ops.list_range_at(obj, .., clock);
Values::new(self, iter_range)
}
None => Values::empty(self),
};
}
}
Values::empty(self)
}
pub fn length<O: AsRef<ExId>>(&self, obj: O) -> usize {
if let Ok(inner_obj) = self.exid_to_obj(obj.as_ref()) {
match self.ops.object_type(&inner_obj) {
Some(ObjType::Map) | Some(ObjType::Table) => self.keys(obj).count(),
Some(ObjType::List) | Some(ObjType::Text) => {
self.ops.search(&inner_obj, query::Len::new()).len
}
None => 0,
}
} else {
0
}
}
pub fn length_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> usize {
if let Ok(inner_obj) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return match self.ops.object_type(&inner_obj) {
Some(ObjType::Map) | Some(ObjType::Table) => self.keys_at(obj, heads).count(),
Some(ObjType::List) | Some(ObjType::Text) => {
self.ops.search(&inner_obj, query::LenAt::new(clock)).len
}
None => 0,
};
}
}
0
}
pub fn object_type<O: AsRef<ExId>>(&self, obj: O) -> Option<ObjType> {
let obj = self.exid_to_obj(obj.as_ref()).ok()?;
self.ops.object_type(&obj)
}
pub(crate) fn exid_to_obj(&self, id: &ExId) -> Result<ObjId, AutomergeError> {
match id {
ExId::Root => Ok(ObjId::root()),
ExId::Id(ctr, actor, idx) => {
let obj = if self.ops.m.actors.cache.get(*idx) == Some(actor) {
ObjId(OpId(*ctr, *idx))
} else {
let idx = self
.ops
.m
.actors
.lookup(actor)
.ok_or(AutomergeError::Fail)?;
ObjId(OpId(*ctr, idx))
};
if self.ops.object_type(&obj).is_some() {
Ok(obj)
} else {
Err(AutomergeError::NotAnObject)
}
}
}
}
pub(crate) fn id_to_exid(&self, id: OpId) -> ExId {
self.ops.id_to_exid(id)
}
pub fn text<O: AsRef<ExId>>(&self, obj: O) -> Result<String, AutomergeError> {
let obj = self.exid_to_obj(obj.as_ref())?;
let query = self.ops.search(&obj, query::ListVals::new());
let mut buffer = String::new();
for q in &query.ops {
if let OpType::Put(ScalarValue::Str(s)) = &q.action {
buffer.push_str(s);
} else {
buffer.push('\u{fffc}');
}
}
Ok(buffer)
}
pub fn text_at<O: AsRef<ExId>>(
&self,
obj: O,
heads: &[ChangeHash],
) -> Result<String, AutomergeError> {
let obj = self.exid_to_obj(obj.as_ref())?;
let clock = self.clock_at(heads)?;
let query = self.ops.search(&obj, query::ListValsAt::new(clock));
let mut buffer = String::new();
for q in &query.ops {
if let OpType::Put(ScalarValue::Str(s)) = &q.action {
buffer.push_str(s);
} else {
buffer.push('\u{fffc}');
}
}
Ok(buffer)
}
pub fn get<O: AsRef<ExId>, P: Into<Prop>>(
&self,
obj: O,
prop: P,
) -> Result<Option<(Value<'_>, ExId)>, AutomergeError> {
Ok(self.get_all(obj, prop.into())?.last().cloned())
}
pub fn get_at<O: AsRef<ExId>, P: Into<Prop>>(
&self,
obj: O,
prop: P,
heads: &[ChangeHash],
) -> Result<Option<(Value<'_>, ExId)>, AutomergeError> {
Ok(self.get_all_at(obj, prop, heads)?.last().cloned())
}
pub fn get_all<O: AsRef<ExId>, P: Into<Prop>>(
&self,
obj: O,
prop: P,
) -> Result<Vec<(Value<'_>, ExId)>, AutomergeError> {
let obj = self.exid_to_obj(obj.as_ref())?;
let mut result = match prop.into() {
Prop::Map(p) => {
let prop = self.ops.m.props.lookup(&p);
if let Some(p) = prop {
self.ops
.search(&obj, query::Prop::new(p))
.ops
.into_iter()
.map(|o| (o.value(), self.id_to_exid(o.id)))
.collect()
} else {
vec![]
}
}
Prop::Seq(n) => self
.ops
.search(&obj, query::Nth::new(n))
.ops
.into_iter()
.map(|o| (o.value(), self.id_to_exid(o.id)))
.collect(),
};
result.sort_by(|a, b| b.1.cmp(&a.1));
Ok(result)
}
pub fn get_all_at<O: AsRef<ExId>, P: Into<Prop>>(
&self,
obj: O,
prop: P,
heads: &[ChangeHash],
) -> Result<Vec<(Value<'_>, ExId)>, AutomergeError> {
let prop = prop.into();
let obj = self.exid_to_obj(obj.as_ref())?;
let clock = self.clock_at(heads)?;
let result = match prop {
Prop::Map(p) => {
let prop = self.ops.m.props.lookup(&p);
if let Some(p) = prop {
self.ops
.search(&obj, query::PropAt::new(p, clock))
.ops
.into_iter()
.map(|o| (o.clone_value(), self.id_to_exid(o.id)))
.collect()
} else {
vec![]
}
}
Prop::Seq(n) => self
.ops
.search(&obj, query::NthAt::new(n, clock))
.ops
.into_iter()
.map(|o| (o.clone_value(), self.id_to_exid(o.id)))
.collect(),
};
Ok(result)
}
pub fn load(data: &[u8]) -> Result<Self, AutomergeError> {
Self::load_with::<()>(data, None)
}
#[tracing::instrument(skip(data, observer), err)]
pub fn load_with<Obs: OpObserver>(
data: &[u8],
mut observer: Option<&mut Obs>,
) -> Result<Self, AutomergeError> {
if data.is_empty() {
tracing::trace!("no data, initializing empty document");
return Ok(Self::new());
}
tracing::trace!("loading first chunk");
let (remaining, first_chunk) = storage::Chunk::parse(storage::parse::Input::new(data))
.map_err(|e| load::Error::Parse(Box::new(e)))?;
if !first_chunk.checksum_valid() {
return Err(load::Error::BadChecksum.into());
}
let mut am = match first_chunk {
storage::Chunk::Document(d) => {
tracing::trace!("first chunk is document chunk, inflating");
let storage::load::Reconstructed {
max_op,
result: op_set,
changes,
heads,
} = match &mut observer {
Some(o) => storage::load::reconstruct_document(&d, OpSet::observed_builder(*o)),
None => storage::load::reconstruct_document(&d, OpSet::builder()),
}
.map_err(|e| load::Error::InflateDocument(Box::new(e)))?;
let mut hashes_by_index = HashMap::new();
let mut actor_to_history: HashMap<usize, Vec<usize>> = HashMap::new();
let mut clocks = Clocks::new();
for (index, change) in changes.iter().enumerate() {
let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap();
actor_to_history.entry(actor_index).or_default().push(index);
hashes_by_index.insert(index, change.hash());
clocks.add_change(change, actor_index)?;
}
let history_index = hashes_by_index.into_iter().map(|(k, v)| (v, k)).collect();
Self {
queue: vec![],
history: changes,
history_index,
states: actor_to_history,
clocks: clocks.into(),
ops: op_set,
deps: heads.into_iter().collect(),
saved: Default::default(),
actor: Actor::Unused(ActorId::random()),
max_op,
}
}
storage::Chunk::Change(stored_change) => {
tracing::trace!("first chunk is change chunk, applying");
let change = Change::new_from_unverified(stored_change.into_owned(), None)
.map_err(|e| load::Error::InvalidChangeColumns(Box::new(e)))?;
let mut am = Self::new();
am.apply_change(change, &mut observer);
am
}
storage::Chunk::CompressedChange(stored_change, compressed) => {
tracing::trace!("first chunk is compressed change, decompressing and applying");
let change = Change::new_from_unverified(
stored_change.into_owned(),
Some(compressed.into_owned()),
)
.map_err(|e| load::Error::InvalidChangeColumns(Box::new(e)))?;
let mut am = Self::new();
am.apply_change(change, &mut observer);
am
}
};
tracing::trace!("first chunk loaded, loading remaining chunks");
match load::load_changes(remaining.reset()) {
load::LoadedChanges::Complete(c) => {
for change in c {
am.apply_change(change, &mut observer);
}
}
load::LoadedChanges::Partial { error, .. } => return Err(error.into()),
}
Ok(am)
}
pub fn load_incremental(&mut self, data: &[u8]) -> Result<usize, AutomergeError> {
self.load_incremental_with::<()>(data, None)
}
pub fn load_incremental_with<Obs: OpObserver>(
&mut self,
data: &[u8],
op_observer: Option<&mut Obs>,
) -> Result<usize, AutomergeError> {
let changes = match load::load_changes(storage::parse::Input::new(data)) {
load::LoadedChanges::Complete(c) => c,
load::LoadedChanges::Partial { error, loaded, .. } => {
tracing::warn!(successful_chunks=loaded.len(), err=?error, "partial load");
loaded
}
};
let start = self.ops.len();
self.apply_changes_with(changes, op_observer)?;
let delta = self.ops.len() - start;
Ok(delta)
}
fn duplicate_seq(&self, change: &Change) -> bool {
let mut dup = false;
if let Some(actor_index) = self.ops.m.actors.lookup(change.actor_id()) {
if let Some(s) = self.states.get(&actor_index) {
dup = s.len() >= change.seq() as usize;
}
}
dup
}
pub fn apply_changes(
&mut self,
changes: impl IntoIterator<Item = Change>,
) -> Result<(), AutomergeError> {
self.apply_changes_with::<_, ()>(changes, None)
}
pub fn apply_changes_with<I: IntoIterator<Item = Change>, Obs: OpObserver>(
&mut self,
changes: I,
mut op_observer: Option<&mut Obs>,
) -> Result<(), AutomergeError> {
for c in changes {
if !self.history_index.contains_key(&c.hash()) {
if self.duplicate_seq(&c) {
return Err(AutomergeError::DuplicateSeqNumber(
c.seq(),
c.actor_id().clone(),
));
}
if self.is_causally_ready(&c) {
self.apply_change(c, &mut op_observer);
} else {
self.queue.push(c);
}
}
}
while let Some(c) = self.pop_next_causally_ready_change() {
if !self.history_index.contains_key(&c.hash()) {
self.apply_change(c, &mut op_observer);
}
}
Ok(())
}
fn apply_change<Obs: OpObserver>(&mut self, change: Change, observer: &mut Option<&mut Obs>) {
let ops = self.import_ops(&change);
self.update_history(change, ops.len());
if let Some(observer) = observer {
for (obj, op) in ops {
self.ops.insert_op_with_observer(&obj, op, *observer);
}
} else {
for (obj, op) in ops {
self.ops.insert_op(&obj, op);
}
}
}
fn is_causally_ready(&self, change: &Change) -> bool {
change
.deps()
.iter()
.all(|d| self.history_index.contains_key(d))
}
fn pop_next_causally_ready_change(&mut self) -> Option<Change> {
let mut index = 0;
while index < self.queue.len() {
if self.is_causally_ready(&self.queue[index]) {
return Some(self.queue.swap_remove(index));
}
index += 1;
}
None
}
fn import_ops(&mut self, change: &Change) -> Vec<(ObjId, Op)> {
let actor = self.ops.m.actors.cache(change.actor_id().clone());
let mut actors = Vec::with_capacity(change.other_actor_ids().len() + 1);
actors.push(actor);
actors.extend(
change
.other_actor_ids()
.iter()
.map(|a| self.ops.m.actors.cache(a.clone()))
.collect::<Vec<_>>(),
);
change
.iter_ops()
.enumerate()
.map(|(i, c)| {
let id = OpId(change.start_op().get() + i as u64, actor);
let key = match &c.key {
EncodedKey::Prop(n) => Key::Map(self.ops.m.props.cache(n.to_string())),
EncodedKey::Elem(e) if e.is_head() => Key::Seq(ElemId::head()),
EncodedKey::Elem(ElemId(o)) => {
Key::Seq(ElemId(OpId::new(actors[o.actor()], o.counter())))
}
};
let obj = if c.obj.is_root() {
ObjId::root()
} else {
ObjId(OpId(c.obj.opid().counter(), actors[c.obj.opid().actor()]))
};
let pred = c
.pred
.iter()
.map(|p| OpId::new(actors[p.actor()], p.counter()));
let pred = self.ops.m.sorted_opids(pred);
(
obj,
Op {
id,
action: OpType::from_index_and_value(c.action, c.val).unwrap(),
key,
succ: Default::default(),
pred,
insert: c.insert,
},
)
})
.collect()
}
pub fn merge(&mut self, other: &mut Self) -> Result<Vec<ChangeHash>, AutomergeError> {
self.merge_with::<()>(other, None)
}
pub fn merge_with<Obs: OpObserver>(
&mut self,
other: &mut Self,
op_observer: Option<&mut Obs>,
) -> Result<Vec<ChangeHash>, AutomergeError> {
let changes = self
.get_changes_added(other)
.into_iter()
.cloned()
.collect::<Vec<_>>();
tracing::trace!(changes=?changes.iter().map(|c| c.hash()).collect::<Vec<_>>(), "merging new changes");
self.apply_changes_with(changes, op_observer)?;
Ok(self.get_heads())
}
pub fn save(&mut self) -> Vec<u8> {
let heads = self.get_heads();
let c = self.history.iter();
let bytes = crate::storage::save::save_document(
c,
self.ops.iter(),
&self.ops.m.actors,
&self.ops.m.props,
&heads,
None,
);
self.saved = self.get_heads();
bytes
}
pub fn save_nocompress(&mut self) -> Vec<u8> {
let heads = self.get_heads();
let c = self.history.iter();
let bytes = crate::storage::save::save_document(
c,
self.ops.iter(),
&self.ops.m.actors,
&self.ops.m.props,
&heads,
Some(CompressConfig::None),
);
self.saved = self.get_heads();
bytes
}
pub fn save_incremental(&mut self) -> Vec<u8> {
let changes = self
.get_changes(self.saved.as_slice())
.expect("Should only be getting changes using previously saved heads");
let mut bytes = vec![];
for c in changes {
bytes.extend(c.raw_bytes());
}
if !bytes.is_empty() {
self.saved = self.get_heads()
}
bytes
}
pub(crate) fn filter_changes(
&self,
heads: &[ChangeHash],
changes: &mut BTreeSet<ChangeHash>,
) -> Result<(), AutomergeError> {
let heads = heads
.iter()
.filter(|hash| self.history_index.contains_key(hash))
.copied()
.collect::<Vec<_>>();
let heads_clock = self.clock_at(&heads)?;
changes.retain(|hash| {
self.clocks
.get(hash)
.unwrap()
.partial_cmp(&heads_clock)
.map_or(true, |o| o == Ordering::Greater)
});
Ok(())
}
pub fn get_missing_deps(&self, heads: &[ChangeHash]) -> Vec<ChangeHash> {
let in_queue: HashSet<_> = self.queue.iter().map(|change| change.hash()).collect();
let mut missing = HashSet::new();
for head in self.queue.iter().flat_map(|change| change.deps()) {
if !self.history_index.contains_key(head) {
missing.insert(head);
}
}
for head in heads {
if !self.history_index.contains_key(head) {
missing.insert(head);
}
}
let mut missing = missing
.into_iter()
.filter(|hash| !in_queue.contains(hash))
.copied()
.collect::<Vec<_>>();
missing.sort();
missing
}
fn get_changes_clock(&self, have_deps: &[ChangeHash]) -> Result<Vec<&Change>, AutomergeError> {
let clock = self.clock_at(have_deps)?;
let mut change_indexes: Vec<usize> = Vec::new();
for (actor_index, actor_changes) in &self.states {
if let Some(clock_data) = clock.get_for_actor(actor_index) {
change_indexes.extend(&actor_changes[clock_data.seq as usize..]);
} else {
change_indexes.extend(&actor_changes[..]);
}
}
change_indexes.sort_unstable();
Ok(change_indexes
.into_iter()
.map(|i| &self.history[i])
.collect())
}
pub fn get_changes(&self, have_deps: &[ChangeHash]) -> Result<Vec<&Change>, AutomergeError> {
self.get_changes_clock(have_deps)
}
pub fn get_last_local_change(&self) -> Option<&Change> {
return self
.history
.iter()
.rev()
.find(|c| c.actor_id() == self.get_actor());
}
fn clock_at(&self, heads: &[ChangeHash]) -> Result<Clock, AutomergeError> {
if let Some(first_hash) = heads.first() {
let mut clock = self
.clocks
.get(first_hash)
.ok_or(AutomergeError::MissingHash(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self
.clocks
.get(hash)
.ok_or(AutomergeError::MissingHash(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
}
pub fn get_change_by_hash(&self, hash: &ChangeHash) -> Option<&Change> {
self.history_index
.get(hash)
.and_then(|index| self.history.get(*index))
}
#[tracing::instrument(skip(self, other))]
pub fn get_changes_added<'a>(&self, other: &'a Self) -> Vec<&'a Change> {
let mut stack: Vec<_> = other.get_heads();
tracing::trace!(their_heads=?stack, "finding changes to merge");
let mut seen_hashes = HashSet::new();
let mut added_change_hashes = Vec::new();
while let Some(hash) = stack.pop() {
if !seen_hashes.contains(&hash) && self.get_change_by_hash(&hash).is_none() {
seen_hashes.insert(hash);
added_change_hashes.push(hash);
if let Some(change) = other.get_change_by_hash(&hash) {
stack.extend(change.deps());
}
}
}
added_change_hashes.reverse();
added_change_hashes
.into_iter()
.filter_map(|h| other.get_change_by_hash(&h))
.collect()
}
pub fn get_heads(&self) -> Vec<ChangeHash> {
let mut deps: Vec<_> = self.deps.iter().copied().collect();
deps.sort_unstable();
deps
}
fn get_hash(&self, actor: usize, seq: u64) -> Result<ChangeHash, AutomergeError> {
self.states
.get(&actor)
.and_then(|v| v.get(seq as usize - 1))
.and_then(|&i| self.history.get(i))
.map(|c| c.hash())
.ok_or(AutomergeError::InvalidSeq(seq))
}
pub(crate) fn update_history(&mut self, change: Change, num_ops: usize) -> usize {
self.max_op = std::cmp::max(self.max_op, change.start_op().get() + num_ops as u64 - 1);
self.update_deps(&change);
let history_index = self.history.len();
let actor_index = self.ops.m.actors.cache(change.actor_id().clone());
self.states
.entry(actor_index)
.or_default()
.push(history_index);
self.history_index.insert(change.hash(), history_index);
let mut clock = Clock::new();
for hash in change.deps() {
let c = self
.clocks
.get(hash)
.expect("Change's deps should already be in the document");
clock.merge(c);
}
clock.include(
actor_index,
ClockData {
max_op: change.max_op(),
seq: change.seq(),
},
);
self.clocks.insert(change.hash(), clock);
self.history_index.insert(change.hash(), history_index);
self.history.push(change);
history_index
}
fn update_deps(&mut self, change: &Change) {
for d in change.deps() {
self.deps.remove(d);
}
self.deps.insert(change.hash());
}
pub fn import(&self, s: &str) -> Result<ExId, AutomergeError> {
if s == "_root" {
Ok(ExId::Root)
} else {
let n = s
.find('@')
.ok_or_else(|| AutomergeError::InvalidObjIdFormat(s.to_owned()))?;
let counter = s[0..n]
.parse()
.map_err(|_| AutomergeError::InvalidObjIdFormat(s.to_owned()))?;
let actor = ActorId::from(hex::decode(&s[(n + 1)..]).unwrap());
let actor = self
.ops
.m
.actors
.lookup(&actor)
.ok_or_else(|| AutomergeError::InvalidObjId(s.to_owned()))?;
Ok(ExId::Id(
counter,
self.ops.m.actors.cache[actor].clone(),
actor,
))
}
}
pub(crate) fn to_string<E: Exportable>(&self, id: E) -> String {
match id.export() {
Export::Id(id) => format!("{}@{}", id.counter(), self.ops.m.actors[id.actor()]),
Export::Prop(index) => self.ops.m.props[index].clone(),
Export::Special(s) => s,
}
}
pub fn dump(&self) {
log!(
" {:12} {:12} {:12} {:12} {:12} {:12}",
"id",
"obj",
"key",
"value",
"pred",
"succ"
);
for (obj, op) in self.ops.iter() {
let id = self.to_string(op.id);
let obj = self.to_string(obj);
let key = match op.key {
Key::Map(n) => self.ops.m.props[n].clone(),
Key::Seq(n) => self.to_string(n),
};
let value: String = match &op.action {
OpType::Put(value) => format!("{}", value),
OpType::Make(obj) => format!("make({})", obj),
OpType::Increment(obj) => format!("inc({})", obj),
OpType::Delete => format!("del{}", 0),
};
let pred: Vec<_> = op.pred.iter().map(|id| self.to_string(*id)).collect();
let succ: Vec<_> = op.succ.into_iter().map(|id| self.to_string(*id)).collect();
log!(
" {:12} {:12} {:12} {:12} {:12?} {:12?}",
id,
obj,
key,
value,
pred,
succ
);
}
}
#[cfg(feature = "optree-visualisation")]
pub fn visualise_optree(&self, objects: Option<Vec<ExId>>) -> String {
let objects =
objects.map(|os| os.iter().filter_map(|o| self.exid_to_obj(o).ok()).collect());
self.ops.visualise(objects)
}
}
impl Default for Automerge {
fn default() -> Self {
Self::new()
}
}
#[derive(Serialize, Debug, Clone, PartialEq)]
pub(crate) struct SpanInfo {
pub(crate) id: ExId,
pub(crate) time: i64,
pub(crate) start: usize,
pub(crate) end: usize,
#[serde(rename = "type")]
pub(crate) span_type: String,
pub(crate) value: ScalarValue,
}