// jazz-telepathy 0.1.1
//
// A framework for distributed logs and append-only sets.
// Documentation
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ops::Deref;

use serde::de::DeserializeOwned;
use serde::{Serialize};
use serde_derive::{Serialize, Deserialize};

/// An item that can be stored in a [`CausalSet`].
///
/// Every item is addressed by an ID and names the IDs of the items it
/// directly depends on.
pub trait CausalSetItem: Clone + DeserializeOwned + Serialize {
    /// Key type under which items are stored in the maps and frontier sets of
    /// a [`CausalSet`].
    type ID: Hash + Clone + PartialEq + Eq + Serialize + DeserializeOwned;

    /// Returns the ID of this item.
    fn id(&self) -> Self::ID;
    /// Returns the IDs of the items this item directly depends on.
    ///
    /// `CausalSet::insert` only connects an item once every ID returned here
    /// belongs to an already connected item.
    fn prev(&self) -> HashSet<&Self::ID>;
}

/// A set of items linked by their `prev` IDs.
///
/// Items whose `prev` IDs are not all present yet are parked in
/// `disconnected_items` until `insert` can connect them.
pub struct CausalSet<I: CausalSetItem> {
    /// Connected items, keyed by their ID.
    items: HashMap<I::ID, I>,
    /// IDs of connected items; `insert` removes an ID from here as soon as an
    /// item that lists it in `prev` gets connected.
    frontier: CausalSetFrontier<I>,
    /// For each ID, the IDs of the connected items that list it in `prev`.
    descendants: HashMap<I::ID, HashSet<I::ID>>,
    // TODO(optimization): keep whole disconnected subsets
    /// Inserted items that are still waiting for some of their `prev` items.
    disconnected_items: HashMap<I::ID, I>,
}

impl<I: CausalSetItem> Deref for CausalSet<I> {
    type Target = HashMap<I::ID, I>;

    fn deref(&self) -> &Self::Target {
        &self.items
    }
}

impl<I: CausalSetItem> CausalSet<I> {
    pub fn new() -> Self {
        Self {
            items: HashMap::new(),
            frontier: CausalSetFrontier(HashSet::new()),
            descendants: HashMap::new(),
            disconnected_items: HashMap::new(),
        }
    }

    pub fn insert(&mut self, item: I) -> Vec<I> {
        self.disconnected_items.insert(item.id(), item);

        let mut connected_items = Vec::new();

        loop {
            let mut new_connected_found = false;
            self.disconnected_items.retain(|id, item| {
                let item_prev = item.prev();
                if item_prev.iter().all(|prev| self.items.contains_key(prev)) {
                    for prev in item_prev {
                        self.frontier.0.remove(prev);
                        self.descendants
                            .entry(prev.clone())
                            .or_default()
                            .insert(item.id().clone());
                    }

                    connected_items.push(item.clone());
                    self.items.insert(id.clone(), item.clone());
                    self.frontier.0.insert(id.clone());
                    new_connected_found = true;
                    false
                } else {
                    true
                }
            });
            if !new_connected_found {
                break;
            }
        }

        connected_items
    }

    pub fn frontier(&self) -> &CausalSetFrontier<I> {
        &self.frontier
    }

    pub fn item_ids_after(
        &self,
        frontier: &CausalSetFrontier<I>,
    ) -> (Vec<I::ID>, ItemsAfterFrontierResult) {
        let known_frontier = frontier
            .0
            .iter()
            .filter(|frontier| self.items.contains_key(frontier))
            .cloned()
            .collect::<HashSet<_>>();

        let result = if known_frontier.is_empty() {
            return (
                self.items.keys().cloned().collect(),
                ItemsAfterFrontierResult::FrontierWasUnknown,
            );
        } else if known_frontier.len() == frontier.0.len() {
            ItemsAfterFrontierResult::FrontierWasKnown
        } else {
            ItemsAfterFrontierResult::FrontierWasPartiallyKnown
        };

        let mut advancing_frontier = known_frontier;
        let mut missing_item_ids = Vec::new();

        while advancing_frontier != self.frontier.0 {
            for id in advancing_frontier.clone() {
                let mut descendants = self.descendants.get(&id).into_iter().flatten().peekable();
                if descendants.peek().is_some() {
                    advancing_frontier.remove(&id);
                }
                for descendant_id in descendants {
                    missing_item_ids.push(descendant_id.clone());
                    advancing_frontier.insert(descendant_id.clone());
                }
            }
        }

        (missing_item_ids, result)
    }

    pub fn items_after(
        &self,
        frontier: &CausalSetFrontier<I>,
    ) -> (Vec<I>, ItemsAfterFrontierResult) {
        let (ids, result) = self.item_ids_after(frontier);
        (
            ids.iter()
                .map(|id| {
                    self.items
                        .get(id)
                        .cloned()
                        .expect("Expected item for id after frontier to exist.")
                })
                .collect(),
            result,
        )
    }

    pub fn as_optimistic_frontier(&self) -> OptimisticCausalSetFrontier<I> {
        OptimisticCausalSetFrontier {
            frontier: self.frontier.clone(),
            disconnected: self.disconnected_items.clone(),
        }
    }

    pub fn has_disconnected(&self) -> bool {
        !self.disconnected_items.is_empty()
    }

    pub fn is_before_or_at_frontier(
        &self,
        item_id: &I::ID,
        frontier: &CausalSetFrontier<I>,
    ) -> bool {
        let mut retreating_frontier = frontier.0.clone();
        loop {
            if retreating_frontier.contains(item_id) {
                return true;
            }

            retreating_frontier = retreating_frontier
                .iter()
                .flat_map(|id| {
                    self.items
                        .get(id)
                        .expect("Expected to have item before frontier to check")
                        .prev()
                })
                .cloned()
                .collect();

            if retreating_frontier.is_empty() {
                return false;
            }
        }
    }
}

impl <I: CausalSetItem> Default for CausalSet<I> {
    fn default() -> Self {
        Self::new()
    }
}

/// A set of item IDs marking a position in a causal set.
///
/// `CausalSet` maintains one that holds the IDs of its connected items which
/// no other connected item lists in `prev()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CausalSetFrontier<I: CausalSetItem>(HashSet<I::ID>);


impl <I: CausalSetItem> Default for CausalSetFrontier<I> {
    fn default() -> Self {
        Self(HashSet::new())
    }
}

/// Reports how much of the frontier passed to `CausalSet::item_ids_after` /
/// `CausalSet::items_after` was found among the set's connected items.
// All variants intentionally share the `FrontierWas` prefix.
#[allow(clippy::enum_variant_names)]
pub enum ItemsAfterFrontierResult {
    /// Every ID of the frontier was found in the set.
    FrontierWasKnown,
    /// Some, but not all, IDs of the frontier were found in the set.
    FrontierWasPartiallyKnown,
    /// No ID of the frontier was found in the set (this includes an empty
    /// frontier); in that case all connected items are returned.
    FrontierWasUnknown,
}

/// A frontier plus a pool of items that have been handed to it but not yet
/// folded into the frontier.
///
/// `resolve` moves an item from the pool into the frontier once the
/// underlying `CausalSet` contains it and all of its `prev()` IDs are at or
/// before the frontier.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OptimisticCausalSetFrontier<I: CausalSetItem> {
    /// The frontier reached so far.
    // NOTE(review): the explicit deserialize bound presumably replaces the
    // bound serde would otherwise infer for this field — confirm.
    #[serde(bound(deserialize = "I: DeserializeOwned"))]
    frontier: CausalSetFrontier<I>,
    /// Items inserted via `insert_items` (or copied from a `CausalSet`'s
    /// disconnected items) that `resolve` has not folded into `frontier` yet.
    #[serde(bound(deserialize = "I: DeserializeOwned, I::ID: DeserializeOwned"))]
    disconnected: HashMap<I::ID, I>,
}

impl<I: CausalSetItem> OptimisticCausalSetFrontier<I> {
    /// Adds `items` to the pool of not-yet-resolved items, replacing pooled
    /// items with the same ID.
    ///
    /// When `underlying_set` is given, `resolve` is run against it right
    /// afterwards; otherwise the frontier is left untouched.
    pub fn insert_items<II: IntoIterator<Item = I>>(
        &mut self,
        items: II,
        underlying_set: Option<&CausalSet<I>>,
    ) {
        self.disconnected
            .extend(items.into_iter().map(|item| (item.id(), item)));

        if let Some(set) = underlying_set {
            self.resolve(set);
        }
    }

    /// Folds pooled items into the frontier and returns the resulting
    /// frontier.
    ///
    /// A pooled item is folded in when `underlying_set` has it connected and
    /// every ID in its `prev()` is at or before the current frontier: its
    /// `prev()` IDs are removed from the frontier, its own ID is added, and it
    /// leaves the pool. Passes are repeated until one folds in nothing, since
    /// each fold can make further pooled items eligible.
    ///
    /// # Panics
    ///
    /// Panics (inside `CausalSet::is_before_or_at_frontier`) if the frontier
    /// holds an ID that is not connected in `underlying_set`.
    pub fn resolve(&mut self, underlying_set: &CausalSet<I>) -> &CausalSetFrontier<I> {
        // Split the borrow so the closure can update the frontier while the
        // pool is being filtered.
        let Self {
            frontier,
            disconnected,
        } = self;

        let mut progressed = true;
        while progressed {
            progressed = false;
            disconnected.retain(|candidate_id, candidate| {
                let resolvable = underlying_set.items.contains_key(candidate_id)
                    && candidate
                        .prev()
                        .into_iter()
                        .all(|prev| underlying_set.is_before_or_at_frontier(prev, &*frontier));

                if resolvable {
                    for prev in candidate.prev() {
                        frontier.0.remove(prev);
                    }
                    frontier.0.insert(candidate_id.clone());
                    progressed = true;
                }

                // Keep only what could not be folded in during this pass.
                !resolvable
            });
        }

        &self.frontier
    }
}

impl <I: CausalSetItem> Default for OptimisticCausalSetFrontier<I> {
    fn default() -> Self {
        Self {
            frontier: CausalSetFrontier::default(),
            disconnected: HashMap::new(),
        }
    }
}