automerge 0.2.0

A JSON-like data structure (a CRDT) that can be modified concurrently by different users, and merged again automatically
Documentation
use std::{borrow::Cow, num::NonZeroU64};

use crate::{
    columnar::Key as StoredKey,
    storage::{
        change::{Unverified, Verified},
        parse, Change as StoredChange, ChangeOp, Chunk, Compressed, ReadChangeOpError,
    },
    types::{ActorId, ChangeHash, ElemId},
};

#[derive(Clone, Debug, PartialEq)]
pub struct Change {
    stored: StoredChange<'static, Verified>,
    compression: CompressionState,
    len: usize,
}

impl Change {
    pub(crate) fn new(stored: StoredChange<'static, Verified>) -> Self {
        let len = stored.iter_ops().count();
        Self {
            stored,
            len,
            compression: CompressionState::NotCompressed,
        }
    }

    pub(crate) fn new_from_unverified(
        stored: StoredChange<'static, Unverified>,
        compressed: Option<Compressed<'static>>,
    ) -> Result<Self, ReadChangeOpError> {
        let mut len = 0;
        let stored = stored.verify_ops(|_| len += 1)?;
        let compression = if let Some(c) = compressed {
            CompressionState::Compressed(c)
        } else {
            CompressionState::NotCompressed
        };
        Ok(Self {
            stored,
            len,
            compression,
        })
    }

    pub fn actor_id(&self) -> &ActorId {
        self.stored.actor()
    }

    pub fn other_actor_ids(&self) -> &[ActorId] {
        self.stored.other_actors()
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn max_op(&self) -> u64 {
        self.stored.start_op().get() + (self.len as u64) - 1
    }

    pub fn start_op(&self) -> NonZeroU64 {
        self.stored.start_op()
    }

    pub fn message(&self) -> Option<&String> {
        self.stored.message().as_ref()
    }

    pub fn deps(&self) -> &[ChangeHash] {
        self.stored.dependencies()
    }

    pub fn hash(&self) -> ChangeHash {
        self.stored.hash()
    }

    pub fn seq(&self) -> u64 {
        self.stored.seq()
    }

    pub fn timestamp(&self) -> i64 {
        self.stored.timestamp()
    }

    pub fn bytes(&mut self) -> Cow<'_, [u8]> {
        if let CompressionState::NotCompressed = self.compression {
            if let Some(compressed) = self.stored.compress() {
                self.compression = CompressionState::Compressed(compressed);
            } else {
                self.compression = CompressionState::TooSmallToCompress;
            }
        };
        match &self.compression {
            // SAFETY: We just checked this case above
            CompressionState::NotCompressed => unreachable!(),
            CompressionState::TooSmallToCompress => Cow::Borrowed(self.stored.bytes()),
            CompressionState::Compressed(c) => c.bytes(),
        }
    }

    pub fn raw_bytes(&self) -> &[u8] {
        self.stored.bytes()
    }

    pub(crate) fn iter_ops(&self) -> impl Iterator<Item = ChangeOp> + '_ {
        self.stored.iter_ops()
    }

    pub fn extra_bytes(&self) -> &[u8] {
        self.stored.extra_bytes()
    }

    // TODO replace all uses of this with TryFrom<&[u8]>
    pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, LoadError> {
        Self::try_from(&bytes[..])
    }

    pub fn decode(&self) -> crate::ExpandedChange {
        crate::ExpandedChange::from(self)
    }
}

#[derive(Clone, Debug, PartialEq)]
enum CompressionState {
    /// We haven't tried to compress this change
    NotCompressed,
    /// We have compressed this change
    Compressed(Compressed<'static>),
    /// We tried to compress this change but it wasn't big enough to be worth it
    TooSmallToCompress,
}

impl AsRef<StoredChange<'static, Verified>> for Change {
    fn as_ref(&self) -> &StoredChange<'static, Verified> {
        &self.stored
    }
}

impl From<Change> for StoredChange<'static, Verified> {
    fn from(c: Change) -> Self {
        c.stored
    }
}

#[derive(thiserror::Error, Debug)]
pub enum LoadError {
    #[error("unable to parse change: {0}")]
    Parse(Box<dyn std::error::Error + Send + Sync + 'static>),
    #[error("leftover data after parsing")]
    LeftoverData,
    #[error("wrong chunk type")]
    WrongChunkType,
}

impl<'a> TryFrom<&'a [u8]> for Change {
    type Error = LoadError;

    fn try_from(value: &'a [u8]) -> Result<Self, Self::Error> {
        let input = parse::Input::new(value);
        let (remaining, chunk) = Chunk::parse(input).map_err(|e| LoadError::Parse(Box::new(e)))?;
        if !remaining.is_empty() {
            return Err(LoadError::LeftoverData);
        }
        match chunk {
            Chunk::Change(c) => Self::new_from_unverified(c.into_owned(), None)
                .map_err(|e| LoadError::Parse(Box::new(e))),
            Chunk::CompressedChange(c, compressed) => {
                Self::new_from_unverified(c.into_owned(), Some(compressed.into_owned()))
                    .map_err(|e| LoadError::Parse(Box::new(e)))
            }
            _ => Err(LoadError::WrongChunkType),
        }
    }
}

impl<'a> TryFrom<StoredChange<'a, Unverified>> for Change {
    type Error = ReadChangeOpError;

    fn try_from(c: StoredChange<'a, Unverified>) -> Result<Self, Self::Error> {
        Self::new_from_unverified(c.into_owned(), None)
    }
}

impl From<crate::ExpandedChange> for Change {
    fn from(e: crate::ExpandedChange) -> Self {
        let stored = StoredChange::builder()
            .with_actor(e.actor_id)
            .with_extra_bytes(e.extra_bytes)
            .with_seq(e.seq)
            .with_dependencies(e.deps)
            .with_timestamp(e.time)
            .with_start_op(e.start_op)
            .with_message(e.message)
            .build(e.operations.iter());
        match stored {
            Ok(c) => Change::new(c),
            Err(crate::storage::change::PredOutOfOrder) => {
                // Should never happen because we use `SortedVec` in legacy::Op::pred
                panic!("preds out of order");
            }
        }
    }
}

mod convert_expanded {
    use std::borrow::Cow;

    use crate::{convert, legacy, storage::AsChangeOp, types::ActorId, ScalarValue};

    impl<'a> AsChangeOp<'a> for &'a legacy::Op {
        type ActorId = &'a ActorId;
        type OpId = &'a legacy::OpId;
        type PredIter = std::slice::Iter<'a, legacy::OpId>;

        fn action(&self) -> u64 {
            self.action.action_index()
        }

        fn insert(&self) -> bool {
            self.insert
        }

        fn pred(&self) -> Self::PredIter {
            self.pred.iter()
        }

        fn key(&self) -> convert::Key<'a, Self::OpId> {
            match &self.key {
                legacy::Key::Map(s) => convert::Key::Prop(Cow::Borrowed(s)),
                legacy::Key::Seq(legacy::ElementId::Head) => {
                    convert::Key::Elem(convert::ElemId::Head)
                }
                legacy::Key::Seq(legacy::ElementId::Id(o)) => {
                    convert::Key::Elem(convert::ElemId::Op(o))
                }
            }
        }

        fn obj(&self) -> convert::ObjId<Self::OpId> {
            match &self.obj {
                legacy::ObjectId::Root => convert::ObjId::Root,
                legacy::ObjectId::Id(o) => convert::ObjId::Op(o),
            }
        }

        fn val(&self) -> Cow<'a, crate::ScalarValue> {
            match self.primitive_value() {
                Some(v) => Cow::Owned(v),
                None => Cow::Owned(ScalarValue::Null),
            }
        }
    }

    impl<'a> convert::OpId<&'a ActorId> for &'a legacy::OpId {
        fn counter(&self) -> u64 {
            legacy::OpId::counter(self)
        }

        fn actor(&self) -> &'a ActorId {
            &self.1
        }
    }
}

impl From<&Change> for crate::ExpandedChange {
    fn from(c: &Change) -> Self {
        let actors = std::iter::once(c.actor_id())
            .chain(c.other_actor_ids().iter())
            .cloned()
            .enumerate()
            .collect::<std::collections::HashMap<_, _>>();
        let operations = c
            .iter_ops()
            .map(|o| crate::legacy::Op {
                action: crate::types::OpType::from_index_and_value(o.action, o.val).unwrap(),
                insert: o.insert,
                key: match o.key {
                    StoredKey::Elem(e) if e.is_head() => {
                        crate::legacy::Key::Seq(crate::legacy::ElementId::Head)
                    }
                    StoredKey::Elem(ElemId(o)) => {
                        crate::legacy::Key::Seq(crate::legacy::ElementId::Id(
                            crate::legacy::OpId::new(o.counter(), actors.get(&o.actor()).unwrap()),
                        ))
                    }
                    StoredKey::Prop(p) => crate::legacy::Key::Map(p),
                },
                obj: if o.obj.is_root() {
                    crate::legacy::ObjectId::Root
                } else {
                    crate::legacy::ObjectId::Id(crate::legacy::OpId::new(
                        o.obj.opid().counter(),
                        actors.get(&o.obj.opid().actor()).unwrap(),
                    ))
                },
                pred: o
                    .pred
                    .into_iter()
                    .map(|p| crate::legacy::OpId::new(p.counter(), actors.get(&p.actor()).unwrap()))
                    .collect(),
            })
            .collect::<Vec<_>>();
        crate::ExpandedChange {
            operations,
            actor_id: actors.get(&0).unwrap().clone(),
            hash: Some(c.hash()),
            time: c.timestamp(),
            deps: c.deps().to_vec(),
            seq: c.seq(),
            start_op: c.start_op(),
            extra_bytes: c.extra_bytes().to_vec(),
            message: c.message().cloned(),
        }
    }
}

#[cfg(test)]
pub(crate) mod gen {
    use super::Change;
    use crate::{
        op_tree::OpSetMetadata,
        storage::{change::ChangeBuilder, convert::op_as_actor_id},
        types::{
            gen::{gen_hash, gen_op},
            ObjId, Op, OpId,
        },
        ActorId,
    };
    use proptest::prelude::*;

    fn gen_actor() -> impl Strategy<Value = ActorId> {
        proptest::array::uniform32(proptest::bits::u8::ANY).prop_map(ActorId::from)
    }

    prop_compose! {
        fn gen_actors()(this_actor in gen_actor(), other_actors in proptest::collection::vec(gen_actor(), 0..10)) -> (ActorId, Vec<ActorId>) {
            (this_actor, other_actors)
        }
    }

    fn gen_ops(
        this_actor: ActorId,
        other_actors: Vec<ActorId>,
    ) -> impl Strategy<Value = (Vec<(ObjId, Op)>, OpSetMetadata)> {
        let mut all_actors = vec![this_actor];
        all_actors.extend(other_actors);
        let mut m = OpSetMetadata::from_actors(all_actors);
        m.props.cache("someprop".to_string());
        let root_id = ObjId::root();
        (0_u64..10)
            .prop_map(|num_ops| {
                (0..num_ops)
                    .map(|counter| OpId::new(0, counter))
                    .collect::<Vec<_>>()
            })
            .prop_flat_map(move |opids| {
                let mut strat = Just(Vec::new()).boxed();
                for opid in opids {
                    strat = (gen_op(opid, vec![0]), strat)
                        .prop_map(move |(op, ops)| {
                            let mut result = Vec::with_capacity(ops.len() + 1);
                            result.extend(ops);
                            result.push((root_id, op));
                            result
                        })
                        .boxed();
                }
                strat
            })
            .prop_map(move |ops| (ops, m.clone()))
    }

    prop_compose! {
        pub(crate) fn gen_change()((this_actor, other_actors) in gen_actors())(
                (ops, metadata) in gen_ops(this_actor.clone(), other_actors),
                start_op in 1_u64..200000,
                seq in 0_u64..200000,
                timestamp in 0..i64::MAX,
                deps in proptest::collection::vec(gen_hash(), 0..100),
                message in proptest::option::of("[a-z]{200}"),
                this_actor in Just(this_actor),
            ) -> Change {
            let ops = ops.iter().map(|(obj, op)| op_as_actor_id(obj, op, &metadata));
            Change::new(ChangeBuilder::new()
                .with_dependencies(deps)
                .with_start_op(start_op.try_into().unwrap())
                .with_message(message)
                .with_actor(this_actor)
                .with_seq(seq)
                .with_timestamp(timestamp)
                .build(ops.into_iter())
                .unwrap())
        }

    }
}