use crate::acl::{Acl, Permission};
use crate::dotset::DotSet;
use crate::id::{DocId, PeerId};
use crate::lens::LensesRef;
use crate::path::{Path, PathBuf};
use crate::radixdb::BlobSet;
use crate::subscriber::Subscriber;
use anyhow::Result;
use bytecheck::CheckBytes;
use rkyv::{Archive, Archived, Deserialize, Serialize};
use std::iter::FromIterator;
use vec_collections::radix_tree::{AbstractRadixTree, AbstractRadixTreeMut, IterKey, RadixTree};
#[derive(Clone, Default, Eq, PartialEq, Archive, Deserialize, Serialize)]
#[archive(bound(serialize = "__S: rkyv::ser::ScratchSpace + rkyv::ser::Serializer"))]
#[archive_attr(derive(CheckBytes))]
#[archive_attr(check_bytes(
bound = "__C: rkyv::validation::ArchiveContext, <__C as rkyv::Fallible>::Error: std::error::Error"
))]
#[repr(C)]
pub struct DotStore(RadixTree<u8, ()>);
impl DotStore {
pub fn new() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn prefix(&self, path: Path) -> Self {
let mut res = self.0.clone();
res.prepend(path.as_ref());
Self(res)
}
pub fn contains(&self, path: Path) -> bool {
self.0.contains_key(path.as_ref())
}
pub fn contains_prefix(&self, prefix: Path) -> bool {
self.0.scan_prefix(prefix.as_ref()).next().is_some()
}
pub fn insert(&mut self, path: PathBuf) {
self.0.union_with(&RadixTree::single(path.as_ref(), ()));
}
pub fn iter(&self) -> impl Iterator<Item = PathBuf> + '_ {
self.0.iter().map(|path| Path::new(&path.0).to_owned())
}
pub fn union(&mut self, other: &Self) {
self.0.union_with(&other.0)
}
pub fn extend(&mut self, other: Self) {
self.0.union_with(&other.0)
}
}
impl FromIterator<PathBuf> for DotStore {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = PathBuf>,
{
let mut store = DotStore::new();
for path in iter.into_iter() {
store.insert(path);
}
store
}
}
impl std::fmt::Debug for DotStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut m = f.debug_map();
for p in self.iter() {
let path = p.as_path();
m.entry(&path.dot(), &path);
}
m.finish()
}
}
impl ArchivedDotStore {
pub fn iter(&self) -> impl Iterator<Item = PathBuf> + '_ {
self.0.iter().map(|path| Path::new(&path.0).to_owned())
}
}
impl std::fmt::Debug for ArchivedDotStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut m = f.debug_map();
for p in self.iter() {
let path = p.as_path();
m.entry(&path.dot(), &path);
}
m.finish()
}
}
#[derive(Clone, Default, Eq, PartialEq, Archive, Deserialize, Serialize)]
#[archive_attr(derive(CheckBytes))]
#[repr(C)]
pub struct CausalContext {
pub(crate) store: DotSet,
pub(crate) expired: DotSet,
}
impl CausalContext {
pub fn new() -> Self {
Self {
store: Default::default(),
expired: Default::default(),
}
}
pub fn store(&self) -> &DotSet {
&self.store
}
pub fn expired(&self) -> &DotSet {
&self.expired
}
}
impl ArchivedCausalContext {
pub fn store(&self) -> &Archived<DotSet> {
&self.store
}
pub fn expired(&self) -> &Archived<DotSet> {
&self.expired
}
}
impl std::fmt::Debug for CausalContext {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("CausalContext")
.field("store", &self.store)
.field("expired", &self.expired)
.finish()
}
}
impl std::fmt::Debug for ArchivedCausalContext {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("CausalContext")
.field("store", &self.store)
.field("expired", &self.expired)
.finish()
}
}
#[derive(Clone, Eq, PartialEq, Archive, Deserialize, Serialize, Default)]
#[archive_attr(derive(CheckBytes))]
#[repr(C)]
pub struct Causal {
pub(crate) store: DotStore,
pub(crate) expired: DotStore,
}
impl Causal {
pub fn store(&self) -> &DotStore {
&self.store
}
pub fn expired(&self) -> &DotStore {
&self.expired
}
pub fn is_empty(&self) -> bool {
self.store.is_empty() && self.expired.is_empty()
}
pub fn ctx(&self) -> CausalContext {
let mut ctx = CausalContext::new();
for buf in self.store.iter() {
let path = buf.as_path();
ctx.store.insert(path.dot());
}
for buf in self.expired.iter() {
let path = buf.as_path();
let dot = path.parent().unwrap().parent().unwrap().dot();
ctx.expired.insert(dot);
}
ctx
}
pub fn join(&mut self, that: &Causal) {
self.store.union(&that.store);
self.expired.union(&that.expired);
self.store.0.difference_with(&self.expired.0);
}
#[must_use]
pub fn unjoin(&self, ctx: &CausalContext) -> Self {
let mut expired = DotStore::new();
for buf in self.expired.iter() {
let path = buf.as_path();
let dot = path.parent().unwrap().parent().unwrap().dot();
if !ctx.expired.contains(&dot) {
expired.insert(buf);
}
}
let mut store = DotStore::new();
for buf in self.store.iter() {
let path = buf.as_path();
let dot = path.dot();
if !ctx.store.contains(&dot) && !ctx.expired.contains(&dot) && !expired.contains(path) {
store.insert(buf);
}
}
Self { expired, store }
}
pub fn transform(&mut self, from: LensesRef, to: LensesRef) {
let mut store = DotStore::new();
for buf in self.store.iter() {
let path = buf.as_path();
if let Some(path) = from.transform_path(path, to) {
store.insert(path);
}
}
self.store = store;
let mut expired = DotStore::new();
for buf in self.expired.iter() {
let path = buf.as_path();
if let Some(path) = from.transform_path(path, to) {
expired.insert(path);
}
}
self.expired = expired;
}
}
impl std::fmt::Debug for Causal {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Causal")
.field("store", &self.store)
.field("expired", &self.expired)
.finish()
}
}
impl std::fmt::Debug for ArchivedCausal {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Causal")
.field("store", &self.store)
.field("expired", &self.expired)
.finish()
}
}
#[derive(Clone)]
pub struct Crdt {
store: BlobSet,
expired: BlobSet,
acl: Acl,
}
impl std::fmt::Debug for Crdt {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Crdt")
.field("store", &StoreDebug(&self.store))
.field("expired", &ExpiredDebug(&self.expired))
.field("acl", &self.acl)
.finish()
}
}
struct StoreDebug<'a>(&'a BlobSet);
impl<'a> std::fmt::Debug for StoreDebug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut m = f.debug_map();
for k in self.0.keys() {
let path = Path::new(&k);
m.entry(&path.dot(), &path);
}
m.finish()
}
}
struct ExpiredDebug<'a>(&'a BlobSet);
impl<'a> std::fmt::Debug for ExpiredDebug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut m = f.debug_map();
for k in self.0.keys() {
let path = Path::new(&k);
m.entry(&path.parent().unwrap().parent().unwrap().dot(), &path);
}
m.finish()
}
}
impl Crdt {
pub fn new(store: BlobSet, expired: BlobSet, acl: Acl) -> Self {
Self {
store,
expired,
acl,
}
}
pub fn iter(&self) -> impl Iterator<Item = IterKey<u8>> {
self.store.keys()
}
pub fn scan_path(&self, path: Path) -> impl Iterator<Item = IterKey<u8>> {
#[allow(clippy::unnecessary_to_owned)]
self.store.scan_prefix(path.as_ref().to_vec())
}
pub fn watch_path(&self, path: Path) -> Subscriber {
Subscriber::new(
self.store.watch_prefix(path),
self.acl.subscribe(&path.first().unwrap().doc().unwrap()),
)
}
pub fn can(&self, peer: &PeerId, perm: Permission, path: Path) -> Result<bool> {
self.acl.can(*peer, perm, path)
}
pub fn ctx(&self, doc: &DocId) -> Result<CausalContext> {
let mut ctx = CausalContext::new();
let mut path = PathBuf::new();
path.doc(doc);
for k in self.store.scan_prefix(&path) {
let dot = Path::new(&k).dot();
ctx.store.insert(dot);
}
for k in self.expired.scan_prefix(&path) {
let dot = Path::new(&k).parent().unwrap().parent().unwrap().dot();
ctx.expired.insert(dot);
}
Ok(ctx)
}
pub fn join_policy(&self, causal: &Causal) -> Result<()> {
for buf in causal.store.iter() {
let path = buf.as_path();
if path
.parent()
.unwrap()
.parent()
.unwrap()
.last()
.unwrap()
.policy()
.is_some()
{
tracing::info!("join_policy: {}", path);
self.store.insert(path);
}
}
self.store.flush()?;
Ok(())
}
pub fn join(&self, peer: &PeerId, causal: &Causal) -> Result<()> {
for buf in causal.store.iter() {
let path = buf.as_path();
let is_expired = self.expired.scan_prefix(path.as_ref()).next().is_some();
if !is_expired && !causal.expired.contains_prefix(path) {
if !self.can(peer, Permission::Write, path)? {
tracing::info!("join: peer is unauthorized to insert {}", path);
continue;
}
self.store.insert(&path);
}
}
for buf in causal.expired.iter() {
let path = buf.as_path();
let store_path = path.parent().unwrap().parent().unwrap();
if !self.can(peer, Permission::Write, store_path)? {
tracing::info!("join: peer is unauthorized to remove {}", store_path);
continue;
}
if self.store.contains(store_path) {
self.store.remove(store_path);
}
self.expired.insert(&path);
}
self.expired.flush()?;
self.store.flush()?;
Ok(())
}
pub fn unjoin(
&self,
peer_id: &PeerId,
doc: &DocId,
other: &Archived<CausalContext>,
) -> Result<Causal> {
let mut path = PathBuf::new();
path.doc(doc);
let ctx = self.ctx(doc)?;
let expired_dots = ctx.expired.difference(&other.expired);
let store_dots = ctx
.store
.difference(&other.store)
.difference(&other.expired);
let mut store = DotStore::new();
for k in self.store.scan_prefix(&path) {
let path = Path::new(&k[..]);
let dot = path.dot();
if !store_dots.contains(&dot) {
continue;
}
if !self.can(peer_id, Permission::Read, path)? {
tracing::info!("unjoin: peer is unauthorized to read");
continue;
}
if store_dots.contains(&dot) {
store.insert(path.to_owned());
}
}
let mut expired = DotStore::new();
for k in self.expired.scan_prefix(&path) {
let path = Path::new(&k);
let dot = path.parent().unwrap().parent().unwrap().dot();
if !expired_dots.contains(&dot) {
continue;
}
if !self.can(peer_id, Permission::Read, path)? {
tracing::info!("unjoin: peer is unauthorized to read {}", path);
continue;
}
if expired_dots.contains(&dot) {
expired.insert(path.to_owned());
}
}
Ok(Causal { expired, store })
}
pub fn remove(&self, doc: &DocId) -> Result<()> {
let mut path = PathBuf::new();
path.doc(doc);
for k in self.store.scan_prefix(&path) {
self.store.remove(k);
}
for k in self.expired.scan_prefix(&path) {
self.expired.remove(k);
}
self.expired.flush()?;
self.store.flush()?;
Ok(())
}
pub fn transform(&self, doc: &DocId, from: LensesRef, to: LensesRef) -> Result<()> {
let mut path = PathBuf::new();
path.doc(doc);
for k in self.scan_path(path.as_path()) {
let path = Path::new(&k);
if let Some(path) = from.transform_path(path, to) {
self.store.insert(path);
}
self.store.remove(k);
}
for k in self.scan_path(path.as_path()) {
let path = Path::new(&k);
if let Some(path) = from.transform_path(path, to) {
self.expired.insert(path);
}
self.expired.remove(k);
}
self.expired.flush()?;
self.store.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::doc::Backend;
use crate::util::Ref;
use crate::{props::*, Keypair};
use proptest::prelude::*;
use std::collections::BTreeSet;
use std::pin::Pin;
#[async_std::test]
async fn test_ewflag() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: EWFlag
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
assert!(!doc.cursor().enabled()?);
let op = doc.cursor().enable()?;
doc.apply(&op)?;
assert!(doc.cursor().enabled()?);
let op = doc.cursor().disable()?;
doc.apply(&op)?;
assert!(!doc.cursor().enabled()?);
Ok(())
}
#[async_std::test]
async fn test_ewflag_unjoin() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: EWFlag
}
}
"#;
let la = Keypair::generate();
let key = Keypair::generate();
let peer = key.peer_id();
let mut sdk1 = Backend::test(packages)?;
sdk1.frontend().add_keypair(key)?;
let fut = sdk1.frontend().create_doc(peer, "test", la)?;
Pin::new(&mut sdk1).await?;
let doc1 = fut.await;
let mut sdk2 = Backend::test(packages)?;
sdk2.frontend().add_keypair(key)?;
let fut = sdk2.frontend().create_doc(peer, "test", la)?;
Pin::new(&mut sdk2).await?;
let doc2 = fut.await;
let op = doc1.cursor().enable()?;
doc1.apply(&op)?;
doc2.apply(&op)?;
assert!(doc1.cursor().enabled()?);
assert!(doc2.cursor().enabled()?);
let op = doc1.cursor().disable()?;
doc1.apply(&op)?;
let delta = sdk1.unjoin(&peer, doc1.id(), Ref::archive(&doc2.ctx()?).as_ref())?;
let hash = sdk1.frontend().schema(doc1.id())?.as_ref().hash();
sdk2.join(&peer, doc1.id(), &hash, delta)?;
assert!(!doc1.cursor().enabled()?);
assert!(!doc2.cursor().enabled()?);
Ok(())
}
#[async_std::test]
async fn test_mvreg() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer1 = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer1, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let peer2 = sdk.frontend().generate_keypair()?;
let op = doc.cursor().say_can(Some(peer2), Permission::Write)?;
doc.apply(&op)?;
Pin::new(&mut sdk).await?;
let doc2 = sdk.frontend().doc_as(*doc.id(), &peer2)?;
let op1 = doc.cursor().assign_u64(42)?;
let op2 = doc2.cursor().assign_u64(43)?;
doc.apply(&op1)?;
doc.apply(&op2)?;
let values = doc.cursor().u64s()?.collect::<Result<BTreeSet<u64>>>()?;
assert_eq!(values.len(), 2);
assert!(values.contains(&42));
assert!(values.contains(&43));
let op = doc.cursor().assign_u64(99)?;
doc.apply(&op)?;
let values = doc.cursor().u64s()?.collect::<Result<BTreeSet<u64>>>()?;
assert_eq!(values.len(), 1);
assert!(values.contains(&99));
Ok(())
}
#[async_std::test]
async fn test_orarray_smoke() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Array
.[]: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let mut cur = doc.cursor();
cur.index(0)?;
let op = cur.assign_u64(42)?;
let op1 = cur.assign_u64(43)?;
doc.apply(&op)?;
doc.apply(&op1)?;
let r = doc
.cursor()
.index(0)?
.u64s()?
.collect::<Result<BTreeSet<_>>>()?;
assert_eq!(r.len(), 2);
assert!(r.contains(&42));
assert!(r.contains(&43));
let op2 = cur.assign_u64(44)?;
doc.apply(&op2)?;
let r = doc.cursor().index(0)?.u64s()?.collect::<Result<Vec<_>>>()?;
assert_eq!(r, vec![44]);
let op_delete = doc.cursor().index(0)?.delete()?;
doc.apply(&op_delete)?;
assert!(doc.cursor().index(0)?.u64s()?.next().is_none());
Ok(())
}
#[async_std::test]
async fn test_orarray_nested_crdt() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Array
.[]: Table<String>
.[].{}: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let mut cur = doc.cursor();
cur.index(0)?.key_str("a")?;
let op = cur.assign_u64(42)?;
let op1 = cur.assign_u64(43)?;
doc.apply(&op)?;
doc.apply(&op1)?;
let r = doc
.cursor()
.index(0)?
.key_str("a")?
.u64s()?
.collect::<Result<BTreeSet<_>>>()?;
assert_eq!(r.len(), 2);
assert!(r.contains(&42));
assert!(r.contains(&43));
let op2 = cur.assign_u64(44)?;
doc.apply(&op2)?;
let r = doc
.cursor()
.index(0)?
.key_str("a")?
.u64s()?
.collect::<Result<Vec<_>>>()?;
assert_eq!(r, vec![44]);
let op_delete = doc.cursor().index(0)?.key_str("a")?.delete()?;
doc.apply(&op_delete)?;
assert!(doc
.cursor()
.index(0)?
.key_str("a")?
.u64s()?
.next()
.is_none());
Ok(())
}
#[async_std::test]
async fn test_orarray_nested_struct() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Array
.[]: Struct
.[].complete: EWFlag
.[].title: MVReg<String>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let mut cur = doc.cursor();
cur.index(0)?.field("title")?;
let op = cur.assign_str("something that needs to be done")?;
doc.apply(&op)?;
let r = doc
.cursor()
.index(0)?
.field("title")?
.strs()?
.collect::<Result<Vec<_>>>()?;
assert_eq!(r.len(), 1);
assert_eq!(r[0], "something that needs to be done");
let mut cur = doc.cursor();
cur.index(0)?.field("complete")?;
let op = cur.enable()?;
doc.apply(&op)?;
let r = doc
.cursor()
.index(0)?
.field("title")?
.strs()?
.collect::<Result<Vec<_>>>()?;
assert_eq!(r.len(), 1);
assert_eq!(r[0], "something that needs to be done");
Ok(())
}
#[async_std::test]
async fn test_orarray_nested_orarray() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Array
.[]: Table<String>
.[].{}: Array
.[].{}.[]: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let mut cur = doc.cursor();
cur.index(0)?.key_str("a")?.index(0)?;
let op = cur.assign_u64(42)?;
let op1 = cur.assign_u64(43)?;
doc.apply(&op)?;
doc.apply(&op1)?;
let r = doc
.cursor()
.index(0)?
.key_str("a")?
.index(0)?
.u64s()?
.collect::<Result<BTreeSet<_>>>()?;
assert_eq!(r.len(), 2);
assert!(r.contains(&42));
assert!(r.contains(&43));
let op2 = cur.assign_u64(44)?;
doc.apply(&op2)?;
let r = doc
.cursor()
.index(0)?
.key_str("a")?
.index(0)?
.u64s()?
.collect::<Result<Vec<_>>>()?;
assert_eq!(r, vec![44]);
let op_delete = doc.cursor().index(0)?.key_str("a")?.delete()?;
doc.apply(&op_delete)?;
assert!(doc
.cursor()
.index(0)?
.key_str("a")?
.index(0)?
.u64s()?
.next()
.is_none());
Ok(())
}
#[async_std::test]
async fn test_orarray_move() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Array
.[]: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
for i in 0..10 {
let op = doc.cursor().index(i)?.assign_u64(i as u64)?;
doc.apply(&op)?;
}
let mut r = vec![];
for i in 0..doc.cursor().len()? as usize {
r.extend(doc.cursor().index(i)?.u64s()?.collect::<Result<Vec<_>>>()?);
}
assert_eq!(r, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
let move_op = doc.cursor().index(5)?.r#move(2)?;
doc.apply(&move_op)?;
let mut r = vec![];
for i in 0..doc.cursor().len()? as usize {
r.extend(doc.cursor().index(i)?.u64s()?.collect::<Result<Vec<_>>>()?);
}
assert_eq!(r, vec![0, 1, 5, 2, 3, 4, 6, 7, 8, 9]);
Ok(())
}
#[async_std::test]
async fn test_ormap() -> Result<()> {
let packages = r#"
test {
0.1.0 {
.: Table<String>
.{}: Table<String>
.{}.{}: MVReg<u64>
}
}
"#;
let mut sdk = Backend::test(packages)?;
let peer = sdk.frontend().generate_keypair()?;
let fut = sdk
.frontend()
.create_doc(peer, "test", Keypair::generate())?;
Pin::new(&mut sdk).await?;
let doc = fut.await;
let mut cur = doc.cursor();
cur.key_str("a")?.key_str("b")?;
let op = cur.assign_u64(42)?;
doc.apply(&op)?;
let values = cur.u64s()?.collect::<Result<BTreeSet<u64>>>()?;
assert_eq!(values.len(), 1);
assert!(values.contains(&42));
let op = doc.cursor().key_str("a")?.key_str("b")?.remove()?;
doc.apply(&op)?;
let values = doc
.cursor()
.key_str("a")?
.key_str("b")?
.u64s()?
.collect::<Result<BTreeSet<u64>>>()?;
assert!(values.is_empty());
Ok(())
}
proptest! {
#[test]
fn causal_unjoin(a in arb_causal(), b in arb_causal()) {
let b = a.unjoin(&b.ctx());
prop_assert_eq!(join(&a, &b), a);
}
#[test]
fn causal_join_idempotent(a in arb_causal()) {
prop_assert_eq!(join(&a, &a), a);
}
#[test]
fn causal_join_commutative(a in arb_causal(), b in arb_causal()) {
let ab = join(&a, &b);
let ba = join(&b, &a);
prop_assert_eq!(ab, ba);
}
#[test]
fn causal_join_associative(a in arb_causal(), b in arb_causal(), c in arb_causal()) {
prop_assert_eq!(join(&join(&a, &b), &c), join(&a, &join(&b, &c)));
}
#[test]
fn crdt_join(a in arb_causal(), b in arb_causal()) {
let doc = DocId::new([0; 32]);
let crdt = causal_to_crdt(&doc, &a);
let c = join(&a, &b);
crdt.join(&doc.into(), &b).unwrap();
let c2 = crdt_to_causal(&doc, &crdt);
assert_eq!(c, c2);
}
#[test]
fn crdt_unjoin(a in arb_causal(), b in arb_causal()) {
let doc = DocId::new([0; 32]);
let crdt = causal_to_crdt(&doc, &a);
let c = a.unjoin(&b.ctx());
let actx = Ref::archive(&b.ctx());
let c2 = crdt.unjoin(&doc.into(), &doc, actx.as_ref()).unwrap();
assert_eq!(c, c2);
}
}
}