use std::collections::{BTreeMap, BTreeSet};
use serde::{Deserialize, Serialize};
/// Decision about how to treat a set of events with respect to their ancestors.
///
/// Serialized by serde as an internally tagged enum: the variant name is stored
/// in a `"kind"` field, written in `snake_case` (`"proceed"`, `"fetch"`, `"fork"`),
/// alongside the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AncestorAction {
/// No ancestor is missing. This is what `classify_ancestor_action` returns
/// when `missing_ancestors` yields an empty list.
Proceed,
/// At least one referenced parent is absent from the event set.
Fetch {
/// Ids of the absent parents, as returned by `missing_ancestors`
/// (sorted and de-duplicated) when built by `classify_ancestor_action`.
missing: Vec<String>,
},
/// Carries a list of missing ids together with a free-form reason.
///
/// Never produced by `classify_ancestor_action` in this file.
/// NOTE(review): presumably selected by a caller-side policy when missing
/// ancestors cannot be fetched — confirm against callers.
Fork {
/// Ids of the missing ancestors associated with this decision.
triggering_missing: Vec<String>,
/// Human-readable explanation of the decision.
reason: String,
},
}
impl AncestorAction {
    /// Reports whether this action is [`AncestorAction::Proceed`].
    ///
    /// Both `Fetch` and `Fork` yield `false`.
    #[must_use]
    pub fn proceeds(&self) -> bool {
        // Spelled out exhaustively so adding a variant forces a decision here.
        match self {
            Self::Proceed => true,
            Self::Fetch { .. } | Self::Fork { .. } => false,
        }
    }
}
/// Returns `true` when every parent id referenced by an event in `events`
/// is itself the id of some event in `events`.
///
/// Each item is an `(id, parents)` pair. The order of the events does not
/// matter: a parent may appear later in the input than the event that
/// references it. An empty input is closed.
///
/// Only membership is checked; an event that lists itself as a parent is
/// considered satisfied, and cycles are not detected.
pub fn is_causally_closed<I, S>(events: I) -> bool
where
    I: IntoIterator<Item = (S, Vec<S>)>,
    S: AsRef<str>,
{
    // Materialise the input once so ids can be borrowed instead of copied;
    // the full id set must be known before any parent can be judged.
    let events: Vec<(S, Vec<S>)> = events.into_iter().collect();
    let known: BTreeSet<&str> = events.iter().map(|(id, _)| id.as_ref()).collect();

    // `all` short-circuits on the first parent that is not a known id.
    events
        .iter()
        .flat_map(|(_, parents)| parents.iter())
        .all(|parent| known.contains(parent.as_ref()))
}
/// Lists the parent ids referenced by `events` that are not the id of any
/// event in `events`.
///
/// Each item is an `(id, parents)` pair. The result is de-duplicated and
/// sorted in ascending string order; it is empty when the set is closed
/// (including when `events` is empty).
pub fn missing_ancestors<I, S>(events: I) -> Vec<String>
where
    I: IntoIterator<Item = (S, Vec<S>)>,
    S: AsRef<str>,
{
    // Materialise the input once so ids can be borrowed instead of copied;
    // the full id set must be known before any parent can be judged.
    let events: Vec<(S, Vec<S>)> = events.into_iter().collect();
    let known: BTreeSet<&str> = events.iter().map(|(id, _)| id.as_ref()).collect();

    // A `BTreeSet` provides both the de-duplication and the sorted order.
    let missing: BTreeSet<&str> = events
        .iter()
        .flat_map(|(_, parents)| parents.iter())
        .map(|parent| parent.as_ref())
        .filter(|parent| !known.contains(*parent))
        .collect();

    // Only the ids actually reported are turned into owned strings.
    missing.into_iter().map(String::from).collect()
}
/// Computes the set of events reachable from `received` by following parent
/// links through `available`.
///
/// Both arguments yield `(id, parents)` pairs. `available` is the pool that
/// is consulted for parent links; if the same id occurs more than once in it,
/// the last occurrence wins. Only the ids of `received` are used as starting
/// points — the parent lists supplied with `received` are ignored, and a
/// received id that is absent from `available` is itself reported as missing.
///
/// Returns `Ok` with the sorted, de-duplicated ids of every event reached
/// (the starting ids included).
///
/// # Errors
///
/// Returns `Err` with the sorted, de-duplicated ids that were reached during
/// the walk but have no entry in `available`.
pub fn causal_closure<I1, I2, S>(received: I1, available: I2) -> Result<Vec<String>, Vec<String>>
where
    I1: IntoIterator<Item = (S, Vec<S>)>,
    I2: IntoIterator<Item = (S, Vec<S>)>,
    S: AsRef<str>,
{
    // Keep the inputs alive and borrow from them; no id is copied until the
    // final result is built.
    let available: Vec<(S, Vec<S>)> = available.into_iter().collect();
    let pool: BTreeMap<&str, &[S]> = available
        .iter()
        .map(|(id, parents)| (id.as_ref(), parents.as_slice()))
        .collect();
    let received: Vec<S> = received.into_iter().map(|(id, _)| id).collect();

    let mut closure: BTreeSet<&str> = BTreeSet::new();
    let mut missing: BTreeSet<&str> = BTreeSet::new();
    // Work list of ids still to visit (LIFO; visiting order does not affect
    // the result because both outputs are sorted sets).
    let mut stack: Vec<&str> = received.iter().map(|id| id.as_ref()).collect();

    while let Some(id) = stack.pop() {
        if closure.contains(id) {
            continue;
        }
        match pool.get(id) {
            Some(parents) => {
                // Mark as visited before expanding so cycles terminate.
                closure.insert(id);
                stack.extend(
                    parents
                        .iter()
                        .map(|parent| parent.as_ref())
                        .filter(|parent| !closure.contains(*parent)),
                );
            }
            None => {
                missing.insert(id);
            }
        }
    }

    if missing.is_empty() {
        Ok(closure.into_iter().map(String::from).collect())
    } else {
        Err(missing.into_iter().map(String::from).collect())
    }
}
/// Chooses an [`AncestorAction`] for a set of `(id, parents)` events.
///
/// Yields [`AncestorAction::Proceed`] when `missing_ancestors` finds nothing,
/// otherwise [`AncestorAction::Fetch`] carrying the missing ids it returned.
/// This function never yields [`AncestorAction::Fork`].
pub fn classify_ancestor_action<I, S>(events: I) -> AncestorAction
where
    I: IntoIterator<Item = (S, Vec<S>)>,
    S: AsRef<str>,
{
    match missing_ancestors(events) {
        absent if absent.is_empty() => AncestorAction::Proceed,
        absent => AncestorAction::Fetch { missing: absent },
    }
}
// Unit tests for the causal-closure helpers and `AncestorAction`.
#[cfg(test)]
mod tests {
use super::*;
// Builds an owned `(id, parents)` event from string slices.
fn ev(id: &str, parents: &[&str]) -> (String, Vec<String>) {
(
id.to_string(),
parents.iter().map(|s| (*s).to_string()).collect(),
)
}
// Clones a fixture slice into an owned `Vec` so the same fixture can be
// passed by value to several functions under test.
fn evs(events: &[(String, Vec<String>)]) -> Vec<(String, Vec<String>)> {
events.to_vec()
}
// An empty event set is closed, has no missing ancestors, and classifies as `Proceed`.
#[test]
fn empty_set_is_closed() {
let events: Vec<(String, Vec<String>)> = vec![];
assert!(is_causally_closed(evs(&events)));
assert!(missing_ancestors(evs(&events)).is_empty());
assert_eq!(
classify_ancestor_action(evs(&events)),
AncestorAction::Proceed
);
}
// A single event without parents is closed.
#[test]
fn root_event_with_no_parents_is_closed() {
let events = vec![ev("e1", &[])];
assert!(is_causally_closed(evs(&events)));
assert!(missing_ancestors(evs(&events)).is_empty());
}
// A chain e1 <- e2 <- e3 with every link present is closed.
#[test]
fn linear_chain_is_closed() {
let events = vec![ev("e1", &[]), ev("e2", &["e1"]), ev("e3", &["e2"])];
assert!(is_causally_closed(evs(&events)));
assert!(missing_ancestors(evs(&events)).is_empty());
}
// Dropping the root e1 makes the set open and e1 is reported as missing.
#[test]
fn missing_root_is_detected() {
let events = vec![ev("e2", &["e1"]), ev("e3", &["e2"])];
assert!(!is_causally_closed(evs(&events)));
assert_eq!(missing_ancestors(evs(&events)), vec!["e1"]);
}
// Missing parents referenced several times, in different orders, are
// reported once each and in sorted order.
#[test]
fn multiple_missing_ancestors_are_deduplicated_and_sorted() {
let events = vec![
ev("e3", &["e1", "e2"]),
ev("e4", &["e2", "e1"]),
ev("e5", &["e3"]),
];
assert_eq!(missing_ancestors(evs(&events)), vec!["e1", "e2"]);
}
// A missing parent classifies as `Fetch` carrying that parent's id.
#[test]
fn classify_returns_fetch_when_missing() {
let events = vec![ev("e2", &["e1"])];
let action = classify_ancestor_action(evs(&events));
match action {
AncestorAction::Fetch { missing } => assert_eq!(missing, vec!["e1"]),
other => panic!("expected Fetch, got {other:?}"),
}
}
// A closed set classifies as `Proceed`.
#[test]
fn classify_returns_proceed_when_closed() {
let events = vec![ev("e1", &[]), ev("e2", &["e1"])];
assert_eq!(
classify_ancestor_action(evs(&events)),
AncestorAction::Proceed
);
}
// A diamond (e1 <- e2, e1 <- e3, {e2, e3} <- e4) with all four events is closed.
#[test]
fn diamond_is_closed_when_complete() {
let events = vec![
ev("e1", &[]),
ev("e2", &["e1"]),
ev("e3", &["e1"]),
ev("e4", &["e2", "e3"]),
];
assert!(is_causally_closed(evs(&events)));
}
// Starting from e3, the closure follows parent links in the pool back to e1
// and does not include the descendant e4.
#[test]
fn causal_closure_walks_back_through_pool() {
let pool = vec![
ev("e1", &[]),
ev("e2", &["e1"]),
ev("e3", &["e2"]),
ev("e4", &["e3"]),
];
let received = vec![ev("e3", &["e2"])];
let closure = causal_closure(received, pool.clone()).unwrap();
assert_eq!(closure, vec!["e1", "e2", "e3"]);
}
// When the pool lacks e1, the walk from e3 ends in `Err` listing e1.
#[test]
fn causal_closure_reports_unreachable_missing() {
let pool = vec![ev("e2", &["e1"]), ev("e3", &["e2"])];
let received = vec![ev("e3", &["e2"])];
let result = causal_closure(received, pool);
match result {
Err(missing) => assert_eq!(missing, vec!["e1"]),
Ok(_) => panic!("expected unreachable missing"),
}
}
// A hand-built `Fork` exposes both fields and does not count as proceeding.
#[test]
fn ancestor_action_fork_carries_reason() {
let action = AncestorAction::Fork {
triggering_missing: vec!["e1".to_string()],
reason: "peer hub partitioned; proceed with local view".to_string(),
};
match &action {
AncestorAction::Fork {
triggering_missing,
reason,
} => {
assert_eq!(triggering_missing, &vec!["e1"]);
assert!(reason.starts_with("peer hub"));
}
other => panic!("expected Fork, got {other:?}"),
}
assert!(!action.proceeds());
}
// Every variant survives a JSON serialize/deserialize round trip unchanged.
#[test]
fn ancestor_action_serde_round_trip() {
let actions = vec![
AncestorAction::Proceed,
AncestorAction::Fetch {
missing: vec!["e1".to_string()],
},
AncestorAction::Fork {
triggering_missing: vec!["e2".to_string()],
reason: "fork policy invoked".to_string(),
},
];
for action in actions {
let json = serde_json::to_string(&action).unwrap();
let back: AncestorAction = serde_json::from_str(&json).unwrap();
assert_eq!(back, action);
}
}
// Receiving both an event and its parent yields each id exactly once.
#[test]
fn closure_handles_already_received_events() {
let pool = vec![ev("e1", &[]), ev("e2", &["e1"])];
let received = vec![ev("e1", &[]), ev("e2", &["e1"])];
let closure = causal_closure(received, pool).unwrap();
assert_eq!(closure, vec!["e1", "e2"]);
}
}