#![allow(missing_docs)]
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use mempill_types::{
AssertionKind, Belief, BeliefStatus, Cardinality, Claim, ClaimRef, CurrencySignal,
CurrencyState, Disposition, StalenessFlag, ValidityAssertion,
};
use crate::config::EngineConfig;
use crate::engine::valid_time_helpers;
fn is_non_live_disposition(d: &Disposition) -> bool {
matches!(
d,
Disposition::Quarantined
| Disposition::Superseded
| Disposition::Invalidated
| Disposition::Rejected
)
}
#[derive(Debug, Clone)]
pub(crate) struct FoldResult {
pub live_claims: Vec<ClaimWithStatus>,
pub has_conflict: bool,
pub succession_selected: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct ClaimWithStatus {
pub claim: Claim,
pub is_live: bool,
pub last_disposition: Option<Disposition>,
}
fn ordering_key(claim: &Claim, config: &EngineConfig) -> (DateTime<Utc>, DateTime<Utc>, u128) {
let primary = if claim.valid_time().valid_time_confidence >= config.valid_time_confidence_threshold {
claim.valid_time().start.unwrap_or(claim.transaction_time().0)
} else {
claim.transaction_time().0
};
(primary, claim.transaction_time().0, claim.claim_ref().0.as_u128())
}
pub(crate) fn is_claim_live(
assertions: &[ValidityAssertion],
as_of_tx_time: DateTime<Utc>,
) -> bool {
let mut sorted: Vec<&ValidityAssertion> = assertions.iter().collect();
sorted.sort_by(|a, b| {
a.asserted_at.0.cmp(&b.asserted_at.0)
.then(a.assertion_ref.cmp(&b.assertion_ref)) });
let mut live = true;
for assertion in sorted {
if assertion.asserted_at.0 > as_of_tx_time {
continue;
}
match &assertion.kind {
AssertionKind::Bound { bound_at } => {
if *bound_at <= as_of_tx_time {
live = false;
}
}
AssertionKind::Reopen { reopen_at } => {
if *reopen_at <= as_of_tx_time {
live = true;
}
}
_ => {}
}
}
live
}
pub(crate) fn fold<F>(
mut claims: Vec<Claim>,
assertions_for: F,
as_of_tx_time: DateTime<Utc>,
config: &EngineConfig,
latest_disposition: &HashMap<ClaimRef, Disposition>,
) -> FoldResult
where
F: Fn(&ClaimRef) -> Vec<ValidityAssertion>,
{
claims.sort_by(|a, b| {
let ka = ordering_key(a, config);
let kb = ordering_key(b, config);
ka.cmp(&kb)
});
let mut with_status: Vec<ClaimWithStatus> = claims
.into_iter()
.map(|c| {
let last_disp = latest_disposition.get(c.claim_ref()).cloned();
let disposition_live = last_disp
.as_ref()
.map(|d| !is_non_live_disposition(d))
.unwrap_or(true); let assertions = assertions_for(c.claim_ref());
let assertion_live = is_claim_live(&assertions, as_of_tx_time);
let live = assertion_live && disposition_live;
ClaimWithStatus {
claim: c,
is_live: live,
last_disposition: last_disp,
}
})
.collect();
let live_claims: Vec<ClaimWithStatus> = with_status
.iter()
.filter(|c| c.is_live)
.cloned()
.collect();
let (live_claims, succession_selected) = if live_claims.len() > 1 {
let live_claim_refs: Vec<&Claim> = live_claims.iter().map(|cs| &cs.claim).collect();
if valid_time_helpers::is_trusted_succession(&live_claim_refs, config.valid_time_confidence_threshold) {
let selected = valid_time_helpers::select_by_valid_time_instant(&live_claim_refs, as_of_tx_time);
let selected_ref = selected.map(|c| c.claim_ref().clone());
drop(live_claim_refs); let narrowed: Vec<ClaimWithStatus> = match selected_ref {
Some(ref cref) => live_claims.into_iter()
.filter(|cs| cs.claim.claim_ref() == cref)
.collect(),
None => vec![], };
(narrowed, true)
} else {
(live_claims, false)
}
} else {
(live_claims, false)
};
let functional_live_count = live_claims
.iter()
.filter(|c| *c.claim.cardinality() == Cardinality::Functional)
.count();
let has_conflict = functional_live_count > 1 || (live_claims.len() > 1 && {
live_claims.iter().any(|c| *c.claim.cardinality() != Cardinality::SetValued)
});
for cs in &mut with_status {
cs.is_live = live_claims.iter().any(|lc| lc.claim.claim_ref() == cs.claim.claim_ref());
}
FoldResult { live_claims, has_conflict, succession_selected }
}
pub(crate) fn claim_to_belief(cs: &ClaimWithStatus) -> Belief {
Belief {
claim_ref: cs.claim.claim_ref().clone(),
fact: cs.claim.fact().clone(),
provenance: cs.claim.provenance().clone(),
valid_time: cs.claim.valid_time().clone(),
transaction_time: cs.claim.transaction_time().clone(),
confidence: cs.claim.confidence().clone(),
currency_signal: CurrencySignal {
last_refreshed_at: cs.claim.transaction_time().clone(),
state: CurrencyState::Fresh, corroboration_count: 0,
},
criticality: cs.claim.criticality().clone(),
}
}
pub(crate) fn fold_status(
fold: &FoldResult,
has_pending_review: bool,
) -> BeliefStatus {
let _ = has_pending_review; if fold.live_claims.is_empty() {
BeliefStatus::NoBelief
} else if fold.has_conflict {
BeliefStatus::Conflict
} else if fold.live_claims.len() == 1 {
let c = &fold.live_claims[0].claim;
if c.valid_time().is_unknown() {
BeliefStatus::TimingUncertain
} else {
BeliefStatus::Resolved
}
} else {
BeliefStatus::Resolved
}
}
pub(crate) fn fold_staleness(fold: &FoldResult) -> StalenessFlag {
if fold.live_claims.is_empty() {
StalenessFlag { is_stale: true, reason: Some("no live claim on subject-line".into()) }
} else {
StalenessFlag { is_stale: false, reason: None }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::EngineConfig;
use chrono::Utc;
use mempill_types::{
AgentId, AssertionKind, Cardinality, ClaimRef, Confidence, ExternalAnchor, ExternalKind,
Fact, ProvenanceLabel, TransactionTime, ValidTime, ValidityAssertion,
};
fn no_dispositions() -> std::collections::HashMap<ClaimRef, Disposition> {
std::collections::HashMap::new()
}
fn agent() -> AgentId {
AgentId("agent-1".into())
}
#[allow(clippy::too_many_arguments)]
fn make_claim(
agent_id: &AgentId,
subject: &str,
predicate: &str,
value: serde_json::Value,
tx_time: DateTime<Utc>,
vt_start: Option<DateTime<Utc>>,
vt_confidence: f32,
cardinality: Cardinality,
) -> Claim {
Claim::new(
ClaimRef::new_random(),
agent_id.clone(),
Fact { subject: subject.into(), predicate: predicate.into(), value },
cardinality,
ProvenanceLabel::External(ExternalKind::UserAsserted),
ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
TransactionTime(tx_time),
ValidTime { start: vt_start, end: None, valid_time_confidence: vt_confidence },
Confidence { value_confidence: 0.9, valid_time_confidence: vt_confidence },
mempill_types::Criticality::Medium,
vec![],
None,
None,
)
}
fn no_assertions(_: &ClaimRef) -> Vec<ValidityAssertion> {
vec![]
}
fn now() -> DateTime<Utc> {
Utc::now()
}
#[test]
fn fold_determinism_i8_same_claims_different_order() {
let config = EngineConfig::default();
let agent = agent();
let t1 = Utc::now() - chrono::Duration::hours(10);
let t2 = Utc::now() - chrono::Duration::hours(5);
let t3 = Utc::now() - chrono::Duration::hours(1);
let c1 = make_claim(&agent, "user", "name", serde_json::json!("Alice"), t1, None, 0.0, Cardinality::Functional);
let c2 = make_claim(&agent, "user", "name", serde_json::json!("Bob"), t2, None, 0.0, Cardinality::Functional);
let c3 = make_claim(&agent, "user", "name", serde_json::json!("Carol"), t3, None, 0.0, Cardinality::Functional);
let order_a = vec![c1.clone(), c2.clone(), c3.clone()];
let order_b = vec![c3.clone(), c1.clone(), c2.clone()];
let order_c = vec![c2.clone(), c3.clone(), c1.clone()];
let disp = no_dispositions();
let result_a = fold(order_a, no_assertions, now(), &config, &disp);
let result_b = fold(order_b, no_assertions, now(), &config, &disp);
let result_c = fold(order_c, no_assertions, now(), &config, &disp);
assert_eq!(result_a.live_claims.len(), result_b.live_claims.len(), "live count must be arrival-order independent");
assert_eq!(result_b.live_claims.len(), result_c.live_claims.len(), "live count must be arrival-order independent");
let refs_a: Vec<ClaimRef> = result_a.live_claims.iter().map(|c| c.claim.claim_ref().clone()).collect();
let refs_b: Vec<ClaimRef> = result_b.live_claims.iter().map(|c| c.claim.claim_ref().clone()).collect();
let refs_c: Vec<ClaimRef> = result_c.live_claims.iter().map(|c| c.claim.claim_ref().clone()).collect();
assert_eq!(refs_a, refs_b, "I8: canonical order must be arrival-independent (A vs B)");
assert_eq!(refs_b, refs_c, "I8: canonical order must be arrival-independent (B vs C)");
}
#[test]
fn ordering_key_high_confidence_uses_valid_time() {
let config = EngineConfig::default(); let agent = agent();
let tx_early = Utc::now() - chrono::Duration::hours(20);
let tx_late = Utc::now() - chrono::Duration::hours(1);
let vt_very_early = Utc::now() - chrono::Duration::days(365);
let claim_a = make_claim(
&agent, "user", "name", serde_json::json!("Alice"),
tx_early, Some(vt_very_early), 0.9, Cardinality::Functional,
);
let claim_b = make_claim(
&agent, "user", "name", serde_json::json!("Bob"),
tx_late, None, 0.3, Cardinality::Functional,
);
let key_a = ordering_key(&claim_a, &config);
let key_b = ordering_key(&claim_b, &config);
assert!(
key_a.0 < key_b.0,
"high-confidence valid_time_start should be the ordering key for claim_a"
);
}
#[test]
fn ordering_key_low_confidence_uses_tx_time() {
let config = EngineConfig::default(); let agent = agent();
let tx_time = Utc::now();
let vt_future = tx_time + chrono::Duration::days(10);
let claim = make_claim(
&agent, "user", "city", serde_json::json!("Paris"),
tx_time, Some(vt_future), 0.3, Cardinality::Functional,
);
let key = ordering_key(&claim, &config);
assert_eq!(key.0, tx_time, "low-confidence claim must use tx_time as ordering key");
}
#[test]
fn supersession_fold_bounded_incumbent_not_live() {
let config = EngineConfig::default();
let agent = agent();
let t_old = Utc::now() - chrono::Duration::hours(5);
let t_new = Utc::now() - chrono::Duration::hours(1);
let bound_time = Utc::now() - chrono::Duration::hours(3);
let query_now = now();
let incumbent = make_claim(
&agent, "user", "role", serde_json::json!("viewer"),
t_old, None, 0.0, Cardinality::Functional,
);
let incumbent_ref = incumbent.claim_ref().clone();
let newer = make_claim(
&agent, "user", "role", serde_json::json!("admin"),
t_new, None, 0.0, Cardinality::Functional,
);
let newer_ref = newer.claim_ref().clone();
let claims = vec![incumbent, newer];
let bound_assertion = ValidityAssertion {
assertion_ref: uuid::Uuid::new_v4(),
agent_id: agent.clone(),
target_claim: incumbent_ref.clone(),
kind: AssertionKind::Bound { bound_at: bound_time },
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
asserted_at: TransactionTime(bound_time),
};
let bound_ref = incumbent_ref.clone();
let assertions_fn = move |cr: &ClaimRef| -> Vec<ValidityAssertion> {
if *cr == bound_ref {
vec![bound_assertion.clone()]
} else {
vec![]
}
};
let result = fold(claims, assertions_fn, query_now, &config, &no_dispositions());
assert_eq!(result.live_claims.len(), 1, "only the newer claim should be live");
assert_eq!(
*result.live_claims[0].claim.claim_ref(), newer_ref,
"live claim should be the newer one"
);
assert!(!result.has_conflict, "no conflict when only one live claim remains");
}
#[test]
fn supersession_fold_incumbent_retained_in_history() {
let config = EngineConfig::default();
let agent = agent();
let t_old = Utc::now() - chrono::Duration::hours(5);
let t_new = Utc::now() - chrono::Duration::hours(1);
let bound_time = Utc::now() - chrono::Duration::hours(3);
let query_now = now();
let incumbent = make_claim(
&agent, "user", "role", serde_json::json!("viewer"),
t_old, None, 0.0, Cardinality::Functional,
);
let incumbent_ref = incumbent.claim_ref().clone();
let newer = make_claim(
&agent, "user", "role", serde_json::json!("admin"),
t_new, None, 0.0, Cardinality::Functional,
);
let claims = vec![incumbent, newer];
let bound_assertion = ValidityAssertion {
assertion_ref: uuid::Uuid::new_v4(),
agent_id: agent.clone(),
target_claim: incumbent_ref.clone(),
kind: AssertionKind::Bound { bound_at: bound_time },
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
asserted_at: TransactionTime(bound_time),
};
let bound_ref = incumbent_ref.clone();
let assertions_fn = move |cr: &ClaimRef| -> Vec<ValidityAssertion> {
if *cr == bound_ref {
vec![bound_assertion.clone()]
} else {
vec![]
}
};
let result = fold(claims, assertions_fn, query_now, &config, &no_dispositions());
assert_eq!(result.live_claims.len(), 1, "one live; incumbent is bounded not deleted");
}
#[test]
fn contested_two_live_functional_claims_has_conflict() {
let config = EngineConfig::default();
let agent = agent();
let t1 = Utc::now() - chrono::Duration::hours(5);
let t2 = Utc::now() - chrono::Duration::hours(1);
let c1 = make_claim(&agent, "user", "role", serde_json::json!("admin"), t1, None, 0.0, Cardinality::Functional);
let c2 = make_claim(&agent, "user", "role", serde_json::json!("viewer"), t2, None, 0.0, Cardinality::Functional);
let result = fold(vec![c1, c2], no_assertions, now(), &config, &no_dispositions());
assert!(result.has_conflict, "two live Functional claims must produce has_conflict=true (I7)");
assert_eq!(result.live_claims.len(), 2, "both live claims retained (I7 never silently picks)");
}
#[test]
fn set_valued_multiple_live_not_conflict() {
let config = EngineConfig::default();
let agent = agent();
let t1 = Utc::now() - chrono::Duration::hours(5);
let t2 = Utc::now() - chrono::Duration::hours(1);
let c1 = make_claim(&agent, "user", "tag", serde_json::json!("rust"), t1, None, 0.0, Cardinality::SetValued);
let c2 = make_claim(&agent, "user", "tag", serde_json::json!("python"), t2, None, 0.0, Cardinality::SetValued);
let result = fold(vec![c1, c2], no_assertions, now(), &config, &no_dispositions());
assert!(!result.has_conflict, "set-valued claims should not produce conflict");
assert_eq!(result.live_claims.len(), 2, "both set-valued claims are live");
}
#[test]
fn bound_then_reopen_claim_is_live() {
let _config = EngineConfig::default();
let agent = agent();
let t0 = Utc::now() - chrono::Duration::hours(10);
let bound_at = Utc::now() - chrono::Duration::hours(5);
let reopen_at = Utc::now() - chrono::Duration::hours(2);
let claim = make_claim(
&agent, "user", "status", serde_json::json!("active"),
t0, None, 0.0, Cardinality::Functional,
);
let claim_ref = claim.claim_ref().clone();
let bound = ValidityAssertion {
assertion_ref: uuid::Uuid::new_v4(),
agent_id: agent.clone(),
target_claim: claim_ref.clone(),
kind: AssertionKind::Bound { bound_at },
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
asserted_at: TransactionTime(bound_at),
};
let reopen = ValidityAssertion {
assertion_ref: uuid::Uuid::new_v4(),
agent_id: agent.clone(),
target_claim: claim_ref.clone(),
kind: AssertionKind::Reopen { reopen_at },
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
asserted_at: TransactionTime(reopen_at),
};
let assertions = vec![bound, reopen];
let is_live = is_claim_live(&assertions, now());
assert!(is_live, "a claim that was bounded then reopened should be live");
}
#[test]
fn now_injection_different_now_yields_different_liveness() {
let config = EngineConfig::default();
let agent = agent();
let t0 = Utc::now() - chrono::Duration::hours(10);
let bound_at_future_relative_to_past_now = Utc::now() - chrono::Duration::hours(3);
let claim = make_claim(
&agent, "user", "city", serde_json::json!("Berlin"),
t0, None, 0.0, Cardinality::Functional,
);
let claim_ref = claim.claim_ref().clone();
let bound = ValidityAssertion {
assertion_ref: uuid::Uuid::new_v4(),
agent_id: agent.clone(),
target_claim: claim_ref.clone(),
kind: AssertionKind::Bound { bound_at: bound_at_future_relative_to_past_now },
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
asserted_at: TransactionTime(bound_at_future_relative_to_past_now),
};
let assertions = vec![bound];
let assertions_fn_clone = assertions.clone();
let disp = no_dispositions();
let past_now = t0 + chrono::Duration::hours(1);
let result_past = fold(
vec![claim.clone()],
|_| assertions_fn_clone.clone(),
past_now,
&config,
&disp,
);
let present_now = now();
let result_present = fold(
vec![claim],
|_| assertions.clone(),
present_now,
&config,
&disp,
);
assert_eq!(result_past.live_claims.len(), 1, "claim should be live before the bound");
assert_eq!(result_present.live_claims.len(), 0, "claim should be bounded at present");
}
}