use std::collections::HashSet;
use std::future::Future;
use chrono::{DateTime, FixedOffset};
use crate::memory::Scope;
/// Classification of a relation returned by [`CardinalityPolicy::cardinality`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationCardinality {
    /// The relation was registered with the policy as single-valued.
    SingleValued,
    /// The relation was not registered; this is the fallback for every unknown name.
    MultiValued,
}

/// Set of relation names that are treated as single-valued.
///
/// Names are lowercased on the way in and on lookup, so classification is
/// case-insensitive. The `Default` policy is empty, which makes every relation
/// multi-valued.
#[derive(Debug, Clone, Default)]
pub struct CardinalityPolicy {
    /// Lowercased names of the single-valued relations.
    single_valued: HashSet<String>,
}

impl CardinalityPolicy {
    /// Canonical spelling of a relation name, shared by registration and lookup
    /// so the two can never disagree.
    fn normalize(relation: &str) -> String {
        relation.to_lowercase()
    }

    /// Builds a policy in which exactly the given `relations` are single-valued.
    ///
    /// Duplicate names, including names differing only by case, collapse into
    /// one entry.
    pub fn with_single_valued<I, S>(relations: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut single_valued = HashSet::new();
        for relation in relations {
            let owned: String = relation.into();
            single_valued.insert(Self::normalize(&owned));
        }
        Self { single_valued }
    }

    /// Returns [`RelationCardinality::SingleValued`] when `relation` (compared
    /// case-insensitively) was registered, and
    /// [`RelationCardinality::MultiValued`] otherwise.
    pub fn cardinality(&self, relation: &str) -> RelationCardinality {
        let key = Self::normalize(relation);
        let is_single = self.single_valued.contains(key.as_str());
        if is_single {
            RelationCardinality::SingleValued
        } else {
            RelationCardinality::MultiValued
        }
    }
}
/// A new edge handed to an [`EdgeResolver`] to be reconciled with existing edges.
#[derive(Debug, Clone, PartialEq)]
pub struct Edge {
/// Key of the edge's subject; passed to the catalog lookup by `TemporalEdgeResolver`.
pub subject_key: String,
/// Relation name; `CardinalityPolicy::cardinality` classifies it case-insensitively.
pub relation: String,
/// Key of the edge's object; compared by exact string equality during resolution.
pub object_key: String,
/// Confidence attached to the edge. The resolvers in this file pass it through
/// without consulting it. NOTE(review): presumably in `0.0..=1.0` — not enforced here.
pub confidence: f32,
/// Start of the edge's validity. `TemporalEdgeResolver` may move it earlier when
/// the same object is already recorded with an earlier start.
pub valid_from: DateTime<FixedOffset>,
}
/// An edge as reported by an [`EdgeCatalog`].
#[derive(Debug, Clone, PartialEq)]
pub struct ExistingEdge {
/// Key of the edge's subject.
pub subject_key: String,
/// Relation name of the edge.
pub relation: String,
/// Key of the edge's object.
pub object_key: String,
/// Start of the edge's validity.
pub valid_from: DateTime<FixedOffset>,
/// End of the edge's validity, if any.
/// NOTE(review): `None` presumably means "still open" — the test catalog in this
/// file filters on `valid_to.is_none()`; confirm against real catalogs.
pub valid_to: Option<DateTime<FixedOffset>>,
}
/// Outcome of [`EdgeResolver::resolve`]: which existing edges to close and which edge to open.
///
/// The resolvers in this file only compute this value; they do not write anything
/// back, so applying it is left to the caller.
#[derive(Debug, Clone, PartialEq)]
pub struct EdgeResolution {
/// Existing edges selected for closing; empty when nothing conflicts.
pub close: Vec<ExistingEdge>,
/// The edge to open, possibly with an adjusted `valid_from`.
pub open: Edge,
}
/// Read-side source of existing edges consulted during resolution.
///
/// Implementations must be `Send + Sync + 'static`, and the returned future must be `Send`.
pub trait EdgeCatalog: Send + Sync + 'static {
/// Returns the current edges for `subject_key` and `relation` within `scope`.
///
/// NOTE(review): "current" presumably means edges without a `valid_to`, as the
/// in-file test catalog implements it — confirm for production implementations.
///
/// # Errors
///
/// Returns an [`EdgeError`] when the lookup fails.
fn current_edges(
&self,
scope: &Scope,
subject_key: &str,
relation: &str,
) -> impl Future<Output = Result<Vec<ExistingEdge>, EdgeError>> + Send;
}
/// Strategy that decides how a new [`Edge`] interacts with edges that already exist.
///
/// Implementations must be `Send + Sync + 'static`, and the returned future must be `Send`.
pub trait EdgeResolver: Send + Sync + 'static {
/// Consumes `edge` and returns the [`EdgeResolution`] describing what to close and what to open.
///
/// # Errors
///
/// Returns an [`EdgeError`] when resolution cannot be completed.
fn resolve(&self, scope: &Scope, edge: Edge) -> impl Future<Output = Result<EdgeResolution, EdgeError>> + Send;
}
/// Errors produced while resolving edges.
#[derive(Debug, thiserror::Error)]
pub enum EdgeError {
/// Reading from the edge catalog failed; the payload is the message shown after
/// the `edge catalog read failed: ` prefix.
#[error("edge catalog read failed: {0}")]
Catalog(String),
}
/// Stateless resolver that appends every edge and never closes anything.
#[derive(Debug, Default, Clone, Copy)]
pub struct NaiveAppendResolver;

impl NaiveAppendResolver {
    /// Creates the resolver; equivalent to `NaiveAppendResolver::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl EdgeResolver for NaiveAppendResolver {
    /// Opens `edge` exactly as given and closes nothing.
    ///
    /// `_scope` is ignored and no catalog is consulted, so this never fails.
    async fn resolve(&self, _scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
        let nothing_to_close = Vec::new();
        let resolution = EdgeResolution {
            open: edge,
            close: nothing_to_close,
        };
        Ok(resolution)
    }
}
/// Resolver that consults an edge catalog and a [`CardinalityPolicy`] to decide
/// which existing edges a new edge supersedes.
#[derive(Debug, Clone)]
pub struct TemporalEdgeResolver<C> {
    /// Source of the edges currently recorded for a subject and relation.
    catalog: C,
    /// Decides which relations are single-valued.
    policy: CardinalityPolicy,
}

impl<C: EdgeCatalog> TemporalEdgeResolver<C> {
    /// Creates a resolver that reads existing edges from `catalog` and classifies
    /// relations with `policy`.
    pub fn new(catalog: C, policy: CardinalityPolicy) -> Self {
        TemporalEdgeResolver { policy, catalog }
    }
}
impl<C: EdgeCatalog> EdgeResolver for TemporalEdgeResolver<C> {
async fn resolve(&self, scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
if self.policy.cardinality(&edge.relation) == RelationCardinality::MultiValued {
return Ok(EdgeResolution {
close: Vec::new(),
open: edge,
});
}
let current = self.catalog.current_edges(scope, &edge.subject_key, &edge.relation).await?;
let mut open = edge;
let mut close = Vec::new();
for existing in current {
if existing.object_key == open.object_key {
if existing.valid_from < open.valid_from {
open.valid_from = existing.valid_from;
}
} else {
close.push(existing);
}
}
Ok(EdgeResolution { close, open })
}
}
// Unit tests for the cardinality policy and both resolvers, backed by an
// in-memory catalog.
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use super::*;
// Fixed scope fixture. The in-memory catalog below ignores it.
fn scope() -> Scope {
Scope {
agent_id: "agent".to_string(),
org_id: "org".to_string(),
user_id: "user".to_string(),
}
}
// Midnight UTC on the given day of June 2026; panics if the date does not parse.
fn at(day: u32) -> DateTime<FixedOffset> {
DateTime::parse_from_rfc3339(&format!("2026-06-{day:02}T00:00:00Z")).expect("valid test date")
}
// Builds a new edge with confidence 0.9 that becomes valid on `day`.
fn edge(subject: &str, relation: &str, object: &str, day: u32) -> Edge {
Edge {
subject_key: subject.to_string(),
relation: relation.to_string(),
object_key: object.to_string(),
confidence: 0.9,
valid_from: at(day),
}
}
// Test double for `EdgeCatalog` that serves edges from a fixed in-memory list.
#[derive(Default)]
struct InMemoryEdgeCatalog {
edges: Mutex<Vec<ExistingEdge>>,
}
impl InMemoryEdgeCatalog {
// Seeds the catalog with the given edges.
fn with(edges: Vec<ExistingEdge>) -> Self {
Self { edges: Mutex::new(edges) }
}
}
impl EdgeCatalog for InMemoryEdgeCatalog {
// Returns clones of the edges that have no `valid_to` and whose subject and
// relation match exactly (case-sensitive). The scope is not consulted.
async fn current_edges(
&self,
_scope: &Scope,
subject_key: &str,
relation: &str,
) -> Result<Vec<ExistingEdge>, EdgeError> {
Ok(self
.edges
.lock()
.expect("edge catalog mutex poisoned")
.iter()
.filter(|existing| {
existing.valid_to.is_none() && existing.subject_key == subject_key && existing.relation == relation
})
.cloned()
.collect())
}
}
// Builds an open (no `valid_to`) existing edge that became valid on `day`.
fn existing(subject: &str, relation: &str, object: &str, day: u32) -> ExistingEdge {
ExistingEdge {
subject_key: subject.to_string(),
relation: relation.to_string(),
object_key: object.to_string(),
valid_from: at(day),
valid_to: None,
}
}
// The naive resolver opens the edge unchanged and closes nothing.
#[tokio::test(flavor = "current_thread")]
async fn should_append_without_closing_under_naive_resolver() {
let resolver = NaiveAppendResolver::new();
let resolution = resolver
.resolve(&scope(), edge("alice", "works at", "globex", 2))
.await
.unwrap();
assert!(resolution.close.is_empty());
assert_eq!(resolution.open.object_key, "globex");
}
// A different object on a single-valued relation closes the existing edge.
#[tokio::test(flavor = "current_thread")]
async fn should_close_conflicting_single_valued_edge() {
let catalog = InMemoryEdgeCatalog::with(vec![existing("alice", "works at", "acme", 1)]);
let policy = CardinalityPolicy::with_single_valued(["works at"]);
let resolver = TemporalEdgeResolver::new(catalog, policy);
let resolution = resolver
.resolve(&scope(), edge("alice", "works at", "globex", 2))
.await
.unwrap();
assert_eq!(resolution.close, vec![existing("alice", "works at", "acme", 1)]);
assert_eq!(resolution.open.object_key, "globex");
}
// Relations not registered as single-valued never close existing edges.
#[tokio::test(flavor = "current_thread")]
async fn should_not_close_multi_valued_edges() {
let catalog = InMemoryEdgeCatalog::with(vec![
existing("team", "deployed", "weekend", 1),
existing("team", "deployed", "monday", 3),
]);
let policy = CardinalityPolicy::with_single_valued(["works at"]);
let resolver = TemporalEdgeResolver::new(catalog, policy);
let resolution = resolver
.resolve(&scope(), edge("team", "deployed", "today", 6))
.await
.unwrap();
assert!(resolution.close.is_empty());
assert_eq!(resolution.open.object_key, "today");
}
// Restating the same object on a single-valued relation closes nothing.
#[tokio::test(flavor = "current_thread")]
async fn should_not_close_when_same_object_restated() {
let catalog = InMemoryEdgeCatalog::with(vec![existing("alice", "works at", "acme", 1)]);
let policy = CardinalityPolicy::with_single_valued(["works at"]);
let resolver = TemporalEdgeResolver::new(catalog, policy);
let resolution = resolver
.resolve(&scope(), edge("alice", "works at", "acme", 2))
.await
.unwrap();
assert!(resolution.close.is_empty());
}
// A restated fact keeps the earlier `valid_from` of the edge already on record.
#[tokio::test(flavor = "current_thread")]
async fn should_fold_restated_fact_by_adopting_existing_valid_from() {
let catalog = InMemoryEdgeCatalog::with(vec![existing("alice", "works at", "acme", 1)]);
let policy = CardinalityPolicy::with_single_valued(["works at"]);
let resolver = TemporalEdgeResolver::new(catalog, policy);
let resolution = resolver
.resolve(&scope(), edge("alice", "works at", "acme", 9))
.await
.unwrap();
assert_eq!(resolution.open.valid_from, at(1));
}
// Confidence is not consulted: a 0.3-confidence edge still closes the conflicting one.
#[tokio::test(flavor = "current_thread")]
async fn should_close_low_confidence_new_edge_over_high_confidence_old() {
let catalog = InMemoryEdgeCatalog::with(vec![existing("alice", "works at", "acme", 1)]);
let policy = CardinalityPolicy::with_single_valued(["works at"]);
let resolver = TemporalEdgeResolver::new(catalog, policy);
let mut hedged = edge("alice", "works at", "globex", 2);
hedged.confidence = 0.3;
let resolution = resolver.resolve(&scope(), hedged).await.unwrap();
assert_eq!(resolution.close.len(), 1);
assert_eq!(resolution.close[0].object_key, "acme");
}
// Unregistered relations are multi-valued; registered ones are single-valued.
#[test]
fn should_default_unknown_relations_to_multi_valued() {
let policy = CardinalityPolicy::with_single_valued(["works at"]);
assert_eq!(policy.cardinality("knows"), RelationCardinality::MultiValued);
assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
}
// Registration and lookup ignore letter case.
#[test]
fn should_classify_cardinality_case_insensitively() {
let policy = CardinalityPolicy::with_single_valued(["Works At"]);
assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
}
// The default (empty) policy classifies everything as multi-valued.
#[test]
fn should_treat_empty_policy_as_all_multi_valued() {
let policy = CardinalityPolicy::default();
assert_eq!(policy.cardinality("works at"), RelationCardinality::MultiValued);
}
// NOTE(review): this test only compares the `valid_from` of two fixtures built
// by `edge`; it never calls a resolver, so it does not verify that resolution
// preserves the event time.
#[test]
fn should_carry_event_time_as_valid_from() {
let backdated = edge("alice", "works at", "acme", 1);
let current = edge("alice", "works at", "globex", 5);
assert!(backdated.valid_from < current.valid_from);
}
}