use std::hash::Hash;
use std::ops::Deref;
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use tracing::{trace, trace_span};
/// An item that can be stored in a [`CausalSet`].
///
/// Every item exposes its own identifier and the identifiers it points back
/// to via [`prev`](CausalSetItem::prev). `CausalSet::insert` only connects an
/// item once every id returned by `prev` is already connected.
pub trait CausalSetItem: Clone + DeserializeOwned + Serialize {
/// Identifier type; used as the key of the maps and sets in this module.
type ID: Hash + Clone + PartialEq + Eq + Serialize + DeserializeOwned + Debug;
/// Returns this item's identifier (by value).
fn id(&self) -> Self::ID;
/// Returns references to the identifiers this item points back to.
fn prev(&self) -> HashSet<&Self::ID>;
}
/// A set of items linked by the `prev` references between them.
///
/// Items whose `prev` ids are all present are "connected" and live in `items`;
/// items still waiting for a `prev` id are parked in `disconnected_items`.
#[derive(Debug)]
pub struct CausalSet<I: CausalSetItem> {
/// Connected items keyed by id (this is the map exposed through `Deref`).
items: HashMap<I::ID, I>,
/// Frontier as maintained by `insert`: an id is added when its item connects
/// and removed when an item naming it in `prev` connects.
pub frontier: CausalSetFrontier<I>,
/// For each id, the ids of the connected items that list it in their `prev`
/// (direct successors only).
descendants: HashMap<I::ID, HashSet<I::ID>>,
/// Inserted items still waiting for at least one `prev` id to be connected.
disconnected_items: HashMap<I::ID, I>,
}
/// Dereferences to the map of connected items, so read-only `HashMap` methods
/// can be called directly on a `CausalSet`. Disconnected items are not part of
/// this map.
impl<I: CausalSetItem> Deref for CausalSet<I> {
type Target = HashMap<I::ID, I>;
/// Returns the map of connected items.
fn deref(&self) -> &Self::Target {
&self.items
}
}
impl<I: CausalSetItem + Debug> CausalSet<I> {
/// Creates an empty set: no connected items, no pending items and an empty frontier.
pub fn new() -> Self {
    let frontier = CausalSetFrontier(HashSet::default());
    Self {
        frontier,
        items: HashMap::default(),
        descendants: HashMap::default(),
        disconnected_items: HashMap::default(),
    }
}
/// Adds `item` to the set and connects everything that has become connectable.
///
/// The item is first parked with the disconnected items. The pending items are
/// then scanned repeatedly: an item is connected once every id in its `prev()`
/// is already connected, and scanning stops after a pass that connects nothing.
///
/// Returns clones of the items connected by this call, in the order they were
/// connected. The result is empty when `item` is still waiting for a `prev` id.
///
/// Re-inserting an item that is already connected is reported again in the
/// return value, but its id is only placed on the frontier when no connected
/// item lists it in `prev`. Otherwise the frontier would end up containing an
/// interior item.
pub fn insert(&mut self, item: I) -> Vec<I> {
    trace_span!("CausalSet::insert", item = ?item, item_id = ?item.id(), existing_items = ?self.items).in_scope(|| {
        self.disconnected_items.insert(item.id(), item);
        let mut connected_items = Vec::new();
        loop {
            trace!(disconnected_items = ?self.disconnected_items, "Insert step start");
            let mut new_connected_found = false;
            self.disconnected_items.retain(|id, item| {
                let item_prev = item.prev();
                if !item_prev.iter().all(|prev| self.items.contains_key(prev)) {
                    // Still waiting for at least one predecessor: keep it parked.
                    return true;
                }
                for prev in item_prev {
                    // A predecessor with a connected successor is no longer a tip.
                    self.frontier.0.remove(prev);
                    self.descendants
                        .entry(prev.clone())
                        .or_default()
                        .insert(item.id());
                }
                // An id can only have successors here if it was connected before
                // (i.e. this is a re-insert); such an id must stay off the frontier.
                let has_successors = self
                    .descendants
                    .get(id)
                    .map_or(false, |successors| !successors.is_empty());
                if !has_successors {
                    self.frontier.0.insert(id.clone());
                }
                connected_items.push(item.clone());
                self.items.insert(id.clone(), item.clone());
                new_connected_found = true;
                false
            });
            // Connecting an item may unblock others, so rescan until a pass is idle.
            if !new_connected_found {
                break;
            }
        }
        trace!(disconnected_items = ?self.disconnected_items, connected_items = ?self.items, "Inserting done");
        connected_items
    })
}
/// Returns a reference to the current frontier (the same data as the public
/// `frontier` field).
pub fn frontier(&self) -> &CausalSetFrontier<I> {
&self.frontier
}
/// Returns the ids of connected items that come after `frontier`.
///
/// Ids in `frontier` that are not connected items of this set are ignored.
///
/// * If none of the supplied ids is known (this includes an empty frontier),
///   the ids of all connected items are returned, in map iteration order,
///   together with `FrontierWasUnknown`.
/// * Otherwise the walk starts at the known ids and follows the `descendants`
///   links level by level. Only ids reachable that way are returned; each id is
///   returned once, at the position where it was first reached. The result is
///   `FrontierWasKnown` when every supplied id was known and
///   `FrontierWasPartiallyKnown` otherwise.
pub fn item_ids_after(
    &self,
    frontier: &CausalSetFrontier<I>,
) -> (Vec<I::ID>, ItemsAfterFrontierResult) {
    let known_frontier = frontier
        .0
        .iter()
        .filter(|frontier| self.items.contains_key(frontier))
        .cloned()
        .collect::<HashSet<_>>();
    if known_frontier.is_empty() {
        return (
            self.items.keys().cloned().collect(),
            ItemsAfterFrontierResult::FrontierWasUnknown,
        );
    }
    let result = if known_frontier.len() == frontier.0.len() {
        ItemsAfterFrontierResult::FrontierWasKnown
    } else {
        ItemsAfterFrontierResult::FrontierWasPartiallyKnown
    };

    let mut advancing_frontier = known_frontier;
    let mut missing_item_ids = Vec::new();
    // Ids already emitted. An id with several predecessors is reachable along
    // several paths and must neither be reported nor expanded more than once.
    let mut reported: HashSet<I::ID> = HashSet::new();
    while advancing_frontier != self.frontier.0 {
        let mut made_progress = false;
        trace!(
            advancing_frontier = ?advancing_frontier,
            own_frontier = ?self.frontier.0,
            "advancing frontier",
        );
        // Iterate over a snapshot because the set is mutated while advancing.
        for id in advancing_frontier.clone() {
            let successors = match self.descendants.get(&id) {
                Some(successors) if !successors.is_empty() => successors,
                // No successors: this id stays in the advancing frontier.
                _ => continue,
            };
            advancing_frontier.remove(&id);
            made_progress = true;
            for successor_id in successors {
                if reported.contains(successor_id) {
                    continue;
                }
                reported.insert(successor_id.clone());
                missing_item_ids.push(successor_id.clone());
                advancing_frontier.insert(successor_id.clone());
            }
        }
        if !made_progress {
            break;
        }
    }
    (missing_item_ids, result)
}
/// Same as `item_ids_after`, but returns clones of the items instead of their ids.
///
/// # Panics
///
/// Panics if an id returned by `item_ids_after` has no entry among the
/// connected items.
pub fn items_after(
    &self,
    frontier: &CausalSetFrontier<I>,
) -> (Vec<I>, ItemsAfterFrontierResult) {
    let (ids, result) = self.item_ids_after(frontier);
    let mut found = Vec::with_capacity(ids.len());
    for id in &ids {
        let item = self
            .items
            .get(id)
            .expect("Expected item for id after frontier to exist.");
        found.push(item.clone());
    }
    (found, result)
}
/// Builds an `OptimisticCausalSetFrontier` from clones of the current frontier
/// and of the currently disconnected items.
pub fn as_optimistic_frontier(&self) -> OptimisticCausalSetFrontier<I> {
    let frontier = self.frontier.clone();
    let disconnected = self.disconnected_items.clone();
    OptimisticCausalSetFrontier {
        frontier,
        disconnected,
    }
}
/// Returns `true` if at least one inserted item is still waiting to be connected.
pub fn has_disconnected(&self) -> bool {
!self.disconnected_items.is_empty()
}
/// Returns `true` if `item_id` is one of the ids in `frontier` or can be
/// reached from them by repeatedly following `prev` references.
///
/// The search walks backwards one level at a time and returns `false` once a
/// level is empty. An empty `frontier` therefore yields `false`.
///
/// # Panics
///
/// Panics if an id visited during the walk is not a connected item of this set.
pub fn is_before_or_at_frontier(
    &self,
    item_id: &I::ID,
    frontier: &CausalSetFrontier<I>,
) -> bool {
    let mut current_level = frontier.0.clone();
    while !current_level.is_empty() {
        if current_level.contains(item_id) {
            return true;
        }
        // Step one level back: gather the `prev` ids of everything on this level.
        let mut previous_level = HashSet::new();
        for id in &current_level {
            let item = self
                .items
                .get(id)
                .expect("Expected to have item before frontier to check");
            previous_level.extend(item.prev().into_iter().cloned());
        }
        current_level = previous_level;
    }
    false
}
}
/// The default `CausalSet` is the empty set produced by `CausalSet::new`.
impl<I: CausalSetItem + Debug> Default for CausalSet<I> {
fn default() -> Self {
Self::new()
}
}
/// A set of item ids describing a frontier of a [`CausalSet`].
///
/// Serializable and comparable; two frontiers are equal when they contain the
/// same ids.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CausalSetFrontier<I: CausalSetItem>(pub HashSet<I::ID>);
/// The default frontier contains no ids.
impl<I: CausalSetItem> Default for CausalSetFrontier<I> {
fn default() -> Self {
Self(HashSet::new())
}
}
/// Reports how much of the frontier passed to `CausalSet::item_ids_after` /
/// `CausalSet::items_after` was recognised by the set.
// Every variant shares the `FrontierWas` prefix, which is what this allow silences.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ItemsAfterFrontierResult {
    /// Every id in the supplied frontier is a connected item of the set.
    FrontierWasKnown,
    /// Some, but not all, ids in the supplied frontier are connected items of the set.
    FrontierWasPartiallyKnown,
    /// No id in the supplied frontier is a connected item of the set (this
    /// includes an empty frontier).
    FrontierWasUnknown,
}
/// A frontier together with items that have been handed to it but not yet
/// folded into the frontier.
///
/// Items are added with `insert_items` and folded into `frontier` by `resolve`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OptimisticCausalSetFrontier<I: CausalSetItem> {
/// The frontier resolved so far.
// The serde attribute spells out the deserialize bound for this field explicitly.
#[serde(bound(deserialize = "I: DeserializeOwned"))]
frontier: CausalSetFrontier<I>,
/// Items waiting to be folded into `frontier`, keyed by id.
// The serde attribute spells out the deserialize bound for this field explicitly.
#[serde(bound(deserialize = "I: DeserializeOwned, I::ID: DeserializeOwned"))]
disconnected: HashMap<I::ID, I>,
}
impl<I: CausalSetItem + Debug> OptimisticCausalSetFrontier<I> {
/// Returns `true` if the frontier holds no ids and no items are waiting to be resolved.
pub fn is_empty(&self) -> bool {
self.frontier.0.is_empty() && self.disconnected.is_empty()
}
/// Queues `items` as pending (keyed by their id, later duplicates replace
/// earlier ones) and, when `underlying_set` is given, immediately calls
/// `resolve` against it.
pub fn insert_items<II: IntoIterator<Item = I>>(
    &mut self,
    items: II,
    underlying_set: Option<&CausalSet<I>>,
) {
    self.disconnected
        .extend(items.into_iter().map(|item| (item.id(), item)));
    if let Some(set) = underlying_set {
        self.resolve(set);
    }
}
/// Folds pending items into the frontier as far as `underlying_set` allows and
/// returns the resulting frontier.
///
/// A pending item is folded in when it is a connected item of `underlying_set`
/// and every id in its `prev()` is before or at the current frontier. Folding
/// removes those `prev` ids from the frontier and adds the item's id. Passes
/// over the pending items repeat until one folds nothing.
///
/// # Panics
///
/// Panics (inside `CausalSet::is_before_or_at_frontier`) if an id visited while
/// walking back from the frontier is not a connected item of `underlying_set`.
pub fn resolve(&mut self, underlying_set: &CausalSet<I>) -> &CausalSetFrontier<I> {
    let mut progressed = true;
    while progressed {
        progressed = false;
        self.disconnected.retain(|pending_id, pending| {
            // Not connected in the underlying set yet: keep waiting.
            if !underlying_set.items.contains_key(pending_id) {
                return true;
            }
            let prevs = pending.prev();
            let covered = prevs
                .iter()
                .all(|&prev| underlying_set.is_before_or_at_frontier(prev, &self.frontier));
            if !covered {
                return true;
            }
            // The item supersedes its predecessors on the frontier.
            for prev in prevs {
                self.frontier.0.remove(prev);
            }
            self.frontier.0.insert(pending_id.clone());
            progressed = true;
            false
        });
    }
    &self.frontier
}
}
/// The default optimistic frontier has an empty frontier and no pending items.
impl<I: CausalSetItem> Default for OptimisticCausalSetFrontier<I> {
    fn default() -> Self {
        let frontier = CausalSetFrontier::default();
        let disconnected = HashMap::default();
        Self {
            frontier,
            disconnected,
        }
    }
}
#[cfg(test)]
mod test;