use super::ArbitraryDelta;
use crate::{
CausalDotStore, ComputeDeletionsArg, DotStoreJoin, Identifier, compute_deletions_unknown_to,
crdts::{orarray::Uid, test_util::Delta},
dotstores::recording_sentinel::RecordingSentinel,
sentinel::DummySentinel,
};
use bimap::BiHashMap;
use quickcheck::Gen;
use std::{collections::HashMap, fmt};
/// Bookkeeping for the keys that generated operations refer to by index.
///
/// Every key registered through `add_map_key` / `add_array_key` is assigned
/// the next free index at this level, and a fresh nested `KeyTracker` is
/// pushed into `inner_keys` at that same index.
#[derive(Debug, Clone, Default)]
pub(crate) struct KeyTracker {
/// Two-way mapping between map keys and their assigned index.
pub(crate) map_keys: BiHashMap<String, usize>,
/// Two-way mapping between array element `Uid`s and their assigned index.
pub(crate) array_keys: BiHashMap<Uid, usize>,
/// One nested tracker per registered key, indexed by the key's index.
/// NOTE(review): presumably tracks keys nested inside the value stored at
/// that key — confirm against the `ArbitraryDelta` implementations.
pub(crate) inner_keys: Vec<KeyTracker>,
}
impl KeyTracker {
pub(crate) fn len(&self) -> usize {
self.inner_keys.len()
}
pub(crate) fn add_map_key(&mut self, key: String) -> usize {
let keyi = self.len();
self.map_keys.insert_no_overwrite(key, keyi).unwrap();
self.inner_keys.push(Default::default());
keyi
}
pub(crate) fn add_array_key(&mut self, key: Uid) -> usize {
let keyi = self.len();
self.array_keys.insert_no_overwrite(key, keyi).unwrap();
self.inner_keys.push(Default::default());
keyi
}
}
/// A single step of a generated test case: node `by` performs `action`.
#[cfg_attr(feature = "serde", derive(::serde::Deserialize, ::serde::Serialize))]
#[derive(Debug, Clone)]
struct Op<Delta> {
/// The node that performs the action.
by: Identifier,
/// What the node does in this step.
action: Action<Delta>,
}
/// The kind of step a node can take in a generated test case.
#[cfg_attr(feature = "serde", derive(::serde::Deserialize, ::serde::Serialize))]
#[derive(Debug, Clone)]
enum Action<Delta> {
/// The acting node syncs from (joins in the state of) the given node.
Sync(Identifier),
/// The acting node produces and applies the given data delta.
Data(Delta),
}
/// A generated test case: an ordered list of per-node operations together
/// with the number of top-level keys those operations may refer to.
///
/// `Debug` is derived only when the `json` feature is disabled; when `json`
/// is enabled the hand-written JSON-based `Debug` impl in this file is used
/// instead.
#[cfg_attr(feature = "serde", derive(::serde::Deserialize, ::serde::Serialize))]
// Gate on `json` rather than `serde` so this is the exact complement of the
// manual `#[cfg(feature = "json")] impl Debug`. Gating on `serde` left the
// type without any `Debug` impl when `serde` was enabled but `json` was not.
#[cfg_attr(not(feature = "json"), derive(::core::fmt::Debug))]
pub(crate) struct Ops<DS>
where
    DS: ArbitraryDelta,
{
    /// The operations, in the order they are executed.
    ops: Vec<Op<DS::Delta>>,
    /// Number of top-level keys registered while generating `ops`.
    nkeys: usize,
}
#[cfg(feature = "json")]
impl<DS> fmt::Debug for Ops<DS>
where
    DS: ArbitraryDelta,
{
    /// Formats the test case as its JSON serialization.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json` fails to serialize the value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let json = serde_json::to_string(self).unwrap();
        write!(f, "{json}")
    }
}
impl<DS> Clone for Ops<DS>
where
    DS: ArbitraryDelta,
    DS::Delta: Clone,
{
    /// Clones the operation list and copies the key count.
    ///
    /// Written by hand so the only requirement is `DS::Delta: Clone`; `DS`
    /// itself does not need to be `Clone`.
    fn clone(&self) -> Self {
        let Self { ops, nkeys } = self;
        Self {
            ops: ops.clone(),
            nkeys: *nkeys,
        }
    }
}
impl<DS> quickcheck::Arbitrary for Ops<DS>
where
    DS: ArbitraryDelta + DotStoreJoin<RecordingSentinel> + Clone + Default + 'static,
    DS::Delta: Clone,
{
    /// Generates a random interleaving of data deltas and syncs across one to
    /// four simulated nodes.
    ///
    /// Each node starts from an empty `CausalDotStore`. Up to
    /// `min(g.size(), 255)` steps are attempted; in each step a random node
    /// either syncs from another random node (roughly one time in four) or
    /// generates a data delta against its current state. Every step is applied
    /// to the simulated node state as it is recorded, so later deltas are
    /// generated against the state earlier steps produced. Redundant syncs are
    /// pruned before returning.
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        eprintln!("\n:: Generate a new test case");
        let mut key_tracker = KeyTracker::default();

        // Between one and four nodes, each with an empty store.
        let node_count = (u8::arbitrary(g) % 4) + 1;
        let mut ccs: Vec<(Identifier, CausalDotStore<DS>)> = (0..node_count)
            .map(|i| (Identifier::new(1, u16::from(i)), CausalDotStore::new()))
            .collect();
        assert!(!ccs.is_empty());

        // Saturate instead of truncating: the previous `min(256) as u8` wrapped
        // to 0 for any generator size >= 256, producing only empty test cases.
        let n = u8::try_from(g.size()).unwrap_or(u8::MAX);

        // Use a smaller generator for the individual deltas, scaled down by
        // the number of nodes.
        let mut g = Gen::new(g.size() / ccs.len());
        let g = &mut g;

        let mut ops = Vec::with_capacity(usize::from(n));
        for _ in 0..n {
            // Pick the node that acts in this step.
            let &(id, _) = g.choose(&ccs).unwrap();

            if u8::arbitrary(g) < 64 {
                // Sync step: snapshot the peer's state and join it into `id`.
                let (wid, with) = g.choose(&ccs).unwrap().clone();
                if wid == id {
                    // Syncing with oneself is a no-op; skip the step entirely.
                    continue;
                }
                eprintln!("==> {id:?} syncs from {wid:?}");
                ops.push(Op {
                    by: id,
                    action: Action::Sync(wid),
                });
                let (_, cc) = ccs.iter_mut().find(|&&mut (ccid, _)| ccid == id).unwrap();
                cc.test_join_with(with.store.clone(), &with.context);
                continue;
            }

            // Data step: generate a delta against the node's current state and
            // apply it, so that subsequent steps observe it.
            let (_, cc) = ccs.iter_mut().find(|&&mut (ccid, _)| ccid == id).unwrap();
            eprintln!("==> {id:?} generates data delta");
            // NOTE(review): the trailing `1` is forwarded to `arbitrary_delta`
            // unchanged; presumably a nesting depth — confirm against the trait.
            let (op, crdt) =
                DS::arbitrary_delta(&cc.store, &cc.context, id, &mut key_tracker, g, 1);
            cc.test_join_with(crdt.store.clone(), &crdt.context);
            ops.push(Op {
                by: id,
                action: Action::Data(op),
            });
        }

        let mut s = Self {
            ops,
            nkeys: key_tracker.inner_keys.len(),
        };
        s.prune_unnecessary();
        s
    }

    /// Yields smaller candidate test cases via `OpsShrinker`; an empty test
    /// case cannot be shrunk further.
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
        if self.ops.is_empty() {
            return quickcheck::empty_shrinker();
        }
        Box::new(OpsShrinker {
            seed: self.clone(),
            key_subset: 1,
            size: 0,
        })
    }
}
impl<DS> Ops<DS>
where
    DS: ArbitraryDelta,
    DS::Delta: Clone,
{
    /// Drops sync operations that cannot have any effect.
    ///
    /// A sync is removed when a node syncs from itself, or when the node's
    /// most recent kept operation was already a sync from the same peer and
    /// that peer has no kept operation since that earlier sync.
    fn prune_unnecessary(&mut self) {
        // Per node: the (1-based) position and action of its latest kept op.
        let mut last_kept = HashMap::new();
        let mut position = 0;
        self.ops.retain(|op| {
            position += 1;
            let node = op.by;
            let redundant = match op.action {
                Action::Data(_) => false,
                // Syncing from oneself never changes anything.
                Action::Sync(source) if source == node => true,
                Action::Sync(source) => match last_kept.get(&node) {
                    // Same peer as the previous sync: only redundant if the
                    // peer has done nothing since we last synced from it.
                    Some(&(synced_at, Action::Sync(prev_source))) if prev_source == source => {
                        last_kept
                            .get(&source)
                            .is_none_or(|&(touched_at, _)| touched_at < synced_at)
                    }
                    _ => false,
                },
            };
            if !redundant {
                last_kept.insert(node, (position, op.action.clone()));
            }
            !redundant
        });
    }
}
impl<DS> fmt::Display for Ops<DS>
where
DS: ArbitraryDelta,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (i, op) in self.ops.iter().enumerate() {
if i != 0 && i % 10 == 0 {
writeln!(f, "--- {i} ---")?;
}
let id = op.by;
match &op.action {
Action::Data(op) => {
writeln!(f, " -> {id:?} {op}")?;
}
Action::Sync(i2) => writeln!(f, " -> {id:?} syncs from {i2:?}")?,
}
}
Ok(())
}
}
impl<DS> Ops<DS>
where
DS: DotStoreJoin<RecordingSentinel>,
DS: ArbitraryDelta + fmt::Debug,
{
/// Replays the recorded operations and checks that the resulting deltas
/// converge to the same state regardless of how they are applied.
///
/// The operations are executed against per-node states, collecting one
/// delta per operation. The join of all final node states is then compared
/// against three reconstructions from an empty store: the deltas joined
/// one at a time in execution order, the deltas first merged into a single
/// delta and then applied, and the deltas applied in an order shuffled by
/// an RNG seeded with `seed`.
///
/// Returns a passing result immediately for an empty test case.
///
/// # Panics
///
/// Panics if any join fails, or if a sync's computed inflation subset does
/// not produce the same state as joining the peer's full state.
#[cfg_attr(feature = "arbitrary", allow(dead_code))]
pub fn check_order_invariance(self, seed: u64) -> quickcheck::TestResult
where
DS: DotStoreJoin<DummySentinel> + Default + Clone + PartialEq,
{
eprintln!("\n:: Running test case:");
eprint!("{self}");
eprintln!("==> Executing steps");
if self.ops.is_empty() {
return quickcheck::TestResult::passed();
}
let mut keys = KeyTracker::default();
let fresh = CausalDotStore::<DS>::new();
// Execute every op, threading through the per-node states (`causals`) and
// the list of deltas produced so far (`deltas`).
let (final_state, ordered_deltas) = self.ops.into_iter().enumerate().fold(
(HashMap::new(), vec![]),
|(mut causals, mut deltas), (opi, op)| {
eprintln!(" -> executing #{opi}");
let id = op.by;
let mut causal = causals
.entry(id)
.or_insert_with(|| CausalDotStore::<DS>::new());
match op.action {
Action::Data(op) => {
// Materialize the delta against the node's current state, apply
// it to that node, and record it.
let modded: CausalDotStore<DS> =
op.into_crdt(&causal.store, &causal.context, id, &mut keys);
causal
.join_with(modded.store.clone(), &modded.context, &mut DummySentinel)
.unwrap();
deltas.push(modded);
}
Action::Sync(other) => {
// Snapshot the peer's state (creating an empty one if the peer has
// not acted yet), then re-borrow the acting node's state.
let other = causals
.entry(other)
.or_insert_with(|| CausalDotStore::<DS>::new())
.clone();
causal = causals.get_mut(&id).unwrap();
// Build the partial delta for the acting node: the inflation subset
// relative to its context, with the context extended by the result
// of `compute_deletions_unknown_to`.
let mut inflate = other.subset_for_inflation_from(&causal.context);
inflate
.context
.union(&compute_deletions_unknown_to(ComputeDeletionsArg {
known_dots: &other.context,
live_dots: &other.store.dots(),
ignorant: &causal.store.dots(),
}));
// Joining the peer's full state and joining only the partial delta
// must lead to the same state.
let mut full_sync = causal.clone();
full_sync
.join_with(other.store.clone(), &other.context, &mut DummySentinel)
.unwrap();
let mut partial_sync = causal.clone();
partial_sync
.join_with(inflate.store.clone(), &inflate.context, &mut DummySentinel)
.unwrap();
assert_eq!(full_sync, partial_sync, "inflating with {inflate:?}");
*causal = partial_sync;
deltas.push(inflate);
}
}
(causals, deltas)
},
);
// Reference state: the join of every node's final state.
let final_state = final_state
.into_values()
.reduce(|acc, delta| acc.join(delta, &mut DummySentinel).unwrap())
.unwrap();
// Deltas applied one by one, in execution order, to an empty store.
let state_ordered = ordered_deltas
.clone()
.into_iter()
.fold(fresh.clone(), |acc, delta| {
acc.join(delta, &mut DummySentinel).unwrap()
});
// Deltas merged into a single delta first, then applied to an empty store.
let merged_delta = ordered_deltas
.clone()
.into_iter()
.reduce(|acc, delta| acc.join(delta, &mut DummySentinel).unwrap())
.unwrap();
let state_ordered_merged = {
let mut fresh = fresh.clone();
fresh
.join_with(
merged_delta.store,
&merged_delta.context,
&mut DummySentinel,
)
.unwrap();
fresh
};
// Deltas applied to an empty store in an order shuffled deterministically
// from `seed`.
let shuffled_deltas = {
use rand::{SeedableRng, seq::SliceRandom};
let mut rng = rand::rngs::SmallRng::seed_from_u64(seed);
let mut tmp = ordered_deltas;
tmp.shuffle(&mut rng);
tmp
};
let state_shuffled = shuffled_deltas.into_iter().fold(fresh, |acc, delta| {
acc.join(delta, &mut DummySentinel).unwrap()
});
// `dbg!` prints each comparison so a failing run shows which one diverged.
quickcheck::TestResult::from_bool(
dbg!(final_state == state_ordered_merged)
&& dbg!(final_state == state_ordered)
&& dbg!(final_state == state_shuffled),
)
}
}
/// Iterator over shrunken variants of a failing `Ops` test case.
struct OpsShrinker<DS>
where
DS: ArbitraryDelta,
{
/// The test case being shrunk.
seed: Ops<DS>,
/// Number of keys to keep in the next key-subset candidate; doubled after
/// each such candidate.
key_subset: usize,
/// Length of the next prefix candidate; `0` means the empty candidate has
/// not been yielded yet. Doubled after each prefix candidate.
size: usize,
}
impl<DS> Iterator for OpsShrinker<DS>
where
    DS: ArbitraryDelta,
    DS::Delta: Clone,
{
    type Item = Ops<DS>;

    /// Produces the next shrink candidate, in three phases:
    ///
    /// 1. the empty test case,
    /// 2. prefixes of the seed of length 1, 2, 4, ... (strictly shorter than the seed),
    /// 3. the seed restricted to its first 1, 2, 4, ... keys (strictly fewer than
    ///    the seed's key count), with redundant syncs pruned.
    fn next(&mut self) -> Option<Self::Item> {
        // Phase 1: the empty test case, yielded exactly once.
        if self.size == 0 {
            self.size = 1;
            return Some(Ops {
                ops: Vec::new(),
                nkeys: 0,
            });
        }

        // Phase 2: exponentially growing prefixes of the seed.
        let prefix_len = self.size;
        if prefix_len < self.seed.ops.len() {
            self.size = prefix_len * 2;
            return Some(Ops {
                ops: self.seed.ops[..prefix_len].to_vec(),
                nkeys: self.seed.nkeys,
            });
        }

        // Phase 3: exponentially growing subsets of the key space.
        let kept_keys = self.key_subset;
        if kept_keys >= self.seed.nkeys {
            return None;
        }
        self.key_subset = kept_keys * 2;

        // Drop every data op that touches a key outside the kept range; syncs
        // always survive the filter (and may then be pruned as redundant).
        let ops = self
            .seed
            .ops
            .iter()
            .filter(|op| {
                !matches!(&op.action, Action::Data(d) if d.depends_on_keyi_in(kept_keys..))
            })
            .cloned()
            .collect();
        let mut candidate = Ops {
            ops,
            nkeys: kept_keys,
        };
        candidate.prune_unnecessary();
        Some(candidate)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        OrMap,
        crdts::{
            NoExtensionTypes,
            mvreg::MvRegValue,
            test_util::arbitrary_delta_impls::{MapOp, RegisterOp, ValueDelta},
        },
    };
    use quickcheck::Arbitrary;

    /// The store type all shrinker tests operate on.
    type Map = OrMap<String, NoExtensionTypes>;

    /// Builds an op by node `(0, 0)` that writes a boolean register at key index `keyi`.
    ///
    /// `key` is `Some(name)` for the op that introduces the key and `None` for
    /// an op that refers to an already introduced key.
    fn write_bool(
        keyi: usize,
        key: Option<&str>,
        value: bool,
    ) -> Op<<Map as ArbitraryDelta>::Delta> {
        Op {
            by: (0, 0).into(),
            action: Action::Data(MapOp::Apply(
                keyi,
                key.map(String::from),
                Box::new(ValueDelta::Register(RegisterOp(Some(MvRegValue::Bool(
                    value,
                ))))),
            )),
        }
    }

    /// Number of ops in each shrink candidate, in the order they were yielded.
    fn op_counts(shrinks: &[Ops<Map>]) -> Vec<usize> {
        shrinks.iter().map(|candidate| candidate.ops.len()).collect()
    }

    /// An empty test case has no shrink candidates.
    #[test]
    fn shrink_empty() {
        let ops = Ops::<Map> {
            ops: Vec::new(),
            nkeys: 0,
        };
        assert!(ops.shrink().next().is_none());
    }

    /// With a single key only the empty case and the one-op prefix are produced.
    #[test]
    fn shrink_single_key() {
        let ops = Ops::<Map> {
            ops: vec![
                write_bool(0, Some("key0"), true),
                write_bool(0, None, false),
            ],
            nkeys: 1,
        };
        let shrinks: Vec<_> = ops.shrink().collect();
        assert_eq!(op_counts(&shrinks), [0, 1]);
        assert!(matches!(
            shrinks[1].ops[0].action,
            Action::Data(MapOp::Apply(_, Some(_), _))
        ));
    }

    /// With two keys the prefixes come first, then the case restricted to key 0.
    #[test]
    fn shrink_keys() {
        let ops = Ops::<Map> {
            ops: vec![
                write_bool(0, Some("key0"), true),
                write_bool(1, Some("key1"), true),
                write_bool(0, None, false),
            ],
            nkeys: 2,
        };
        let shrinks: Vec<_> = ops.shrink().collect();
        assert_eq!(op_counts(&shrinks), [0, 1, 2, 2]);

        // Prefix of length one: just the op that introduces key 0.
        assert!(matches!(
            shrinks[1].ops[0].action,
            Action::Data(MapOp::Apply(0, Some(_), _))
        ));

        // Prefix of length two: the ops that introduce keys 0 and 1.
        assert!(matches!(
            shrinks[2].ops[0].action,
            Action::Data(MapOp::Apply(0, Some(_), _))
        ));
        assert!(matches!(
            shrinks[2].ops[1].action,
            Action::Data(MapOp::Apply(1, Some(_), _))
        ));

        // Key subset of size one: both ops on key 0, with the key-1 op removed.
        assert!(matches!(
            shrinks[3].ops[0].action,
            Action::Data(MapOp::Apply(0, Some(_), _))
        ));
        assert!(matches!(
            shrinks[3].ops[1].action,
            Action::Data(MapOp::Apply(0, None, _))
        ));
    }
}