use crate::*;
use std::collections::HashMap;
use std::marker::PhantomData;
/// Pair of counters kept for each sequence number.
///
/// `event_count` emits `(1, 0)` for the first value returned by
/// `EventSet::events` and `(0, 1)` for every remaining value; the
/// `threshold_union` implementations read the two components as the
/// "positive" and "negative" counts respectively.
type EventCount = (u64, u64);
/// Threshold clock: for each actor, keeps a `MultiSet` of `EventCount`s keyed
/// by sequence number, filled from the clocks passed to `add`.
///
/// `E` selects the kind of event set the added clocks use; it is only a type
/// marker here (see `phantom`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TClock<A: Actor, E: EventSet> {
// Per-actor multiset mapping a sequence number to its `EventCount`.
occurrences: HashMap<A, MultiSet<u64, EventCount>>,
// Ties the clock to the event-set type `E` without storing a value of it.
phantom: PhantomData<E>,
}
impl<A: Actor, E: EventSet> TClock<A, E> {
    /// Creates an empty `TClock`.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        TClock {
            occurrences: HashMap::new(),
            phantom: PhantomData,
        }
    }

    /// Creates an empty `TClock` whose actor map is pre-allocated with
    /// `HashMap::with_capacity(capacity)`.
    pub fn with_capacity(capacity: usize) -> Self {
        TClock {
            occurrences: HashMap::with_capacity(capacity),
            phantom: PhantomData,
        }
    }

    /// Misspelled alias of `with_capacity`.
    ///
    /// Kept only so that existing callers continue to compile; new code
    /// should call `with_capacity` instead.
    pub fn with_capacitiy(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Adds a clock to this `TClock`, recording the event set of every actor
    /// the clock yields when iterated.
    pub fn add(&mut self, clock: Clock<A, E>) {
        for (actor, eset) in clock {
            self.add_entry(actor, eset);
        }
    }

    /// Records a single actor's event set.
    ///
    /// The event set is converted into per-sequence `EventCount`s by
    /// `event_count` and added to the actor's multiset, which is created on
    /// first use.
    fn add_entry(&mut self, actor: A, eset: E) {
        let count = event_count(eset);
        let mset = self.occurrences.entry(actor).or_insert_with(MultiSet::new);
        mset.add(count);
    }
}
impl<A: Actor> TClock<A, MaxSet> {
    /// Computes the threshold-union of the clocks added so far.
    ///
    /// For every actor, the recorded sequence numbers are visited in reverse
    /// iteration order while summing their positive counts; the first
    /// sequence number at which the running sum reaches `threshold` is
    /// selected (or `0` when the sum never reaches it).
    ///
    /// Returns the resulting `VClock` together with a flag that is `true`
    /// only if, for every actor, the selected sequence number equals the
    /// first one in reverse iteration order (`0` when the actor has none).
    pub fn threshold_union(&self, threshold: u64) -> (VClock<A>, bool) {
        let mut matches_union = true;
        let entries = self.occurrences.iter().map(|(actor, counts)| {
            // Walk the counts backwards, stopping at the first sequence
            // number whose accumulated positives meet the threshold.
            let mut seen = 0;
            let mut selected = 0;
            for (&seq, &(positives, _)) in counts.iter().rev() {
                seen += positives;
                if seen >= threshold {
                    selected = seq;
                    break;
                }
            }

            // The entry a plain union would pick for this actor.
            let top = counts.iter().next_back().map_or(0, |(&seq, _)| seq);
            matches_union &= top == selected;

            (actor.clone(), MaxSet::from_event(selected))
        });
        (VClock::from(entries), matches_union)
    }

    /// Computes the union of the clocks added so far.
    ///
    /// Each actor is mapped to the first sequence number in reverse
    /// iteration order. The returned flag is `true` only if every actor has
    /// exactly one distinct sequence number recorded.
    ///
    /// # Panics
    ///
    /// Panics if some actor has no recorded sequence number.
    pub fn union(&self) -> (VClock<A>, bool) {
        let mut single_seq_per_actor = true;
        let entries = self.occurrences.iter().map(|(actor, counts)| {
            let mut descending = counts.iter().rev();
            let (&top, _) = descending
                .next()
                .expect("there should be at least one event per actor");
            // Any further entry means this actor was seen with different
            // sequence numbers.
            single_seq_per_actor &= descending.next().is_none();
            (actor.clone(), MaxSet::from_event(top))
        });
        (VClock::from(entries), single_seq_per_actor)
    }
}
impl<A: Actor> TClock<A, BelowExSet> {
/// Computes the threshold-union of the clocks added so far, producing one
/// `BelowExSet` per actor.
///
/// For each actor the `(pos, neg)` counts are visited in reverse iteration
/// order (the code treats this as descending sequence numbers: see
/// `candidate = seq - 1` below) while `pos` is summed into `total_pos`.
/// A sequence number is accepted when `total_pos - neg >= threshold`, where
/// `neg` is that sequence number's own negative count.
///
/// The first accepted sequence number becomes the first argument of
/// `BelowExSet::from`; every later visited sequence number that is not
/// accepted is passed as part of the second argument.
pub fn threshold_union(&self, threshold: u64) -> BEClock<A> {
let iter = self.occurrences.iter().map(|(actor, tset)| {
// Running sum of the `pos` counts visited so far.
let mut total_pos = 0;
// Drop leading entries until the running sum reaches `threshold`. The
// entry that first reaches it is kept, and its `pos` is already included
// in `total_pos` because the predicate ran on it. `skip_while` stops
// calling the predicate after that, so later entries are not yet summed.
let iter = tset
.iter()
.rev()
.skip_while(|(_, &(pos, _))| {
total_pos += pos;
total_pos < threshold
})
.collect::<Vec<_>>();
let mut iter = iter.iter().peekable();
// Inspect the first kept entry: `Ok(seq)` means it is accepted as is,
// `Err(seq)` means the search must continue below `seq`. With no entry at
// all the result is `0`.
// NOTE(review): `total_pos - neg` here (and in the loop below) is not
// guarded against `neg > total_pos`, unlike the `filter_map` further down;
// presumably well-formed clocks guarantee `neg <= total_pos` - confirm.
let highest = match iter.next() {
None => Ok(0),
Some((&seq, &(_, neg))) => {
if total_pos - neg >= threshold {
Ok(seq)
} else {
Err(seq)
}
}
}
.unwrap_or_else(|seq| {
// Try `seq - 1`, `seq - 2`, ... in turn.
let mut candidate = seq - 1;
loop {
match iter.peek() {
// No entries left: nothing recorded for `candidate`, accept it.
None => {
break candidate;
}
Some((&next_seq, &(pos, neg))) => {
if next_seq == candidate {
// There is an entry for `candidate`: consume it, account for its
// `pos`, and accept it only if it passes the threshold check.
iter.next();
total_pos += pos;
if total_pos - neg >= threshold {
break candidate;
} else {
candidate -= 1;
}
} else {
// The next entry is for a different sequence number, so nothing is
// recorded for `candidate`: accept it and leave the entry unconsumed.
break candidate;
}
}
}
}
});
// Entries not consumed above are evaluated lazily: each one adds its `pos`
// to the running sum and is emitted when it fails the threshold check
// (`neg > total_pos` is tested first so the subtraction cannot underflow).
let exs = iter.filter_map(|(&seq, &(pos, neg))| {
total_pos += pos;
if neg > total_pos || total_pos - neg < threshold {
Some(seq)
} else {
None
}
});
let below_exset = BelowExSet::from(highest, exs);
(actor.clone(), below_exset)
});
BEClock::from(iter)
}
}
/// Turns an event set into `(sequence number, EventCount)` pairs.
///
/// The first value returned by `EventSet::events` is emitted first with the
/// count `(1, 0)`; every remaining value follows, in order, with the count
/// `(0, 1)`.
fn event_count<E: EventSet>(
    eset: E,
) -> impl Iterator<Item = (u64, EventCount)> {
    let (first, rest) = eset.events();
    let head: (u64, EventCount) = (first, (1, 0));
    std::iter::once(head).chain(rest.into_iter().map(|seq| (seq, (0, 1))))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two clocks that both add event 5 of actor "B" but differ on the other
    /// event (6 vs 7): with a threshold of 2 the result must equal a clock
    /// that only has event 5 added.
    #[test]
    fn regression_test_beclock() {
        let b = String::from("B");

        let mut clock_a = BEClock::new();
        clock_a.add(&b, 5);
        clock_a.add(&b, 6);

        let mut clock_b = BEClock::new();
        clock_b.add(&b, 5);
        clock_b.add(&b, 7);

        let mut tclock = TClock::new();
        tclock.add(clock_a);
        tclock.add(clock_b);

        let clock = tclock.threshold_union(2);

        let mut expected = BEClock::new();
        expected.add(&b, 5);
        assert_eq!(clock, expected);
    }

    /// Adds `c1`, `c2`, `c1`, `c2` one at a time and checks, after each step,
    /// the threshold-union for thresholds 1 and 2 together with the
    /// "equal to union" flag.
    #[test]
    fn regression_test_vclock() {
        let mut tclock = TClock::new();
        let bottom = clock::vclock_from_seqs(vec![0, 0, 0, 0, 0]);
        let c1 = clock::vclock_from_seqs(vec![1, 0, 0, 0, 0]);
        let c2 = clock::vclock_from_seqs(vec![0, 1, 0, 0, 0]);
        let both = clock::vclock_from_seqs(vec![1, 1, 0, 0, 0]);

        // added so far: c1
        tclock.add(c1.clone());
        let (t1, equal_to_union) = tclock.threshold_union(1);
        assert_eq!(t1, c1);
        assert!(equal_to_union);
        let (t2, equal_to_union) = tclock.threshold_union(2);
        assert_eq!(t2, bottom);
        assert!(!equal_to_union);

        // added so far: c1, c2
        tclock.add(c2.clone());
        let (t1, equal_to_union) = tclock.threshold_union(1);
        assert_eq!(t1, both);
        assert!(equal_to_union);
        let (t2, equal_to_union) = tclock.threshold_union(2);
        assert_eq!(t2, bottom);
        assert!(!equal_to_union);

        // added so far: c1, c2, c1
        tclock.add(c1.clone());
        let (t1, equal_to_union) = tclock.threshold_union(1);
        assert_eq!(t1, both);
        assert!(equal_to_union);
        let (t2, equal_to_union) = tclock.threshold_union(2);
        assert_eq!(t2, c1);
        assert!(!equal_to_union);

        // added so far: c1, c2, c1, c2
        tclock.add(c2.clone());
        let (t1, equal_to_union) = tclock.threshold_union(1);
        assert_eq!(t1, both);
        assert!(equal_to_union);
        let (t2, equal_to_union) = tclock.threshold_union(2);
        assert_eq!(t2, both);
        assert!(equal_to_union);
    }
}