// threshold 0.9.1
//
// Threshold data structures
// Documentation
//! This module contains an implementation of a vector clock.
//!
//! The implementation is inspired by [rust-crdt's implementation](https://github.com/rust-crdt/rust-crdt/blob/master/src/vclock.rs).
//!
//! # Examples
//! ```
//! use threshold::*;
//!
//! let actor_a = "A";
//! let mut clock_a = VClock::new();
//! let mut clock_b = VClock::new();
//!
//! clock_a.next(&actor_a);
//! let event = clock_a.next(&actor_a);
//!
//! clock_b.join(&clock_a);
//! assert!(clock_b.contains(&actor_a, event));
//! ```

use crate::*;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::{self, HashMap};
use std::fmt;
use std::iter::FromIterator;

/// A Vector Clock is a `Clock` with `MaxSet` as `EventSet`.
pub type VClock<A> = Clock<A, MaxSet>;
/// An Above Exception Clock is a `Clock` with `AboveExSet` as `EventSet`.
pub type AEClock<A> = Clock<A, AboveExSet>;
/// An Above Range Clock is a `Clock` with `AboveRangeSet` as `EventSet`.
pub type ARClock<A> = Clock<A, AboveRangeSet>;
/// A Below Exception Clock is a `Clock` with `BelowExSet` as `EventSet`.
pub type BEClock<A> = Clock<A, BelowExSet>;

/// A clock that maps each actor identifier to the set of events recorded for
/// that actor.
///
/// The event-set type `E` determines the flavour of clock; see the `VClock`,
/// `AEClock`, `ARClock` and `BEClock` aliases.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Clock<A: Actor, E: EventSet> {
    /// Mapping from actor identifier to an event set
    clock: HashMap<A, E>,
}

// Implemented by hand instead of derived: `#[derive(Default)]` would require
// `A: Default` and `E: Default`, although an empty clock needs neither.
impl<A: Actor, E: EventSet> Default for Clock<A, E> {
    /// Returns an empty clock (no actors).
    fn default() -> Self {
        Clock {
            clock: HashMap::new(),
        }
    }
}

impl<A: Actor, E: EventSet> Clock<A, E> {
    /// Returns a new, empty `Clock` instance.
    // `new` is the primary constructor and places no `Default` bound on `A`
    // or `E`, hence the lint is silenced rather than redirected to `Default`.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Clock {
            clock: HashMap::new(),
        }
    }

    /// Returns a new `Clock` mapping each actor to a bottom entry.
    ///
    /// # Examples
    /// ```
    /// use std::collections::HashMap;
    /// use std::iter::FromIterator;
    /// use threshold::*;
    ///
    /// let actors = vec!["A", "B"];
    /// let vclock = VClock::with(actors);
    /// assert_eq!(
    ///     vclock.frontier(),
    ///     VClock::from(vec![("A", MaxSet::from(0)), ("B", MaxSet::from(0))])
    /// );
    /// ```
    pub fn with<I: IntoIterator<Item = A>>(iter: I) -> Self {
        Clock {
            clock: iter.into_iter().map(|actor| (actor, E::new())).collect(),
        }
    }

    /// Creates a `Clock` from an iterator of tuples (actor identifier and event
    /// set).
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let a = ("A", MaxSet::from_event(10));
    /// let b = ("B", MaxSet::from_event(20));
    /// let vclock = Clock::from(vec![a, b]);
    ///
    /// assert!(vclock.contains(&"A", 9));
    /// assert!(!vclock.contains(&"A", 11));
    /// ```
    pub fn from<I: IntoIterator<Item = (A, E)>>(iter: I) -> Self {
        Clock {
            clock: HashMap::from_iter(iter),
        }
    }

    /// Returns the number of actors in the clock.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let a = ("A", MaxSet::from_event(10));
    /// let b = ("B", MaxSet::from_event(20));
    /// let vclock = Clock::from(vec![a, b]);
    ///
    /// assert_eq!(vclock.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        self.clock.len()
    }

    /// Checks that a clock is empty.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let a = ("A", MaxSet::from_event(10));
    /// let b = ("B", MaxSet::from_event(20));
    /// let mut vclock = Clock::from(vec![a, b]);
    ///
    /// assert!(!vclock.is_empty());
    ///
    /// vclock = VClock::new();
    /// assert!(vclock.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.clock.is_empty()
    }

    /// Returns the next event for the `actor` while updating its entry in the
    /// clock.
    ///
    /// If `actor` has no entry yet, it is inserted with event `1` and `1` is
    /// returned.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    /// let actor_b = "B";
    ///
    /// let mut clock = VClock::new();
    /// let next = clock.next(&actor_a);
    /// assert_eq!(next, 1);
    ///
    /// let next = clock.next(&actor_a);
    /// assert_eq!(next, 2);
    ///
    /// let next = clock.next(&actor_a);
    /// assert_eq!(next, 3);
    /// ```
    pub fn next(&mut self, actor: &A) -> u64 {
        self.upsert(actor, |eset| eset.next_event(), || (E::from_event(1), 1))
    }

    /// If the actor is already in the clock, its entry is updated using
    /// function `map`. Otherwise, the output of `default` is inserted.
    ///
    /// Returns the result produced by whichever closure ran.
    fn upsert<F, D, R>(&mut self, actor: &A, mut map: F, default: D) -> R
    where
        F: FnMut(&mut E) -> R,
        D: FnOnce() -> (E, R),
    {
        // `get_mut` first (instead of the entry API) so that `actor` is only
        // cloned when a new entry actually has to be inserted.
        match self.clock.get_mut(actor) {
            Some(eset) => map(eset),
            None => {
                let (value, result) = default();
                self.clock.insert(actor.clone(), value);
                result
            }
        }
    }

    /// Retrieves the event set associated with some `actor`.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    ///
    /// let mut clock = VClock::new();
    /// assert_eq!(clock.get(&actor_a), None);
    ///
    /// clock.add(&actor_a, 1);
    /// clock.add(&actor_a, 2);
    /// let max_set = clock.get(&actor_a).expect("there should be an event set");
    /// let mut iter = max_set.clone().event_iter();
    ///
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), None);
    /// ```
    pub fn get(&self, actor: &A) -> Option<&E> {
        self.clock.get(actor)
    }

    /// Retrieves (a mutable reference to) the event set associated with some
    /// `actor`.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    ///
    /// let mut clock = VClock::new();
    /// assert_eq!(clock.get_mut(&actor_a), None);
    ///
    /// clock.add(&actor_a, 1);
    /// clock.add(&actor_a, 2);
    /// let max_set = clock
    ///     .get_mut(&actor_a)
    ///     .expect("there should be an event set");
    /// max_set.add_event(3);
    /// let mut iter = max_set.clone().event_iter();
    ///
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), Some(3));
    /// assert_eq!(iter.next(), None);
    /// ```
    pub fn get_mut(&mut self, actor: &A) -> Option<&mut E> {
        self.clock.get_mut(actor)
    }

    /// Adds an event to the clock.
    /// If the clock did not have this event present, `true` is returned.
    /// If the clock did have this event present, `false` is returned.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    /// let actor_b = "B";
    ///
    /// let mut clock = VClock::new();
    ///
    /// assert!(!clock.contains(&actor_a, 1));
    /// clock.add(&actor_a, 1);
    /// assert!(clock.contains(&actor_a, 1));
    ///
    /// assert!(!clock.contains(&actor_b, 1));
    /// clock.add(&actor_b, 1);
    /// assert!(clock.contains(&actor_b, 1));
    /// ```
    pub fn add(&mut self, actor: &A, seq: u64) -> bool {
        self.upsert(
            actor,
            |eset| eset.add_event(seq),
            || (E::from_event(seq), true),
        )
    }

    /// Adds a range of events (delimited by `start` and `end`) to the clock.
    ///
    /// Returns `true` if `actor` had no entry yet; otherwise returns the
    /// result of the actor's `EventSet::add_event_range`.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    /// let actor_b = "B";
    ///
    /// let mut clock_a = VClock::new();
    /// clock_a.add_range(&actor_a, 10, 20);
    /// assert!(clock_a.contains(&actor_a, 10));
    /// assert!(clock_a.contains(&actor_a, 11));
    /// assert!(!clock_a.contains(&actor_a, 21));
    /// ```
    pub fn add_range(&mut self, actor: &A, start: u64, end: u64) -> bool {
        self.upsert(
            actor,
            |eset| eset.add_event_range(start, end),
            || (E::from_event_range(start, end), true),
        )
    }

    /// Checks if an event is part of the clock.
    ///
    /// Returns `false` for an actor that has no entry in the clock.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    ///
    /// let mut clock = VClock::new();
    /// assert!(!clock.contains(&actor_a, 1));
    /// clock.add(&actor_a, 1);
    /// assert!(clock.contains(&actor_a, 1));
    /// assert!(!clock.contains(&actor_a, 2));
    ///
    /// clock.add(&actor_a, 3);
    /// assert!(clock.contains(&actor_a, 1));
    /// assert!(clock.contains(&actor_a, 2));
    /// assert!(clock.contains(&actor_a, 3));
    /// ```
    pub fn contains(&self, actor: &A, seq: u64) -> bool {
        self.clock
            .get(actor)
            .map_or(false, |eset| eset.is_event(seq))
    }

    /// Returns the clock frontier.
    ///
    /// # Examples
    /// ```
    /// use std::collections::HashMap;
    /// use std::iter::FromIterator;
    /// use threshold::*;
    ///
    /// let a = ("A", AboveExSet::from_events(vec![1, 2, 4]));
    /// let b = ("B", AboveExSet::from_events(vec![1, 2, 3, 5, 6]));
    /// let clock = Clock::from(vec![a, b]);
    ///
    /// assert_eq!(
    ///     clock.frontier(),
    ///     VClock::from(vec![("A", MaxSet::from(2)), ("B", MaxSet::from(3))])
    /// );
    /// ```
    pub fn frontier(&self) -> VClock<A> {
        let frontier = self.clock.iter().map(|(actor, eset)| {
            (actor.clone(), MaxSet::from(eset.frontier()))
        });
        VClock::from(frontier)
    }

    /// By looking at this `Clock`'s frontier, it computes the event that's been
    /// generated in at least `threshold` actors.
    ///
    /// Returns `None` when `threshold` exceeds the number of actors.
    ///
    /// # Panics
    /// In debug builds, panics if `threshold` is `0` (in release builds `None`
    /// is returned instead).
    ///
    /// # Examples
    /// ```
    /// use threshold::{clock, *};
    ///
    /// let aset = AboveExSet::from_events(vec![1, 2, 4]);
    /// let bset = AboveExSet::from_events(vec![1, 2, 3, 5]);
    /// let clock = Clock::from(vec![("A", aset), ("B", bset)]);
    /// assert_eq!(clock.frontier_threshold(1), Some(3));
    /// assert_eq!(clock.frontier_threshold(2), Some(2));
    /// assert_eq!(clock.frontier_threshold(3), None);
    ///
    /// let aset = AboveExSet::from_events(vec![1, 2, 3, 5]);
    /// let bset = AboveExSet::from_events(vec![1, 2, 3, 5]);
    /// let clock = Clock::from(vec![("A", aset), ("B", bset)]);
    /// assert_eq!(clock.frontier_threshold(1), Some(3));
    /// assert_eq!(clock.frontier_threshold(2), Some(3));
    ///
    /// let clock = clock::vclock_from_seqs(vec![2, 1, 3]);
    /// assert_eq!(clock.frontier_threshold(1), Some(3));
    /// assert_eq!(clock.frontier_threshold(2), Some(2));
    /// assert_eq!(clock.frontier_threshold(3), Some(1));
    ///
    /// let clock = clock::vclock_from_seqs(vec![4, 4, 5, 3, 2]);
    /// assert_eq!(clock.frontier_threshold(1), Some(5));
    /// assert_eq!(clock.frontier_threshold(2), Some(4));
    /// assert_eq!(clock.frontier_threshold(3), Some(4));
    /// assert_eq!(clock.frontier_threshold(4), Some(3));
    /// assert_eq!(clock.frontier_threshold(5), Some(2));
    /// assert_eq!(clock.frontier_threshold(6), None);
    /// ```
    pub fn frontier_threshold(&self, threshold: usize) -> Option<u64> {
        debug_assert!(threshold > 0);
        let clock_size = self.clock.len();
        if threshold > clock_size {
            return None;
        }

        // Sort the per-actor frontiers ascending; the value at index
        // `len - threshold` is then reached by at least `threshold` actors.
        let mut frontiers: Vec<_> =
            self.clock.values().map(|eset| eset.frontier()).collect();
        frontiers.sort_unstable();

        // With `threshold == 0` (release builds) the index equals the length,
        // and `get` yields `None` instead of panicking.
        frontiers.get(clock_size - threshold).copied()
    }

    /// Merges clock `other` passed as argument into `self`.
    /// After merge, all events in `other` are events in `self`.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    /// let mut clock_a = VClock::new();
    /// let mut clock_b = VClock::new();
    ///
    /// clock_a.next(&actor_a);
    /// let event = clock_a.next(&actor_a);
    ///
    /// clock_b.join(&clock_a);
    /// assert!(clock_b.contains(&actor_a, event));
    /// ```
    pub fn join(&mut self, other: &Self) {
        for (actor, eset) in other.clock.iter() {
            self.upsert(
                actor,
                |current_eset| current_eset.join(eset),
                || (eset.clone(), ()),
            );
        }
    }

    /// Intersects clock `other` passed as argument with `self`.
    /// After intersection, only the common events are in `self`.
    ///
    /// Actors that are present in `self` but not in `other` are removed.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let actor_a = "A";
    /// let mut clock_a = VClock::new();
    /// let mut clock_b = VClock::new();
    ///
    /// let event = clock_a.next(&actor_a);
    ///
    /// clock_b.meet(&clock_a);
    /// assert!(!clock_b.contains(&actor_a, event));
    ///
    /// clock_b.next(&actor_a);
    /// clock_b.meet(&clock_a);
    /// assert!(clock_b.contains(&actor_a, event));
    /// ```
    pub fn meet(&mut self, other: &Self) {
        // Single in-place pass: intersect the shared actors and drop the ones
        // the remote clock does not know about (no intermediate list, no
        // actor clones).
        self.clock.retain(|actor, eset| match other.get(actor) {
            Some(other_eset) => {
                eset.meet(other_eset);
                true
            }
            None => false,
        });
    }

    /// Returns a `Clock` iterator.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let mut clock = VClock::new();
    /// clock.next(&"A");
    /// clock.next(&"A");
    /// clock.next(&"B");
    ///
    /// for (&actor, eset) in clock.iter() {
    ///     match actor {
    ///         "A" => assert_eq!(eset, &MaxSet::from_event(2)),
    ///         "B" => assert_eq!(eset, &MaxSet::from_event(1)),
    ///         _ => panic!("unexpected actor name"),
    ///     }
    /// }
    /// ```
    pub fn iter(&self) -> Iter<'_, A, E> {
        Iter(self.clock.iter())
    }

    /// Returns a `Clock` mutable iterator.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let mut clock = VClock::new();
    /// clock.next(&"A");
    /// clock.next(&"A");
    /// clock.next(&"B");
    ///
    /// for (&actor, eset) in clock.iter_mut() {
    ///     if actor == "A" {
    ///         eset.add_event(3);
    ///     }
    /// }
    ///
    /// let max_set = clock.get(&"A").expect("there should be an event set");
    /// assert_eq!(max_set, &MaxSet::from_event(3));
    /// ```
    pub fn iter_mut(&mut self) -> IterMut<'_, A, E> {
        IterMut(self.clock.iter_mut())
    }

    /// For every actor in `self`, computes the events of `self` that are not
    /// in `other`, as reported by `EventSet::subtracted`.
    ///
    /// Actors absent from `other` contribute all of their events. Actors that
    /// only appear in `other` are not part of the result.
    pub fn subtracted(&self, other: &Self) -> HashMap<A, Vec<u64>> {
        self.clock
            .iter()
            .map(|(actor, eset)| {
                let subtracted = match other.get(actor) {
                    Some(other_eset) => eset.subtracted(other_eset),
                    None => eset.clone().event_iter().collect(),
                };
                (actor.clone(), subtracted)
            })
            .collect()
    }
}

/// Builds a vector clock out of a list of sequence numbers.
///
/// Actor identifiers are `u64` positions in the input:
/// - the first sequence belongs to actor number 0
/// - the last sequence belongs to actor number #sequences - 1
///
/// # Examples
/// ```
/// use threshold::{clock, *};
///
/// let clock = clock::vclock_from_seqs(vec![10, 20]);
/// assert!(clock.contains(&0, 10));
/// assert!(clock.contains(&1, 20));
/// ```
pub fn vclock_from_seqs<I: IntoIterator<Item = u64>>(iter: I) -> VClock<u64> {
    // A counter starting at 0 supplies each sequence's actor identifier.
    let entries = (0u64..)
        .zip(iter)
        .map(|(actor, seq)| (actor, MaxSet::from_event(seq)));
    Clock::from(entries)
}

/// Owning iterator over the `(actor, event set)` entries of a [`Clock`],
/// created by consuming the clock with `into_iter`.
///
/// Entries are yielded in the unspecified order of the underlying `HashMap`.
pub struct IntoIter<A: Actor, E: EventSet>(hash_map::IntoIter<A, E>);

impl<A: Actor, E: EventSet> Iterator for IntoIter<A, E> {
    type Item = (A, E);

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    // Forward the inner iterator's exact bounds; the default `(0, None)`
    // would stop `collect`/`extend` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<A: Actor, E: EventSet> ExactSizeIterator for IntoIter<A, E> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl<A: Actor, E: EventSet> IntoIterator for Clock<A, E> {
    type Item = (A, E);
    type IntoIter = IntoIter<A, E>;

    /// Returns a `Clock` into-iterator.
    ///
    /// # Examples
    /// ```
    /// use threshold::*;
    ///
    /// let mut clock = VClock::new();
    /// clock.next(&"A");
    /// clock.next(&"A");
    /// clock.next(&"B");
    ///
    /// for (actor, eset) in clock {
    ///     match actor {
    ///         "A" => assert_eq!(eset, MaxSet::from_event(2)),
    ///         "B" => assert_eq!(eset, MaxSet::from_event(1)),
    ///         _ => panic!("unexpected actor name"),
    ///     }
    /// }
    /// ```
    fn into_iter(self) -> Self::IntoIter {
        IntoIter(self.clock.into_iter())
    }
}

/// Borrowing iterator over the `(actor, event set)` entries of a [`Clock`],
/// created by `Clock::iter`.
///
/// Entries are yielded in the unspecified order of the underlying `HashMap`.
pub struct Iter<'a, A: Actor, E: EventSet>(hash_map::Iter<'a, A, E>);

impl<'a, A: Actor, E: EventSet> Iterator for Iter<'a, A, E> {
    type Item = (&'a A, &'a E);

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    // Forward the inner iterator's exact bounds; the default `(0, None)`
    // would stop `collect`/`extend` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<'a, A: Actor, E: EventSet> ExactSizeIterator for Iter<'a, A, E> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

/// Mutably borrowing iterator over the `(actor, event set)` entries of a
/// [`Clock`], created by `Clock::iter_mut`.
///
/// Entries are yielded in the unspecified order of the underlying `HashMap`.
pub struct IterMut<'a, A: Actor, E: EventSet>(hash_map::IterMut<'a, A, E>);

impl<'a, A: Actor, E: EventSet> Iterator for IterMut<'a, A, E> {
    type Item = (&'a A, &'a mut E);

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }

    // Forward the inner iterator's exact bounds; the default `(0, None)`
    // would stop `collect`/`extend` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<'a, A: Actor, E: EventSet> ExactSizeIterator for IterMut<'a, A, E> {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl<A: Actor, E: EventSet> fmt::Debug for Clock<A, E> {
    /// Formats the clock as a map from actor to event set, listing actors in
    /// ascending order so the output does not depend on `HashMap` iteration
    /// order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sorted: std::collections::BTreeMap<_, _> =
            self.clock.iter().collect();
        // Delegate to the map's own `Debug` impl rather than
        // `write!(f, "{:?}", ..)`, so that caller flags such as `{:#?}`
        // (pretty-printing) are honoured.
        fmt::Debug::fmt(&sorted, f)
    }
}