#[derive(Debug,Clone)]
pub struct Counter<V, Seq> {
total: V,
live: std::collections::HashMap<crate::Session, crate::Cell<V, Seq>>,
rollup_seq: Seq,
}
impl<
V,
SF: crate::SequenceFactory,
> Counter<V, crate::Sequence<SF>>
{
pub fn new(ctx: &mut crate::Context<SF>, value: V) -> Self where V: Clone {
let cell = crate::Cell::new(ctx, value.clone());
Counter {
total: value,
rollup_seq: crate::Sequence::zero(),
live: IntoIterator::into_iter([(ctx.session(), cell)]).collect()
}
}
pub fn add(&mut self, ctx: &mut crate::Context<SF>, value: V)
where V: for <'a> std::ops::AddAssign<&'a V> + std::ops::AddAssign<V>
{
match self.live.entry(ctx.session()) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
self.total += &value;
*entry.get_mut().borrow_mut(ctx) += value;
}
std::collections::hash_map::Entry::Vacant(entry) => {
self.total += &value;
entry.insert(crate::Cell::new(ctx, value));
}
}
}
pub fn total(&self) -> &V {
&self.total
}
}
impl<
V: Clone
+ for<'a> std::ops::AddAssign<&'a V>
+ for<'a> std::ops::SubAssign<&'a V>,
SF: crate::SequenceFactory,
>
crate::Mergable for Counter<V, crate::Sequence<SF>>
{
type Diff = CounterDiff<V, crate::Sequence<SF>>;
type Seq = crate::Sequence<SF>;
fn diff(&self, that: &Self) -> Self::Diff {
let mut rollup = (self.rollup_seq != that.rollup_seq)
.then(|| (self.rollup_seq.clone(), self.total.clone()));
let mut updates = Vec::new();
for (session, v) in &self.live {
if let Some((_, ref mut total)) = rollup {
*total -= &**v;
}
if v.last_modified() < &that.rollup_seq { continue }
let update = match that.live.get(&session) {
Some(that_v) => {
let diff = v.diff(that_v);
if crate::Diff::is_empty(&diff) { continue }
Update::Up(diff)
}
None => Update::New(v.clone()),
};
updates.push((session.clone(), update))
}
CounterDiff { rollup, updates }
}
fn apply(&mut self, diff: Self::Diff) -> Result<(), crate::ApplyError> {
let CounterDiff{rollup, updates} = diff;
if let Some((cutoff, rollup)) = rollup {
if cutoff > self.rollup_seq {
self.total = rollup;
let total = &mut self.total;
self.live.retain(|_, v| {
if v.last_modified() <= &cutoff { return false }
*total += &**v;
true
});
self.rollup_seq = cutoff;
}
}
for (session, update) in updates {
match self.live.entry(session) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let v = entry.get_mut();
self.total -= &**v;
match update {
Update::New(new) => v.merge(new),
Update::Up(diff) => v.apply(diff)?,
Update::Del(_) => {
entry.remove();
continue
}
}
self.total += &**v;
}
std::collections::hash_map::Entry::Vacant(entry) => {
match update {
Update::New(new) => {
self.total += &*new;
entry.insert(new);
}
Update::Up(_) => {
return Err(crate::ApplyError::Missing(
"Target doesn't contain base value for diff.".into()))
}
Update::Del(_) => {}
}
}
}
}
Ok(())
}
fn clean(&mut self, cutoff: &Self::Seq) {
if self.rollup_seq >= *cutoff { return }
self.live.retain(|_, v| v.last_modified() > cutoff);
self.rollup_seq = cutoff.clone();
}
}
impl<
V: Default,
SF: crate::SequenceFactory
> Default for Counter<V, crate::Sequence<SF>> {
fn default() -> Self {
Counter {
live: Default::default(),
rollup_seq: crate::Sequence::zero(),
total: Default::default(),
}
}
}
impl<V: PartialEq, Seq> PartialEq for Counter<V, Seq> {
fn eq(&self, other: &Self) -> bool {
return self.total == other.total
}
}
#[derive(Clone,Debug)]
pub struct CounterDiff<V, Seq> {
rollup: Option<(Seq, V)>,
updates: Vec<(crate::Session, Update<crate::Cell<V, Seq>, crate::CellDiff<V, Seq>>)>,
}
#[derive(Clone,Debug)]
enum Update<V, Diff> {
New(V),
Del(V),
Up(Diff),
}
impl<
V,
SF: crate::SequenceFactory,
> crate::Diff for CounterDiff<V, crate::Sequence<SF>> {
fn is_empty(&self) -> bool {
self.rollup.is_none() && self.updates.is_empty()
}
fn revert(mut self) -> Result<Self, crate::RevertError> {
crate::map_in_place(&mut self.updates, |(session, update)| Ok((session, match update {
Update::New(v) => Update::Del(v),
Update::Del(v) => Update::New(v),
Update::Up(diff) => Update::Up(diff),
})))?;
Ok(self)
}
}
#[test]
fn test_simple() {
let mut ctx1 = crate::Context::default();
let mut c1 = Counter::default();
c1.add(&mut ctx1, 5);
c1.add(&mut ctx1, 3);
let mut ctx2 = crate::Context::default();
let mut c2 = c1.clone();
c2.add(&mut ctx2, 1);
c2.add(&mut ctx2, 4);
c1.add(&mut ctx1, 8);
let result = crate::test::test_merge(&mut [&c2, &c1]);
assert_eq!(result.total(), &21);
}
#[test]
fn test_revert() {
let mut ctx1 = crate::Context::default();
let mut c1 = Counter::default();
c1.add(&mut ctx1, 5);
c1.add(&mut ctx1, 3);
let mut ctx2 = crate::Context::default();
let mut c2 = c1.clone();
c2.add(&mut ctx2, 1);
c2.add(&mut ctx2, 4);
let diff = crate::Mergable::diff(&c1, &c2);
let revert = crate::Diff::revert(diff.clone()).unwrap();
let result = crate::test::test_apply(c1, &mut [diff, revert]);
assert_eq!(result.total(), &8);
}
#[test]
fn test_revert_revive() {
let mut ctx1 = crate::Context::default();
let mut c1 = Counter::default();
c1.add(&mut ctx1, 5u32);
c1.add(&mut ctx1, 3);
let mut ctx2 = crate::Context::default();
let mut c2 = c1.clone();
c2.add(&mut ctx2, 1);
c2.add(&mut ctx2, 4);
let diff = crate::Mergable::diff(&c1, &c2);
let revert = crate::Diff::revert(diff.clone()).unwrap();
c2.add(&mut ctx2, 7);
let result = crate::test::test_apply(c2, &mut [diff, revert]);
assert_eq!(result.total(), &20);
}
#[test]
fn test_clean() {
let mut ctx1 = crate::Context::default();
let mut c1 = Counter::default();
c1.add(&mut ctx1, 1);
c1.add(&mut ctx1, 2);
let mut ctx2 = crate::Context::default();
let mut c2 = c1.clone();
c2.add(&mut ctx2, 4);
c2.add(&mut ctx2, 8);
c1.add(&mut ctx1, 16);
let seq_pre_merge = ctx1.next_sequence().max(ctx2.next_sequence());
let mut c1 = crate::test::test_merge(&mut [&c1, &c2]);
assert_eq!(c1.total(), &31);
ctx1.new_session();
ctx2.new_session();
let mut c2 = c1.clone();
c2.add(&mut ctx2, 32);
c1.add(&mut ctx1, 64);
crate::Mergable::clean(&mut c1, &seq_pre_merge);
assert_eq!(c1.live.len(), 1);
let mut c1 = crate::test::test_merge(&mut [&c1, &c2]);
assert_eq!(c1.total(), &127);
let seq_final = ctx1.next_sequence().max(ctx2.next_sequence());
crate::Mergable::clean(&mut c1, &seq_final);
assert_eq!(c1.live.len(), 0);
assert_eq!(c1.total(), &127);
}
#[cfg(test)]
#[derive(Debug,proptest_derive::Arbitrary)]
enum Event {
#[proptest(weight=90)]
Add{
#[proptest(strategy="0..5usize")]
actor: usize,
new_session: bool,
value: std::num::Wrapping<i64>,
},
#[proptest(weight=9)]
Merge{
#[proptest(strategy="0..5usize")]
src: usize,
#[proptest(strategy="0..5usize")]
dst: usize,
},
#[proptest(weight=1)]
Clean,
}
#[cfg(test)]
proptest::proptest!{
#[test]
fn proptest(events: Vec<Event>) {
let mut total = std::num::Wrapping::<i64>(0);
let mut actors = [(); 5].map(|()| {
let mut ctx = crate::Context::default();
let counter = Counter::new(&mut ctx, total);
(ctx, counter)
});
for event in events {
match event {
Event::Add{actor, new_session, value} => {
let (ctx, counter) = &mut actors[actor];
if new_session {
ctx.new_session();
}
total += value;
counter.add(ctx, value);
}
Event::Merge{src, dst} => {
if src == dst { continue }
let (src, dst) = crate::get_2_mut(&mut actors, src, dst);
crate::Mergable::merge(
&mut dst.1,
src.1.clone());
}
Event::Clean => {
for i in 1..actors.len() {
let (src, dst) = crate::get_2_mut(&mut actors, i-1, i);
crate::Mergable::merge(
&mut dst.1,
src.1.clone());
}
for i in (0..actors.len()).rev() {
if i > 0 {
let (dst, src) = crate::get_2_mut(&mut actors, i-1, i);
crate::Mergable::merge(
&mut dst.1,
src.1.clone());
}
let (ctx, counter) = &mut actors[i];
crate::Mergable::clean(counter, &ctx.next_sequence());
}
}
}
}
let mut refs = actors.iter().map(|(_, counter)| counter).collect::<Vec<_>>();
let merged = crate::test::test_merge(&mut refs);
proptest::prop_assert_eq!(merged.total(), &total);
}
}