mergable 0.53.0

A library for user-friendly and efficient CRDTs.
Documentation
/** A counter.
 */
#[derive(Debug,Clone)]
pub struct Counter<V, Seq> {
	total: V,
	live: std::collections::HashMap<crate::Session, crate::Cell<V, Seq>>,
	rollup_seq: Seq,
}

impl<
	V,
	SF: crate::SequenceFactory,
> Counter<V, crate::Sequence<SF>>
{
	pub fn new(ctx: &mut crate::Context<SF>, value: V) -> Self where V: Clone {
		let cell = crate::Cell::new(ctx, value.clone());
		Counter {
			total: value,
			rollup_seq: crate::Sequence::zero(),
			live: IntoIterator::into_iter([(ctx.session(), cell)]).collect()
		}
	}

	/** Set this session's value
	*
	* This will overwrite any existing values.
	*/
	pub fn add(&mut self, ctx: &mut crate::Context<SF>, value: V)
	where V: for <'a> std::ops::AddAssign<&'a V> + std::ops::AddAssign<V>
	{
		match self.live.entry(ctx.session()) {
			std::collections::hash_map::Entry::Occupied(mut entry) => {
				self.total += &value;
				*entry.get_mut().borrow_mut(ctx) += value;
			}
			std::collections::hash_map::Entry::Vacant(entry) => {
				self.total += &value;
				entry.insert(crate::Cell::new(ctx, value));
			}
		}
	}

	pub fn total(&self) -> &V {
		&self.total
	}
}

impl<
	V: Clone
	+ for<'a> std::ops::AddAssign<&'a V>
	+ for<'a> std::ops::SubAssign<&'a V>,
	SF: crate::SequenceFactory,
>
	crate::Mergable for Counter<V, crate::Sequence<SF>>
{
	type Diff = CounterDiff<V, crate::Sequence<SF>>;
	type Seq = crate::Sequence<SF>;

	// fn merge(&mut self, that: Self) {
	// 	let removed = that.removed.into_iter()
	// 		.map(|(what, when)| Update::Del(what, None, when));
	// 	let live = that.live.into_iter()
	// 		.map(|(k, Value{sequence, value})| Update::Up(sequence, k, UpdateValue::New(value)));
	// 	self.apply(removed.chain(live)).unwrap()
	// }

	fn diff(&self, that: &Self) -> Self::Diff {
		let mut rollup = (self.rollup_seq != that.rollup_seq)
			.then(|| (self.rollup_seq.clone(), self.total.clone()));

		let mut updates = Vec::new();

		for (session, v) in &self.live {
			if let Some((_, ref mut total)) = rollup {
				*total -= &**v;
			}

			if v.last_modified() < &that.rollup_seq { continue }

			let update = match that.live.get(&session) {
				Some(that_v) => {
					let diff = v.diff(that_v);
					if crate::Diff::is_empty(&diff) { continue }
					Update::Up(diff)
				}
				None => Update::New(v.clone()),
			};

			updates.push((session.clone(), update))
		}

		CounterDiff { rollup, updates }
	}

	fn apply(&mut self, diff: Self::Diff) -> Result<(), crate::ApplyError> {
		let CounterDiff{rollup, updates} = diff;
		if let Some((cutoff, rollup)) = rollup {
			if cutoff > self.rollup_seq {
				self.total = rollup;
				let total = &mut self.total;
				self.live.retain(|_, v| {
					if v.last_modified() <= &cutoff { return false }

					*total += &**v;
					true
				});
				self.rollup_seq = cutoff;
			}
		}

		for (session, update) in updates {
			match self.live.entry(session) {
				std::collections::hash_map::Entry::Occupied(mut entry) => {
					let v = entry.get_mut();

					self.total -= &**v;
					match update {
						Update::New(new) => v.merge(new),
						Update::Up(diff) => v.apply(diff)?,
						Update::Del(_) => {
							entry.remove();
							continue
						}
					}
					self.total += &**v;
				}
				std::collections::hash_map::Entry::Vacant(entry) => {
					match update {
						Update::New(new) => {
							self.total += &*new;
							entry.insert(new);
						}
						Update::Up(_) => {
							return Err(crate::ApplyError::Missing(
								"Target doesn't contain base value for diff.".into()))
						}
						Update::Del(_) => {}
					}
				}
			}
		}

		Ok(())
	}

	fn clean(&mut self, cutoff: &Self::Seq) {
		if self.rollup_seq >= *cutoff { return }

		self.live.retain(|_, v| v.last_modified() > cutoff);
		self.rollup_seq = cutoff.clone();
	}
}

impl<
	V: Default,
	SF: crate::SequenceFactory
> Default for Counter<V, crate::Sequence<SF>> {
	fn default() -> Self {
		Counter {
			live: Default::default(),
			rollup_seq: crate::Sequence::zero(),
			total: Default::default(),
		}
	}
}

impl<V: PartialEq, Seq> PartialEq for Counter<V, Seq> {
	fn eq(&self, other: &Self) -> bool {
		return self.total == other.total
	}
}

#[derive(Clone,Debug)]
pub struct CounterDiff<V, Seq> {
	rollup: Option<(Seq, V)>,
	updates: Vec<(crate::Session, Update<crate::Cell<V, Seq>, crate::CellDiff<V, Seq>>)>,
}

#[derive(Clone,Debug)]
enum Update<V, Diff> {
	New(V),
	Del(V),
	Up(Diff),
}

impl<
	V,
	SF: crate::SequenceFactory,
> crate::Diff for CounterDiff<V, crate::Sequence<SF>> {
	fn is_empty(&self) -> bool {
		self.rollup.is_none() && self.updates.is_empty()
	}

	fn revert(mut self) -> Result<Self, crate::RevertError> {
		crate::map_in_place(&mut self.updates, |(session, update)| Ok((session, match update {
			Update::New(v) => Update::Del(v),
			Update::Del(v) => Update::New(v),
			Update::Up(diff) => Update::Up(diff),
		})))?;
		Ok(self)
	}
}

#[test]
fn test_simple() {
	let mut ctx1 = crate::Context::default();
	let mut c1 = Counter::default();
	c1.add(&mut ctx1, 5);
	c1.add(&mut ctx1, 3);

	let mut ctx2 = crate::Context::default();
	let mut c2 = c1.clone();
	c2.add(&mut ctx2, 1);
	c2.add(&mut ctx2, 4);

	c1.add(&mut ctx1, 8);

	let result = crate::test::test_merge(&mut [&c2, &c1]);
	assert_eq!(result.total(), &21);
}

#[test]
fn test_revert() {
	let mut ctx1 = crate::Context::default();
	let mut c1 = Counter::default();
	c1.add(&mut ctx1, 5);
	c1.add(&mut ctx1, 3);

	let mut ctx2 = crate::Context::default();
	let mut c2 = c1.clone();
	c2.add(&mut ctx2, 1);
	c2.add(&mut ctx2, 4);

	let diff = crate::Mergable::diff(&c1, &c2);
	let revert = crate::Diff::revert(diff.clone()).unwrap();

	let result = crate::test::test_apply(c1, &mut [diff, revert]);
	assert_eq!(result.total(), &8);
}

#[test]
fn test_revert_revive() {
	let mut ctx1 = crate::Context::default();
	let mut c1 = Counter::default();
	c1.add(&mut ctx1, 5u32);
	c1.add(&mut ctx1, 3);

	let mut ctx2 = crate::Context::default();
	let mut c2 = c1.clone();
	c2.add(&mut ctx2, 1);
	c2.add(&mut ctx2, 4);

	let diff = crate::Mergable::diff(&c1, &c2);
	let revert = crate::Diff::revert(diff.clone()).unwrap();

	c2.add(&mut ctx2, 7);

	let result = crate::test::test_apply(c2, &mut [diff, revert]);
	// FIXME: Right now reverts to parts of sessions get ignored. Ideally this would be 15.
	// We probably need to rethink how our revert system works.
	// It would be possible to do it with the current revert system by creating a negative session for the revert but that would require "signed" types as well as the ability to generate a sequence number in the revert function.
	assert_eq!(result.total(), &20);
}

#[test]
fn test_clean() {
	let mut ctx1 = crate::Context::default();
	let mut c1 = Counter::default();
	c1.add(&mut ctx1, 1);
	c1.add(&mut ctx1, 2);

	let mut ctx2 = crate::Context::default();
	let mut c2 = c1.clone();
	c2.add(&mut ctx2, 4);
	c2.add(&mut ctx2, 8);

	c1.add(&mut ctx1, 16);

	let seq_pre_merge = ctx1.next_sequence().max(ctx2.next_sequence());

	let mut c1 = crate::test::test_merge(&mut [&c1, &c2]);
	assert_eq!(c1.total(), &31);

	ctx1.new_session();
	ctx2.new_session();

	let mut c2 = c1.clone();
	c2.add(&mut ctx2, 32);

	c1.add(&mut ctx1, 64);

	crate::Mergable::clean(&mut c1, &seq_pre_merge);
	assert_eq!(c1.live.len(), 1);

	let mut c1 = crate::test::test_merge(&mut [&c1, &c2]);
	assert_eq!(c1.total(), &127);

	let seq_final = ctx1.next_sequence().max(ctx2.next_sequence());
	crate::Mergable::clean(&mut c1, &seq_final);
	assert_eq!(c1.live.len(), 0);
	assert_eq!(c1.total(), &127);
}

#[cfg(test)]
#[derive(Debug,proptest_derive::Arbitrary)]
enum Event {
	#[proptest(weight=90)]
	Add{
		#[proptest(strategy="0..5usize")]
		actor: usize,
		new_session: bool,
		value: std::num::Wrapping<i64>,
	},
	#[proptest(weight=9)]
	Merge{
		#[proptest(strategy="0..5usize")]
		src: usize,
		#[proptest(strategy="0..5usize")]
		dst: usize,
	},
	#[proptest(weight=1)]
	Clean,
}

#[cfg(test)]
proptest::proptest!{
	#[test]
	fn proptest(events: Vec<Event>) {
		let mut total = std::num::Wrapping::<i64>(0);

		let mut actors = [(); 5].map(|()| {
			let mut ctx = crate::Context::default();
			let counter = Counter::new(&mut ctx, total);
			(ctx, counter)
		});

		for event in events {
			match event {
				Event::Add{actor, new_session, value} => {
					let (ctx, counter) = &mut actors[actor];
					if new_session {
						ctx.new_session();
					}
					total += value;
					counter.add(ctx, value);
				}
				Event::Merge{src, dst} => {
					if src == dst { continue }
					let (src, dst) = crate::get_2_mut(&mut actors, src, dst);

					crate::Mergable::merge(
						&mut dst.1,
						src.1.clone());
				}
				Event::Clean => {
					for i in 1..actors.len() {
						let (src, dst) = crate::get_2_mut(&mut actors, i-1, i);
						crate::Mergable::merge(
							&mut dst.1,
							src.1.clone());
					}
					for i in (0..actors.len()).rev() {
						if i > 0 {
							let (dst, src) = crate::get_2_mut(&mut actors, i-1, i);
							crate::Mergable::merge(
								&mut dst.1,
								src.1.clone());
						}

						let (ctx, counter) = &mut actors[i];
						crate::Mergable::clean(counter, &ctx.next_sequence());
					}
				}
			}
		}

		let mut refs = actors.iter().map(|(_, counter)| counter).collect::<Vec<_>>();
		let merged = crate::test::test_merge(&mut refs);
		proptest::prop_assert_eq!(merged.total(), &total);
	}
}