Skip to main content

memoir_core/graph/
edge.rs

//! Reconciling a new relationship edge against the edges already in the graph.
//!
//! Once a triple's subject and object resolve to canonical nodes
//! ([`super::resolve`]), the *edge* between them must be reconciled against
//! existing edges. A new fact may **contradict** an existing one — "Alice works
//! at Acme" then later "Alice works at Globex" — and the graph must record the
//! change without losing history. [`EdgeResolver`] is the seam that decides what
//! happens to existing edges when a new one arrives.
//!
//! **Contradiction is not Forget.** A contradiction *invalidates* an edge (keeps
//! it, marks it closed) — it does not delete it. Deletion is the Forget path
//! (reference-counted, a separate ticket). These stay distinct operations.
//!
//! Two implementations ship: [`NaiveAppendResolver`] (the benchmark floor —
//! every edge is added, nothing invalidated) and [`TemporalEdgeResolver`] (the
//! production impl — a conflicting edge is closed and the new one opened,
//! preserving the chain). The temporal model echoes the row-level supersession
//! model ([`crate::memory::SupersessionInfo`]: a newer fact won, a normal
//! lifecycle event, not an error) at the edge level — it is not a reuse of that
//! type, because edges are graph elements, not memory rows.
21
22use std::collections::HashSet;
23use std::future::Future;
24
25use chrono::{DateTime, FixedOffset};
26
27use crate::memory::Scope;
28
/// How many objects a subject may hold for one relation *at the same time*.
///
/// This is the axis that separates a contradiction from an addition. A subject
/// works at one employer at a time ([`Self::SingleValued`] — a new `works_at`
/// supersedes the old one), yet deploys to many environments over time and
/// knows many people at once ([`Self::MultiValued`] — every `deployed`/`knows`
/// edge coexists).
///
/// The question is *simultaneous* cardinality, not whether the relation ever
/// repeats: "we deployed last weekend" and "we deployed Monday" are both true
/// events, so `deployed` is multi-valued and neither supersedes the other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationCardinality {
    /// At most one live object at a time; a newer edge supersedes the older.
    SingleValued,

    /// Any number of live objects at once; edges coexist and none supersedes.
    MultiValued,
}
48
/// Maps each relation label to its [`RelationCardinality`] for conflict detection.
///
/// Only the single-valued relations are stored; any relation absent from that
/// set is treated as [`RelationCardinality::MultiValued`]. Appending is the
/// safe default: a duplicate edge can be deduplicated later, whereas wrongly
/// superseding an edge destroys a true fact.
#[derive(Debug, Clone, Default)]
pub struct CardinalityPolicy {
    // Lowercased labels of the relations classified as single-valued.
    single_valued: HashSet<String>,
}
58
59impl CardinalityPolicy {
60    /// Builds a policy treating `relations` as single-valued, all others multi.
61    ///
62    /// Relations are matched case-insensitively against the lowercased relation
63    /// label, so `"works at"` and `"Works At"` classify alike.
64    pub fn with_single_valued<I, S>(relations: I) -> Self
65    where
66        I: IntoIterator<Item = S>,
67        S: Into<String>,
68    {
69        Self {
70            single_valued: relations.into_iter().map(|relation| relation.into().to_lowercase()).collect(),
71        }
72    }
73
74    /// Returns the cardinality of `relation`.
75    pub fn cardinality(&self, relation: &str) -> RelationCardinality {
76        if self.single_valued.contains(&relation.to_lowercase()) {
77            RelationCardinality::SingleValued
78        } else {
79            RelationCardinality::MultiValued
80        }
81    }
82}
83
84/// A new relationship edge to reconcile against the graph.
85///
86/// Subject and object are the *resolved* node keys ([`super::Resolution`]), not
87/// raw entity strings. `valid_from` is the source memory's event time (when the
88/// fact became true), so "newer" orders by when facts held, not when they were
89/// processed — a backdated memory does not wrongly win over a current one.
90#[derive(Debug, Clone, PartialEq)]
91pub struct Edge {
92    /// Resolved key of the subject node within the scope.
93    pub subject_key: String,
94    /// The relation label (open vocabulary, as the extractor produced it).
95    pub relation: String,
96    /// Resolved key of the object node within the scope.
97    pub object_key: String,
98    /// The extractor's confidence in this edge, on the 0.0-1.0 scale.
99    pub confidence: f32,
100    /// When the fact became true (the source memory's event time).
101    pub valid_from: DateTime<FixedOffset>,
102}
103
104/// An edge already in the graph, as the resolver sees it for conflict checks.
105///
106/// Self-describing: the tuple `(subject_key, relation, object_key, valid_from)`
107/// *is* the edge's identity within a scope — the same tuple the commit path
108/// `MERGE`s on — so a resolver's close decision carries everything needed to
109/// find the edge again, with no backend-internal id. `valid_to` is `None` while
110/// the edge is current and `Some(t)` once it was superseded at `t`. Only
111/// current edges take part in conflict resolution; closed edges are history.
112#[derive(Debug, Clone, PartialEq)]
113pub struct ExistingEdge {
114    /// Resolved key of the subject node.
115    pub subject_key: String,
116    /// The relation label.
117    pub relation: String,
118    /// Resolved key of the object node.
119    pub object_key: String,
120    /// When the fact became true — part of the edge's identity.
121    pub valid_from: DateTime<FixedOffset>,
122    /// `None` while the edge is current; `Some(t)` once superseded at `t`.
123    pub valid_to: Option<DateTime<FixedOffset>>,
124}
125
126/// The resolver's decision: which existing edges to close, and the edge to open.
127///
128/// Echoes the row-level supersession model at the edge level — closing an edge
129/// records "a newer fact won," a normal lifecycle event, not an extraction
130/// error. `close` carries the edges to close *fully described* (see
131/// [`ExistingEdge`]'s identity tuple), so the commit path can match each by its
132/// own properties; their `valid_to` is set to the new edge's `valid_from`.
133/// `open` is the new edge to add as current.
134#[derive(Debug, Clone, PartialEq)]
135pub struct EdgeResolution {
136    /// Current edges to close (mark superseded), self-describing.
137    pub close: Vec<ExistingEdge>,
138    /// The new edge to open as current.
139    pub open: Edge,
140}
141
142/// Yields the current edges a resolver must reconcile a new edge against.
143///
144/// A focused retrieval seam mirroring [`super::EntityCatalog`]: the resolver
145/// needs only the *current* edges (those with `valid_to == None`) that share the
146/// new edge's subject and relation within one [`Scope`], not the whole graph.
147/// This is also where a FalkorDB-backed lookup later slots in behind the same
148/// trait, without changing the resolvers.
149pub trait EdgeCatalog: Send + Sync + 'static {
150    /// Returns the current edges in `scope` with `subject_key` and `relation`.
151    ///
152    /// Implementations return only edges whose `valid_to` is `None` (closed
153    /// edges are history and never reconsidered), confined to `scope`.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`EdgeError::Catalog`] when the backing store cannot be read.
158    fn current_edges(
159        &self,
160        scope: &Scope,
161        subject_key: &str,
162        relation: &str,
163    ) -> impl Future<Output = Result<Vec<ExistingEdge>, EdgeError>> + Send;
164}
165
166/// Reconciles a new edge against the graph's existing edges.
167///
168/// Implementations decide which existing edges a new one invalidates (if any)
169/// and return the [`EdgeResolution`] the commit path applies. Swapping one
170/// implementation for another (naive-append, temporal-invalidate) requires no
171/// caller change, which is what lets the benchmark compare them.
172pub trait EdgeResolver: Send + Sync + 'static {
173    /// Resolves `edge` within `scope` against existing edges.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`EdgeError::Catalog`] when reading existing edges fails.
178    fn resolve(&self, scope: &Scope, edge: Edge) -> impl Future<Output = Result<EdgeResolution, EdgeError>> + Send;
179}
180
181/// Failure modes for [`EdgeResolver`] implementations.
182#[derive(Debug, thiserror::Error)]
183pub enum EdgeError {
184    /// Reading existing edges from the [`EdgeCatalog`] failed.
185    #[error("edge catalog read failed: {0}")]
186    Catalog(String),
187}
188
/// A resolver that adds every new edge and never invalidates an existing one.
///
/// This is the benchmark floor. A contradiction leaves both the old and the
/// new edge current, so the graph accumulates conflicting facts — the gap the
/// benchmark measures [`TemporalEdgeResolver`] against. It never reads the
/// graph: its resolution consists of the new edge alone.
#[derive(Debug, Default, Clone, Copy)]
pub struct NaiveAppendResolver;

impl NaiveAppendResolver {
    /// Returns a naive-append resolver; the type carries no state.
    pub fn new() -> Self {
        NaiveAppendResolver
    }
}
204
205impl EdgeResolver for NaiveAppendResolver {
206    async fn resolve(&self, _scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
207        Ok(EdgeResolution {
208            close: Vec::new(),
209            open: edge,
210        })
211    }
212}
213
214/// Invalidates conflicting edges instead of deleting them, preserving history.
215///
216/// The production impl. A new edge conflicts with a current edge when they share
217/// subject and relation *and* the relation is [`RelationCardinality::SingleValued`]
218/// (one live object at a time). Conflicting edges are closed (added to
219/// [`EdgeResolution::close`]); the new edge opens as current. Multi-valued
220/// relations never conflict, so every edge coexists.
221///
222/// Winner is decided by recency alone — a newer fact supersedes an older one
223/// regardless of confidence, matching the row-level supersession model where "a
224/// newer fact won" is purely temporal. Confidence rides on the edge for the read
225/// path but never gates invalidation.
226///
227/// A *restatement* — the same single-valued fact observed again later — folds
228/// into the current edge rather than opening a parallel one: the resolution's
229/// `open` adopts the existing edge's `valid_from` (the edge's identity), so the
230/// commit's `MERGE` matches it and appends this source's pid. `valid_from` thus
231/// means "when the fact first became true," not "when it was last restated."
232#[derive(Debug, Clone)]
233pub struct TemporalEdgeResolver<C> {
234    catalog: C,
235    policy: CardinalityPolicy,
236}
237
238impl<C: EdgeCatalog> TemporalEdgeResolver<C> {
239    /// Builds a temporal resolver over `catalog` with the cardinality `policy`.
240    pub fn new(catalog: C, policy: CardinalityPolicy) -> Self {
241        Self { catalog, policy }
242    }
243}
244
245impl<C: EdgeCatalog> EdgeResolver for TemporalEdgeResolver<C> {
246    async fn resolve(&self, scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
247        if self.policy.cardinality(&edge.relation) == RelationCardinality::MultiValued {
248            return Ok(EdgeResolution {
249                close: Vec::new(),
250                open: edge,
251            });
252        }
253
254        let current = self.catalog.current_edges(scope, &edge.subject_key, &edge.relation).await?;
255        let mut open = edge;
256        let mut close = Vec::new();
257        for existing in current {
258            if existing.object_key == open.object_key {
259                if existing.valid_from < open.valid_from {
260                    open.valid_from = existing.valid_from;
261                }
262            } else {
263                close.push(existing);
264            }
265        }
266
267        Ok(EdgeResolution { close, open })
268    }
269}
270
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// The fixed scope every test resolves within.
    fn scope() -> Scope {
        Scope {
            agent_id: String::from("agent"),
            org_id: String::from("org"),
            user_id: String::from("user"),
        }
    }

    /// Midnight UTC on the given day of June 2026.
    fn at(day: u32) -> DateTime<FixedOffset> {
        let stamp = format!("2026-06-{day:02}T00:00:00Z");
        DateTime::parse_from_rfc3339(&stamp).expect("valid test date")
    }

    /// A new edge with confidence 0.9 that became true on `day`.
    fn edge(subject: &str, relation: &str, object: &str, day: u32) -> Edge {
        let valid_from = at(day);
        Edge {
            subject_key: subject.to_owned(),
            relation: relation.to_owned(),
            object_key: object.to_owned(),
            confidence: 0.9,
            valid_from,
        }
    }

    /// A stored, still-current edge (`valid_to == None`) that became true on `day`.
    fn existing(subject: &str, relation: &str, object: &str, day: u32) -> ExistingEdge {
        let valid_from = at(day);
        ExistingEdge {
            subject_key: subject.to_owned(),
            relation: relation.to_owned(),
            object_key: object.to_owned(),
            valid_from,
            valid_to: None,
        }
    }

    /// In-memory [`EdgeCatalog`] that serves only current edges (`valid_to == None`).
    #[derive(Default)]
    struct InMemoryEdgeCatalog {
        edges: Mutex<Vec<ExistingEdge>>,
    }

    impl InMemoryEdgeCatalog {
        fn with(edges: Vec<ExistingEdge>) -> Self {
            let edges = Mutex::new(edges);
            Self { edges }
        }
    }

    impl EdgeCatalog for InMemoryEdgeCatalog {
        async fn current_edges(
            &self,
            _scope: &Scope,
            subject_key: &str,
            relation: &str,
        ) -> Result<Vec<ExistingEdge>, EdgeError> {
            let stored = self.edges.lock().expect("edge catalog mutex poisoned");
            let mut matching = Vec::new();
            for candidate in stored.iter() {
                let is_current = candidate.valid_to.is_none();
                if is_current && candidate.subject_key == subject_key && candidate.relation == relation {
                    matching.push(candidate.clone());
                }
            }
            Ok(matching)
        }
    }

    /// A temporal resolver over `seed` in which only "works at" is single-valued.
    fn works_at_resolver(seed: Vec<ExistingEdge>) -> TemporalEdgeResolver<InMemoryEdgeCatalog> {
        TemporalEdgeResolver::new(
            InMemoryEdgeCatalog::with(seed),
            CardinalityPolicy::with_single_valued(["works at"]),
        )
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_append_without_closing_under_naive_resolver() {
        let incoming = edge("alice", "works at", "globex", 2);

        let resolution = NaiveAppendResolver::new().resolve(&scope(), incoming).await.unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_conflicting_single_valued_edge() {
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);
        let incoming = edge("alice", "works at", "globex", 2);

        let resolution = resolver.resolve(&scope(), incoming).await.unwrap();

        assert_eq!(resolution.close, vec![existing("alice", "works at", "acme", 1)]);
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_multi_valued_edges() {
        // The "deploy" case: deploy events to distinct objects are all true
        // and coexist; a multi-valued relation never supersedes.
        let resolver = works_at_resolver(vec![
            existing("team", "deployed", "weekend", 1),
            existing("team", "deployed", "monday", 3),
        ]);
        let incoming = edge("team", "deployed", "today", 6);

        let resolution = resolver.resolve(&scope(), incoming).await.unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "today");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_when_same_object_restated() {
        // Restating the current fact must not close it against itself.
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);
        let incoming = edge("alice", "works at", "acme", 2);

        let resolution = resolver.resolve(&scope(), incoming).await.unwrap();

        assert!(resolution.close.is_empty());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_fold_restated_fact_by_adopting_existing_valid_from() {
        // A restatement opens no parallel edge: the open edge adopts the
        // current edge's valid_from (its identity), so the commit MERGEs
        // into it.
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);
        let incoming = edge("alice", "works at", "acme", 9);

        let resolution = resolver.resolve(&scope(), incoming).await.unwrap();

        assert_eq!(resolution.open.valid_from, at(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_low_confidence_new_edge_over_high_confidence_old() {
        // Recency wins regardless of confidence (matches row supersession).
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);
        let hedged = Edge {
            confidence: 0.3,
            ..edge("alice", "works at", "globex", 2)
        };

        let resolution = resolver.resolve(&scope(), hedged).await.unwrap();

        assert_eq!(resolution.close.len(), 1);
        assert_eq!(resolution.close[0].object_key, "acme");
    }

    #[test]
    fn should_default_unknown_relations_to_multi_valued() {
        let policy = CardinalityPolicy::with_single_valued(["works at"]);

        let unknown = policy.cardinality("knows");
        let registered = policy.cardinality("works at");

        assert_eq!(unknown, RelationCardinality::MultiValued);
        assert_eq!(registered, RelationCardinality::SingleValued);
    }

    #[test]
    fn should_classify_cardinality_case_insensitively() {
        let policy = CardinalityPolicy::with_single_valued(["Works At"]);

        let classified = policy.cardinality("works at");

        assert_eq!(classified, RelationCardinality::SingleValued);
    }

    #[test]
    fn should_treat_empty_policy_as_all_multi_valued() {
        let classified = CardinalityPolicy::default().cardinality("works at");

        assert_eq!(classified, RelationCardinality::MultiValued);
    }

    #[test]
    fn should_carry_event_time_as_valid_from() {
        let backdated = edge("alice", "works at", "acme", 1);
        let current = edge("alice", "works at", "globex", 5);

        assert!(backdated.valid_from < current.valid_from);
    }
}