use crate::error::RetrievalError;
use crate::retrieval::{TClock, TEvent};
use smallvec::SmallVec;
use std::collections::{BTreeSet, HashMap, HashSet};
pub use crate::retrieval::GetEvents;
pub use crate::retrieval::Retrieve;
/// Collects clones of events, optionally capped at a maximum count.
#[derive(Debug, Clone)]
pub struct EventAccumulator<Event> {
    /// Events stored so far, in the order they were added.
    events: Vec<Event>,
    /// Upper bound on the number of stored events; `None` means no bound.
    maximum: Option<usize>,
}

impl<Event: Clone> EventAccumulator<Event> {
    /// Creates an empty accumulator holding at most `maximum` events
    /// (unbounded when `maximum` is `None`).
    pub fn new(maximum: Option<usize>) -> Self {
        Self { events: Vec::new(), maximum }
    }

    /// Stores a clone of `event`.
    ///
    /// Returns `true` when the event was stored, and `false` when the limit
    /// had already been reached (the event is then not stored).
    pub fn add(&mut self, event: &Event) -> bool {
        if self.is_at_limit() {
            return false;
        }
        self.events.push(event.clone());
        true
    }

    /// Consumes the accumulator and hands back everything it collected.
    pub fn take_events(self) -> Vec<Event> {
        self.events
    }

    /// Reports whether a limit is configured and has been reached.
    pub fn is_at_limit(&self) -> bool {
        matches!(self.maximum, Some(limit) if self.events.len() >= limit)
    }
}
/// Outcome of comparing a subject clock against another clock.
#[derive(Debug, PartialEq, Eq)]
pub enum Ordering<Id> {
    /// Both clocks have the same members.
    Equal,

    /// Walking back from the subject reached every head of the other clock.
    Descends,

    /// The histories share events, but the walk from the subject reached
    /// none of the other clock's heads.
    NotDescends {
        /// Shared events that are not the parent of another shared event.
        meet: Vec<Id>,
    },

    /// One of the clocks is empty, or the walk ended without tying every
    /// head of the other clock to a shared event.
    Incomparable,

    /// Like [`Ordering::NotDescends`], except that the walk from the subject
    /// reached some, but not all, of the other clock's heads.
    PartiallyDescends {
        /// Shared events that are not the parent of another shared event.
        meet: Vec<Id>,
    },

    /// The retrieval budget was used up.
    BudgetExceeded {
        /// Ids on the subject side that were still waiting to be fetched.
        subject_frontier: BTreeSet<Id>,
        /// Ids on the other side that were still waiting to be fetched.
        other_frontier: BTreeSet<Id>,
    },
}
/// Compares an event that cannot be fetched from `getter` against the clock `other`.
///
/// The event is judged through its parent clock:
/// * when `other` consists of exactly this event's id, the result is [`Ordering::Equal`];
/// * otherwise the parent clock is compared with `other` via [`compare`], and an
///   `Equal` outcome there is reported as [`Ordering::Descends`];
/// * every other outcome of [`compare`] is passed through unchanged.
///
/// # Errors
/// Propagates any [`RetrievalError`] raised by [`compare`].
pub async fn compare_unstored_event<G, E, C>(getter: &G, subject: &E, other: &C, budget: usize) -> Result<Ordering<G::Id>, RetrievalError>
where
    G: GetEvents,
    G::Event: TEvent<Id = G::Id> + Clone,
    E: TEvent<Id = G::Id, Parent = C>,
    C: TClock<Id = G::Id>,
    G::Id: std::hash::Hash + Ord + std::fmt::Display,
{
    // A clock whose single member is this very event: nothing to walk.
    if let [only_head] = other.members() {
        if *only_head == subject.id() {
            return Ok(Ordering::Equal);
        }
    }

    // The walk starts at the parent clock rather than at the event itself.
    match compare(getter, subject.parent(), other, budget).await? {
        // Parent clock equals `other`, so the event sits directly on top of it.
        Ordering::Equal => Ok(Ordering::Descends),
        ordering => Ok(ordering),
    }
}
/// Compares the clock `subject` against the clock `other`.
///
/// Two cases are answered without fetching anything: an empty clock on either
/// side gives [`Ordering::Incomparable`], and identical member slices give
/// [`Ordering::Equal`]. Otherwise a [`Comparison`] is stepped until it yields
/// an ordering.
///
/// # Errors
/// Propagates any [`RetrievalError`] returned by [`Comparison::step`].
pub async fn compare<G, C>(getter: &G, subject: &C, other: &C, budget: usize) -> Result<Ordering<G::Id>, RetrievalError>
where
    G: GetEvents,
    G::Event: TEvent<Id = G::Id> + Clone,
    C: TClock<Id = G::Id>,
    G::Id: std::hash::Hash + Ord + std::fmt::Display,
{
    let (subject_members, other_members) = (subject.members(), other.members());

    if subject_members.is_empty() || other_members.is_empty() {
        return Ok(Ordering::Incomparable);
    }
    if subject_members == other_members {
        return Ok(Ordering::Equal);
    }

    let mut comparison = Comparison::new(getter, subject, other, budget);
    loop {
        match comparison.step().await? {
            Some(ordering) => return Ok(ordering),
            None => continue,
        }
    }
}
/// Insertion-ordered list of ids without duplicates, backed by a `SmallVec`
/// with an inline capacity of eight.
#[derive(Debug, Clone, Default)]
struct Origins<Id>(SmallVec<[Id; 8]>);

impl<Id> Origins<Id> {
    /// Creates an empty list.
    fn new() -> Self {
        Origins(SmallVec::new())
    }
}

impl<Id: Clone + PartialEq> Origins<Id> {
    /// Appends `id` unless an equal id is already stored.
    fn add(&mut self, id: Id) {
        let already_present = self.0.contains(&id);
        if !already_present {
            self.0.push(id);
        }
    }

    /// Appends a clone of every id in `other` that is not stored yet,
    /// visiting `other` in its own order.
    fn augment(&mut self, other: &Self) {
        for candidate in &other.0 {
            if self.0.contains(candidate) {
                continue;
            }
            self.0.push(candidate.clone());
        }
    }
}

/// Exposes the stored ids as a read-only slice.
impl<Id> std::ops::Deref for Origins<Id> {
    type Target = [Id];

    fn deref(&self) -> &[Id] {
        self.0.as_slice()
    }
}
/// Bookkeeping kept for one event id during a comparison.
#[derive(Debug, Clone)]
struct State<Id> {
    /// Set once the event has been marked as reached from the subject side.
    seen_from_subject: bool,
    /// Set once the event has been marked as reached from the other side.
    seen_from_other: bool,
    /// Counter of common events recorded as children of this event.
    common_child_count: usize,
    /// Ids recorded as the origins of this event.
    origins: Origins<Id>,
}

impl<Id> Default for State<Id> {
    /// An event nobody has seen yet: both flags clear, no children, no origins.
    fn default() -> Self {
        State { origins: Origins::new(), common_child_count: 0, seen_from_other: false, seen_from_subject: false }
    }
}

impl<Id> State<Id>
where
    Id: Clone + PartialEq,
{
    /// True once the event has been seen from both sides.
    fn is_common(&self) -> bool {
        self.seen_from_subject && self.seen_from_other
    }

    /// Raises the flag for each side passed as `true`; flags are never cleared.
    fn mark_seen_from(&mut self, from_subject: bool, from_other: bool) {
        self.seen_from_subject |= from_subject;
        self.seen_from_other |= from_other;
    }
}
/// Step-wise comparison of a subject clock against another clock.
///
/// Each step fetches the events on both frontiers and moves the frontiers to
/// their parents, tracking which events have been seen from which side.
pub(crate) struct Comparison<'a, G>
where
    G: GetEvents + 'a,
    G::Event: TEvent<Id = G::Id>,
    G::Id: std::hash::Hash + Ord + std::fmt::Debug,
{
    /// Source the events are retrieved from.
    getter: &'a G,

    /// Members of the other clock as given at construction.
    original_other_events: BTreeSet<G::Id>,
    /// Heads of the other clock not yet listed among the origins of a common event.
    outstanding_heads: BTreeSet<G::Id>,

    /// Budget left; reduced by the cost reported for each retrieval.
    remaining_budget: usize,

    /// Ids still to be fetched on the subject side.
    subject_frontier: BTreeSet<G::Id>,
    /// Ids still to be fetched on the other side.
    other_frontier: BTreeSet<G::Id>,

    /// Per-event bookkeeping, keyed by event id.
    states: HashMap<G::Id, State<G::Id>>,
    /// Events that have been seen from both sides.
    meet_candidates: BTreeSet<G::Id>,

    /// Number of the other clock's heads the subject side has not processed yet.
    unseen_other_heads: usize,
    /// Whether the subject side has processed at least one of the other
    /// clock's heads (also set when both clocks started out equal).
    head_overlap: bool,
    /// Whether both clocks had the same member set at construction.
    initial_heads_equal: bool,
    /// Whether any event has been seen from both sides so far.
    any_common: bool,

    /// Optional sink for events that are reached from the subject side only.
    subject_event_accumulator: Option<EventAccumulator<ankurah_proto::Attested<G::Event>>>,
}
impl<'a, G> Comparison<'a, G>
where
    G: GetEvents + 'a,
    G::Event: TEvent<Id = G::Id> + Clone,
    G::Id: std::hash::Hash + Ord + std::fmt::Debug + std::fmt::Display,
{
    /// Creates a comparison of `subject` against `other` that does not collect events.
    pub fn new<C: TClock<Id = G::Id>>(getter: &'a G, subject: &C, other: &C, budget: usize) -> Self {
        Self::new_with_accumulator(getter, subject, other, budget, None)
    }

    /// Creates a comparison of `subject` against `other`.
    ///
    /// * `getter` - source the events are retrieved from.
    /// * `budget` - total retrieval cost the comparison may spend.
    /// * `subject_event_accumulator` - when present, receives the events that
    ///   are reached from the subject side only (see [`Self::take_accumulated_events`]).
    pub fn new_with_accumulator<C: TClock<Id = G::Id>>(
        getter: &'a G,
        subject: &C,
        other: &C,
        budget: usize,
        subject_event_accumulator: Option<EventAccumulator<ankurah_proto::Attested<G::Event>>>,
    ) -> Self {
        let subject_frontier: BTreeSet<_> = subject.members().iter().cloned().collect();
        let other: BTreeSet<_> = other.members().iter().cloned().collect();
        let original_other_events = other.clone();
        // Compared as sets, so member order and duplicates do not matter here.
        let initial_heads_equal = subject_frontier == other;
        let head_overlap = initial_heads_equal;

        Self {
            getter,
            unseen_other_heads: other.len(),
            subject_frontier,
            other_frontier: other.clone(),
            remaining_budget: budget,
            original_other_events,
            head_overlap,
            initial_heads_equal,
            any_common: false,
            states: HashMap::new(),
            meet_candidates: BTreeSet::new(),
            // Every head of `other` starts out unresolved.
            outstanding_heads: other,
            subject_event_accumulator,
        }
    }

    /// Consumes the comparison and returns the accumulated events, or `None`
    /// when no accumulator was supplied.
    pub fn take_accumulated_events(self) -> Option<Vec<ankurah_proto::Attested<G::Event>>> {
        self.subject_event_accumulator.map(|acc| acc.take_events())
    }

    /// Advances the comparison by one retrieval round.
    ///
    /// Fetches every id on either frontier in a single call, charges the
    /// reported cost against the budget and processes the returned events.
    ///
    /// Returns `Ok(Some(ordering))` once an outcome is known (or the budget is
    /// used up) and `Ok(None)` when another step is needed.
    ///
    /// # Errors
    /// Propagates retrieval errors, and returns `RetrievalError::StorageError`
    /// when a requested id is missing from the retrieved events.
    pub async fn step(&mut self) -> Result<Option<Ordering<G::Id>>, RetrievalError> {
        if self.initial_heads_equal {
            return Ok(Some(Ordering::Equal));
        }

        let ids: Vec<G::Id> = self.subject_frontier.union(&self.other_frontier).cloned().collect();
        // Tracks which requested ids have not been delivered yet.
        let mut result_checklist: HashSet<G::Id> = ids.iter().cloned().collect();

        let (cost, events) = self.getter.retrieve_event(ids).await?;
        self.remaining_budget = self.remaining_budget.saturating_sub(cost);

        for event in events {
            // Skips events that were not requested or were already handled this round.
            if result_checklist.remove(&event.payload.id()) {
                self.process_event(&event);
            }
        }

        if !result_checklist.is_empty() {
            return Err(RetrievalError::StorageError(format!("Events not found: {:?}", result_checklist).into()));
        }

        Ok(self.check_result())
    }

    /// Records one fetched event and moves the frontier(s) it was on to its parents.
    fn process_event(&mut self, event: &ankurah_proto::Attested<G::Event>) {
        let id = event.payload.id();
        let parents = event.payload.parent().members();

        // Being on a frontier is what tells us which side(s) asked for this event.
        let from_subject = self.subject_frontier.remove(&id);
        let from_other = self.other_frontier.remove(&id);

        let (is_common, origins) = {
            let node_state = self.states.entry(id.clone()).or_default();
            node_state.mark_seen_from(from_subject, from_other);

            // Accumulate events reached from the subject that are neither a
            // head of the other clock nor already seen from the other side.
            if from_subject && !self.original_other_events.contains(&id) && !node_state.is_common() {
                if let Some(ref mut accumulator) = self.subject_event_accumulator {
                    accumulator.add(event);
                }
            }

            // A head of the other clock is its own origin.
            if from_other && self.original_other_events.contains(&id) {
                node_state.origins.add(id.clone());
            }

            (node_state.is_common(), node_state.origins.clone())
        };

        if is_common && self.meet_candidates.insert(id.clone()) {
            // First time this event is recognised as shared by both sides.
            self.any_common = true;

            // Every head that led here is now tied to a shared event.
            for h in origins.iter() {
                self.outstanding_heads.remove(h);
            }

            for p in parents {
                let parent_state = self.states.entry(p.clone()).or_default();
                if from_other {
                    parent_state.origins.augment(&origins);
                }
                // Parents of a shared event are excluded from the meet later on.
                parent_state.common_child_count += 1;
            }
        } else if from_other {
            // Not (newly) shared: just carry the origins one level further back.
            for p in parents {
                let parent_state = self.states.entry(p.clone()).or_default();
                parent_state.origins.augment(&origins);
            }
        }

        if from_subject {
            self.subject_frontier.extend(parents.iter().cloned());

            if self.original_other_events.contains(&id) {
                // The subject side has reached one of the other clock's heads.
                self.unseen_other_heads = self.unseen_other_heads.saturating_sub(1);
                self.head_overlap = true;
            }
        }
        if from_other {
            self.other_frontier.extend(parents.iter().cloned());
        }
    }

    /// Decides whether the comparison is finished after a step.
    ///
    /// Conclusive outcomes are checked before the budget, so an answer that
    /// was reached with the last unit of budget is returned instead of being
    /// replaced by `BudgetExceeded`.
    fn check_result(&self) -> Option<Ordering<G::Id>> {
        // Nothing left to fetch on either side: the outcome is final.
        if self.subject_frontier.is_empty() && self.other_frontier.is_empty() {
            return Some(self.determine_final_ordering());
        }

        // Every head of the other clock is tied to a shared event, yet the
        // subject side has not reached all of those heads.
        if self.any_common && self.outstanding_heads.is_empty() && self.unseen_other_heads > 0 {
            return Some(self.compute_not_descends_ordering());
        }

        // Still undecided and nothing left to spend: report where the walk stopped.
        if self.remaining_budget == 0 {
            return Some(Ordering::BudgetExceeded {
                subject_frontier: self.subject_frontier.clone(),
                other_frontier: self.other_frontier.clone(),
            });
        }

        None
    }

    /// Chooses the outcome once both frontiers are exhausted.
    fn determine_final_ordering(&self) -> Ordering<G::Id> {
        if self.unseen_other_heads == 0 {
            return if self.initial_heads_equal { Ordering::Equal } else { Ordering::Descends };
        }

        // No shared event at all, or some head never got tied to one.
        if !self.any_common || !self.outstanding_heads.is_empty() {
            return Ordering::Incomparable;
        }

        self.compute_not_descends_ordering()
    }

    /// Builds `PartiallyDescends` or `NotDescends`, depending on whether the
    /// subject side reached any head of the other clock.
    ///
    /// The meet consists of the shared events whose `common_child_count` is zero.
    fn compute_not_descends_ordering(&self) -> Ordering<G::Id> {
        let meet: Vec<_> = self
            .meet_candidates
            .iter()
            .filter(|id| self.states.get(*id).map_or(0, |state| state.common_child_count) == 0)
            .cloned()
            .collect();

        if self.head_overlap {
            Ordering::PartiallyDescends { meet }
        } else {
            Ordering::NotDescends { meet }
        }
    }
}
#[cfg(test)]
mod tests {
    use ankurah_proto::{AttestationSet, Attested};
    use itertools::Itertools;

    use super::*;
    use async_trait::async_trait;
    use std::collections::HashMap;

    type TestId = u32;

    /// Minimal clock: a plain list of head ids.
    #[derive(Clone)]
    struct TestClock {
        members: Vec<TestId>,
    }

    /// Minimal event: an id plus the clock of its parents.
    #[derive(Clone)]
    struct TestEvent {
        id: TestId,
        parent_clock: TestClock,
    }

    impl TClock for TestClock {
        type Id = TestId;
        fn members(&self) -> &[Self::Id] { &self.members }
    }

    impl TEvent for TestEvent {
        type Id = TestId;
        type Parent = TestClock;

        fn id(&self) -> TestId { self.id }
        fn parent(&self) -> &TestClock { &self.parent_clock }
    }

    impl std::fmt::Display for TestEvent {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Event({})", self.id) }
    }

    /// In-memory event store; every retrieval call reports a cost of 1.
    struct MockEventStore {
        events: HashMap<TestId, Attested<TestEvent>>,
    }

    impl MockEventStore {
        fn new() -> Self { Self { events: HashMap::new() } }

        /// Registers event `id` with the given parents.
        fn add(&mut self, id: TestId, parent_ids: &[TestId]) {
            let event = TestEvent { id, parent_clock: TestClock { members: parent_ids.to_vec() } };
            let attested = Attested { payload: event, attestations: AttestationSet::default() };
            self.events.insert(id, attested);
        }
    }

    #[async_trait]
    impl GetEvents for MockEventStore {
        type Id = TestId;
        type Event = TestEvent;

        async fn retrieve_event(&self, event_ids: Vec<Self::Id>) -> Result<(usize, Vec<Attested<Self::Event>>), RetrievalError> {
            let mut result = Vec::new();
            for id in event_ids {
                // Unknown ids are silently left out of the result.
                if let Some(event) = self.events.get(&id) {
                    result.push(event.clone());
                }
            }
            Ok((1, result))
        }

        fn stage_events(&self, _events: impl IntoIterator<Item = Attested<Self::Event>>) {
        }

        fn mark_event_used(&self, _event_id: &Self::Id) {
        }
    }

    /// History: 1 <- 2 <- 3
    #[tokio::test]
    async fn test_linear_history() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);

        let ancestor = TestClock { members: vec![1] };
        let descendant = TestClock { members: vec![3] };

        assert_eq!(compare(&store, &descendant, &ancestor, 100).await.unwrap(), Ordering::Descends);
        assert_eq!(compare(&store, &ancestor, &descendant, 100).await.unwrap(), Ordering::NotDescends { meet: vec![1] });
    }

    /// History: 2, 3 and 4 branch off 1; 5 merges 2+3; 6 merges 3+4; 7 merges 5+6.
    #[tokio::test]
    async fn test_concurrent_history() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[1]);
        store.add(4, &[1]);
        store.add(5, &[2, 3]);
        store.add(6, &[3, 4]);
        store.add(7, &[5, 6]);

        {
            let ancestor = TestClock { members: vec![1] };
            let descendant = TestClock { members: vec![5] };
            assert_eq!(compare(&store, &descendant, &ancestor, 100).await.unwrap(), Ordering::Descends);
            assert_eq!(compare(&store, &ancestor, &descendant, 100).await.unwrap(), Ordering::NotDescends { meet: vec![1] });
        }
        {
            let ancestor = TestClock { members: vec![2, 3] };
            let descendant = TestClock { members: vec![5] };
            assert_eq!(compare(&store, &descendant, &ancestor, 100).await.unwrap(), Ordering::Descends);
            assert_eq!(compare(&store, &ancestor, &descendant, 100).await.unwrap(), Ordering::NotDescends { meet: vec![2, 3] });
        }
        {
            // Siblings meet at their shared parent.
            let a = TestClock { members: vec![2] };
            let b = TestClock { members: vec![3] };
            assert_eq!(compare(&store, &a, &b, 100).await.unwrap(), Ordering::NotDescends { meet: vec![1] });
            assert_eq!(compare(&store, &b, &a, 100).await.unwrap(), Ordering::NotDescends { meet: vec![1] });
        }
        {
            // 6 descends from 3 but not from 2.
            let a = TestClock { members: vec![6] };
            let b = TestClock { members: vec![2, 3] };
            assert_eq!(
                compare(&store, &a, &b, 100).await.unwrap(),
                Ordering::PartiallyDescends { meet: vec![3] }
            );
        }
    }

    /// Two unrelated histories: one rooted at 1 (1..=5), one rooted at 6 (6 <- 7 <- 8).
    #[tokio::test]
    async fn test_incomparable() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);
        store.add(4, &[1]);
        store.add(5, &[4]);
        store.add(6, &[]);
        store.add(7, &[6]);
        store.add(8, &[7]);

        {
            let a = TestClock { members: vec![3] };
            let b = TestClock { members: vec![8] };
            assert_eq!(compare(&store, &a, &b, 100).await.unwrap(), Ordering::Incomparable);
        }
        {
            let a = TestClock { members: vec![2] };
            let b = TestClock { members: vec![8] };
            assert_eq!(compare(&store, &a, &b, 100).await.unwrap(), Ordering::Incomparable);
        }
        {
            // One head shares history with the subject, the other does not.
            let a = TestClock { members: vec![3] };
            let b = TestClock { members: vec![5, 8] };
            assert_eq!(compare(&store, &a, &b, 100).await.unwrap(), Ordering::Incomparable);
        }
    }

    #[tokio::test]
    async fn test_empty_clocks() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);

        let empty = TestClock { members: vec![] };
        let non_empty = TestClock { members: vec![1] };

        assert_eq!(compare(&store, &empty, &empty, 100).await.unwrap(), Ordering::Incomparable);
        assert_eq!(compare(&store, &non_empty, &empty, 100).await.unwrap(), Ordering::Incomparable);
        assert_eq!(compare(&store, &empty, &non_empty, 100).await.unwrap(), Ordering::Incomparable);
    }

    /// History: 1 <- 2 <- 3 <- 4, plus 1 <- 5 <- 6 <- 7 and 5 <- 8.
    #[tokio::test]
    async fn test_budget_exceeded() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);
        store.add(4, &[3]);
        store.add(5, &[1]);
        store.add(6, &[5]);
        store.add(7, &[6]);
        store.add(8, &[5]);

        {
            let ancestor = TestClock { members: vec![1] };
            let descendant = TestClock { members: vec![4] };
            assert_eq!(
                compare(&store, &descendant, &ancestor, 2).await.unwrap(),
                Ordering::BudgetExceeded { subject_frontier: [2].into(), other_frontier: [].into() }
            );
        }
        {
            let ancestor = TestClock { members: vec![1] };
            let descendant = TestClock { members: vec![4, 5] };
            assert_eq!(compare(&store, &descendant, &ancestor, 10).await.unwrap(), Ordering::Descends);
            assert_eq!(
                compare(&store, &ancestor, &descendant, 2).await.unwrap(),
                Ordering::BudgetExceeded { subject_frontier: [].into(), other_frontier: [2].into() }
            );
        }
    }

    #[tokio::test]
    async fn test_self_comparison() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);

        let clock = TestClock { members: vec![1] };
        assert_eq!(compare(&store, &clock, &clock, 100).await.unwrap(), Ordering::Equal);
    }

    /// Six independent roots merged by 7, followed by 8.
    #[tokio::test]
    async fn multiple_roots() {
        let mut store = MockEventStore::new();
        for id in 1..=6 {
            store.add(id, &[]);
        }
        store.add(7, &[1, 2, 3, 4, 5, 6]);
        store.add(8, &[7]);

        let subject = TestClock { members: vec![8] };
        let big_other = TestClock { members: vec![1, 2, 3, 4, 5, 6] };

        assert_eq!(compare(&store, &subject, &big_other, 1_000).await.unwrap(), Ordering::Descends);
        assert_eq!(compare(&store, &big_other, &subject, 1_000).await.unwrap(), Ordering::NotDescends { meet: vec![1, 2, 3, 4, 5, 6] });
    }

    /// Stored history: 1 <- 2 <- 3; the compared events are never added to the store.
    #[tokio::test]
    async fn test_compare_event_unstored() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);

        let unstored_event = TestEvent { id: 4, parent_clock: TestClock { members: vec![3] } };

        let clock_1 = TestClock { members: vec![1] };
        let clock_2 = TestClock { members: vec![2] };
        let clock_3 = TestClock { members: vec![3] };

        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_1, 100).await.unwrap(), Ordering::Descends);
        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_2, 100).await.unwrap(), Ordering::Descends);
        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_3, 100).await.unwrap(), Ordering::Descends);

        let unstored_merge_event = TestEvent { id: 5, parent_clock: TestClock { members: vec![2, 3] } };
        assert_eq!(compare_unstored_event(&store, &unstored_merge_event, &clock_1, 100).await.unwrap(), Ordering::Descends);

        // A root that shares no history with 1..=3.
        store.add(10, &[]);
        let incomparable_clock = TestClock { members: vec![10] };
        assert_eq!(compare_unstored_event(&store, &unstored_event, &incomparable_clock, 100).await.unwrap(), Ordering::Incomparable);

        // An unstored root event has an empty parent clock.
        let root_event = TestEvent { id: 11, parent_clock: TestClock { members: vec![] } };
        let empty_clock = TestClock { members: vec![] };
        assert_eq!(compare_unstored_event(&store, &root_event, &empty_clock, 100).await.unwrap(), Ordering::Incomparable);
        assert_eq!(compare_unstored_event(&store, &root_event, &clock_1, 100).await.unwrap(), Ordering::Incomparable);

        let empty_clock = TestClock { members: vec![] };
        assert_eq!(compare_unstored_event(&store, &unstored_event, &empty_clock, 100).await.unwrap(), Ordering::Incomparable);
    }

    /// The same event is compared again after it has been stored and become a head.
    #[tokio::test]
    async fn test_compare_event_redundant_delivery() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);

        let unstored_event = TestEvent { id: 4, parent_clock: TestClock { members: vec![3] } };
        let clock_3 = TestClock { members: vec![3] };
        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_3, 100).await.unwrap(), Ordering::Descends);

        store.add(4, &[3]);
        let clock_with_event = TestClock { members: vec![4] };
        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_with_event, 100).await.unwrap(), Ordering::Equal);

        let clock_with_multiple = TestClock { members: vec![3, 4] };
        assert_eq!(compare_unstored_event(&store, &unstored_event, &clock_with_multiple, 100).await.unwrap(), Ordering::Incomparable);
    }

    /// History: 1 <- 2 <- 3 <- 4 <- 5; only events newer than `known` are collected.
    #[tokio::test]
    async fn test_event_accumulator() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);
        store.add(4, &[3]);
        store.add(5, &[4]);

        let current = TestClock { members: vec![5] };
        let known = TestClock { members: vec![2] };

        let accumulator = EventAccumulator::new(None);
        let mut comparison = Comparison::new_with_accumulator(&store, &current, &known, 100, Some(accumulator));

        loop {
            if let Some(ordering) = comparison.step().await.unwrap() {
                assert_eq!(ordering, Ordering::Descends);
                break;
            }
        }

        let events = comparison.take_accumulated_events().unwrap();
        assert_eq!(events.iter().map(|e| e.payload.id()).sorted().collect::<Vec<_>>(), vec![3, 4, 5]);
    }

    /// Same branching history as `test_concurrent_history`, walked from 7 down to 1.
    #[tokio::test]
    async fn test_event_accumulator_with_concurrent_history() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[1]);
        store.add(4, &[1]);
        store.add(5, &[2, 3]);
        store.add(6, &[3, 4]);
        store.add(7, &[5, 6]);

        let current = TestClock { members: vec![7] };
        let known = TestClock { members: vec![1] };

        let accumulator = EventAccumulator::new(None);
        let mut comparison = Comparison::new_with_accumulator(&store, &current, &known, 100, Some(accumulator));

        loop {
            if let Some(ordering) = comparison.step().await.unwrap() {
                assert_eq!(ordering, Ordering::Descends);
                break;
            }
        }

        let events = comparison.take_accumulated_events().unwrap();
        let event_ids: Vec<TestId> = events.iter().map(|e| e.payload.id()).collect();

        // Everything above the known head is collected exactly once; the head itself is not.
        assert_eq!(event_ids.len(), 6);
        assert!(event_ids.contains(&7));
        assert!(event_ids.contains(&5));
        assert!(event_ids.contains(&6));
        assert!(event_ids.contains(&2));
        assert!(event_ids.contains(&3));
        assert!(event_ids.contains(&4));
        assert!(!event_ids.contains(&1));
    }

    /// Equal clocks finish on the first step without collecting anything.
    #[tokio::test]
    async fn test_event_accumulator_equal_clocks() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[2]);

        let current = TestClock { members: vec![3] };
        let known = TestClock { members: vec![3] };

        let accumulator = EventAccumulator::new(None);
        let mut comparison = Comparison::new_with_accumulator(&store, &current, &known, 100, Some(accumulator));

        loop {
            if let Some(ordering) = comparison.step().await.unwrap() {
                assert_eq!(ordering, Ordering::Equal);
                break;
            }
        }

        let events = comparison.take_accumulated_events().unwrap();
        assert_eq!(events.len(), 0);
    }

    /// History: 1 <- 2 <- 4 (subject branch) and 1 <- 3 <- 5 (other branch).
    #[tokio::test]
    async fn test_event_accumulator_only_subject_side() {
        let mut store = MockEventStore::new();
        store.add(1, &[]);
        store.add(2, &[1]);
        store.add(3, &[1]);
        store.add(4, &[2]);
        store.add(5, &[3]);

        let subject = TestClock { members: vec![4] };
        let other = TestClock { members: vec![5] };

        let accumulator = EventAccumulator::new(None);
        let mut comparison = Comparison::new_with_accumulator(&store, &subject, &other, 100, Some(accumulator));

        loop {
            if let Some(ordering) = comparison.step().await.unwrap() {
                assert!(matches!(ordering, Ordering::NotDescends { .. }));
                break;
            }
        }

        let events = comparison.take_accumulated_events().unwrap();
        let event_ids: Vec<TestId> = events.iter().map(|e| e.payload.id()).collect();

        // Only the subject's branch is collected.
        assert!(event_ids.contains(&4));
        assert!(event_ids.contains(&2));
        assert!(!event_ids.contains(&5));
        assert!(!event_ids.contains(&3));
    }
}