memoir_core/graph/edge.rs
1//! Reconciling a new relationship edge against the edges already in the graph.
2//!
3//! Once a triple's subject and object resolve to canonical nodes
4//! ([`super::resolve`]), the *edge* between them must be reconciled against
5//! existing edges. A new fact may **contradict** an existing one — "Alice works
6//! at Acme" then later "Alice works at Globex" — and the graph must record the
7//! change without losing history. [`EdgeResolver`] is the seam that decides what
8//! happens to existing edges when a new one arrives.
9//!
10//! **Contradiction is not Forget.** A contradiction *invalidates* an edge (keeps
11//! it, marks it closed) — it does not delete it. Deletion is the Forget path
12//! (reference-counted, a separate ticket). These stay distinct operations.
13//!
14//! Two implementations ship: [`NaiveAppendResolver`] (the benchmark floor —
15//! every edge is added, nothing invalidated) and [`TemporalEdgeResolver`] (the
16//! production impl — a conflicting edge is closed and the new one opened,
17//! preserving the chain). The temporal model echoes the row-level supersession
18//! model ([`crate::memory::SupersessionInfo`]: a newer fact won, a normal
19//! lifecycle event, not an error) at the edge level — it is not a reuse of that
20//! type, because edges are graph elements, not memory rows.
21
22use std::collections::HashSet;
23use std::future::Future;
24
25use chrono::{DateTime, FixedOffset};
26
27use crate::memory::Scope;
28
/// How many objects a subject may hold under one relation *at the same time*.
///
/// This is the axis separating a new edge that *contradicts* an existing one
/// from a new edge that merely *adds* to it. A subject has one employer at a
/// time ([`Self::SingleValued`]: a new `works_at` supersedes the old one), yet
/// deploys to many environments over time and knows many people at once
/// ([`Self::MultiValued`]: every `deployed`/`knows` edge coexists).
///
/// The question is *simultaneous* cardinality, not whether the relation ever
/// repeats. "We deployed last weekend" and "we deployed Monday" are both true
/// events, so `deployed` is multi-valued and neither edge supersedes the other.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RelationCardinality {
    /// At most one object is live at any moment; a newer edge supersedes the older one.
    SingleValued,

    /// Any number of objects are live together; edges coexist and none supersedes another.
    MultiValued,
}
48
49/// Classifies each relation's [`RelationCardinality`] for conflict detection.
50///
51/// Relations not in the single-valued set default to [`RelationCardinality::MultiValued`]:
52/// appending a duplicate is recoverable (dedup later), whereas wrongly
53/// superseding destroys a true fact, so the safe default is to append.
54#[derive(Debug, Clone, Default)]
55pub struct CardinalityPolicy {
56 single_valued: HashSet<String>,
57}
58
59impl CardinalityPolicy {
60 /// Builds a policy treating `relations` as single-valued, all others multi.
61 ///
62 /// Relations are matched case-insensitively against the lowercased relation
63 /// label, so `"works at"` and `"Works At"` classify alike.
64 pub fn with_single_valued<I, S>(relations: I) -> Self
65 where
66 I: IntoIterator<Item = S>,
67 S: Into<String>,
68 {
69 Self {
70 single_valued: relations.into_iter().map(|relation| relation.into().to_lowercase()).collect(),
71 }
72 }
73
74 /// Returns the cardinality of `relation`.
75 pub fn cardinality(&self, relation: &str) -> RelationCardinality {
76 if self.single_valued.contains(&relation.to_lowercase()) {
77 RelationCardinality::SingleValued
78 } else {
79 RelationCardinality::MultiValued
80 }
81 }
82}
83
84/// A new relationship edge to reconcile against the graph.
85///
86/// Subject and object are the *resolved* node keys ([`super::Resolution`]), not
87/// raw entity strings. `valid_from` is the source memory's event time (when the
88/// fact became true), so "newer" orders by when facts held, not when they were
89/// processed — a backdated memory does not wrongly win over a current one.
90#[derive(Debug, Clone, PartialEq)]
91pub struct Edge {
92 /// Resolved key of the subject node within the scope.
93 pub subject_key: String,
94 /// The relation label (open vocabulary, as the extractor produced it).
95 pub relation: String,
96 /// Resolved key of the object node within the scope.
97 pub object_key: String,
98 /// The extractor's confidence in this edge, on the 0.0-1.0 scale.
99 pub confidence: f32,
100 /// When the fact became true (the source memory's event time).
101 pub valid_from: DateTime<FixedOffset>,
102}
103
104/// An edge already in the graph, as the resolver sees it for conflict checks.
105///
106/// Self-describing: the tuple `(subject_key, relation, object_key, valid_from)`
107/// *is* the edge's identity within a scope — the same tuple the commit path
108/// `MERGE`s on — so a resolver's close decision carries everything needed to
109/// find the edge again, with no backend-internal id. `valid_to` is `None` while
110/// the edge is current and `Some(t)` once it was superseded at `t`. Only
111/// current edges take part in conflict resolution; closed edges are history.
112#[derive(Debug, Clone, PartialEq)]
113pub struct ExistingEdge {
114 /// Resolved key of the subject node.
115 pub subject_key: String,
116 /// The relation label.
117 pub relation: String,
118 /// Resolved key of the object node.
119 pub object_key: String,
120 /// When the fact became true — part of the edge's identity.
121 pub valid_from: DateTime<FixedOffset>,
122 /// `None` while the edge is current; `Some(t)` once superseded at `t`.
123 pub valid_to: Option<DateTime<FixedOffset>>,
124}
125
126/// The resolver's decision: which existing edges to close, and the edge to open.
127///
128/// Echoes the row-level supersession model at the edge level — closing an edge
129/// records "a newer fact won," a normal lifecycle event, not an extraction
130/// error. `close` carries the edges to close *fully described* (see
131/// [`ExistingEdge`]'s identity tuple), so the commit path can match each by its
132/// own properties; their `valid_to` is set to the new edge's `valid_from`.
133/// `open` is the new edge to add as current.
134#[derive(Debug, Clone, PartialEq)]
135pub struct EdgeResolution {
136 /// Current edges to close (mark superseded), self-describing.
137 pub close: Vec<ExistingEdge>,
138 /// The new edge to open as current.
139 pub open: Edge,
140}
141
142/// Yields the current edges a resolver must reconcile a new edge against.
143///
144/// A focused retrieval seam mirroring [`super::EntityCatalog`]: the resolver
145/// needs only the *current* edges (those with `valid_to == None`) that share the
146/// new edge's subject and relation within one [`Scope`], not the whole graph.
147/// This is also where a FalkorDB-backed lookup later slots in behind the same
148/// trait, without changing the resolvers.
149pub trait EdgeCatalog: Send + Sync + 'static {
150 /// Returns the current edges in `scope` with `subject_key` and `relation`.
151 ///
152 /// Implementations return only edges whose `valid_to` is `None` (closed
153 /// edges are history and never reconsidered), confined to `scope`.
154 ///
155 /// # Errors
156 ///
157 /// Returns [`EdgeError::Catalog`] when the backing store cannot be read.
158 fn current_edges(
159 &self,
160 scope: &Scope,
161 subject_key: &str,
162 relation: &str,
163 ) -> impl Future<Output = Result<Vec<ExistingEdge>, EdgeError>> + Send;
164}
165
166/// Reconciles a new edge against the graph's existing edges.
167///
168/// Implementations decide which existing edges a new one invalidates (if any)
169/// and return the [`EdgeResolution`] the commit path applies. Swapping one
170/// implementation for another (naive-append, temporal-invalidate) requires no
171/// caller change, which is what lets the benchmark compare them.
172pub trait EdgeResolver: Send + Sync + 'static {
173 /// Resolves `edge` within `scope` against existing edges.
174 ///
175 /// # Errors
176 ///
177 /// Returns [`EdgeError::Catalog`] when reading existing edges fails.
178 fn resolve(&self, scope: &Scope, edge: Edge) -> impl Future<Output = Result<EdgeResolution, EdgeError>> + Send;
179}
180
181/// Failure modes for [`EdgeResolver`] implementations.
182#[derive(Debug, thiserror::Error)]
183pub enum EdgeError {
184 /// Reading existing edges from the [`EdgeCatalog`] failed.
185 #[error("edge catalog read failed: {0}")]
186 Catalog(String),
187}
188
/// Resolver that adds every new edge and never invalidates an existing one.
///
/// This is the benchmark floor. After a contradiction both the old and the new
/// edge stay current, so conflicting facts pile up in the graph. That is the
/// gap the benchmark measures [`TemporalEdgeResolver`] against. The resolver
/// never reads the graph: its resolution consists of the new edge alone.
#[derive(Debug, Default, Clone, Copy)]
pub struct NaiveAppendResolver;

impl NaiveAppendResolver {
    /// Returns a naive-append resolver; the type is a stateless unit struct.
    pub fn new() -> Self {
        Self::default()
    }
}
204
205impl EdgeResolver for NaiveAppendResolver {
206 async fn resolve(&self, _scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
207 Ok(EdgeResolution {
208 close: Vec::new(),
209 open: edge,
210 })
211 }
212}
213
214/// Invalidates conflicting edges instead of deleting them, preserving history.
215///
216/// The production impl. A new edge conflicts with a current edge when they share
217/// subject and relation *and* the relation is [`RelationCardinality::SingleValued`]
218/// (one live object at a time). Conflicting edges are closed (added to
219/// [`EdgeResolution::close`]); the new edge opens as current. Multi-valued
220/// relations never conflict, so every edge coexists.
221///
222/// Winner is decided by recency alone — a newer fact supersedes an older one
223/// regardless of confidence, matching the row-level supersession model where "a
224/// newer fact won" is purely temporal. Confidence rides on the edge for the read
225/// path but never gates invalidation.
226///
227/// A *restatement* — the same single-valued fact observed again later — folds
228/// into the current edge rather than opening a parallel one: the resolution's
229/// `open` adopts the existing edge's `valid_from` (the edge's identity), so the
230/// commit's `MERGE` matches it and appends this source's pid. `valid_from` thus
231/// means "when the fact first became true," not "when it was last restated."
232#[derive(Debug, Clone)]
233pub struct TemporalEdgeResolver<C> {
234 catalog: C,
235 policy: CardinalityPolicy,
236}
237
238impl<C: EdgeCatalog> TemporalEdgeResolver<C> {
239 /// Builds a temporal resolver over `catalog` with the cardinality `policy`.
240 pub fn new(catalog: C, policy: CardinalityPolicy) -> Self {
241 Self { catalog, policy }
242 }
243}
244
245impl<C: EdgeCatalog> EdgeResolver for TemporalEdgeResolver<C> {
246 async fn resolve(&self, scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
247 if self.policy.cardinality(&edge.relation) == RelationCardinality::MultiValued {
248 return Ok(EdgeResolution {
249 close: Vec::new(),
250 open: edge,
251 });
252 }
253
254 let current = self.catalog.current_edges(scope, &edge.subject_key, &edge.relation).await?;
255 let mut open = edge;
256 let mut close = Vec::new();
257 for existing in current {
258 if existing.object_key == open.object_key {
259 if existing.valid_from < open.valid_from {
260 open.valid_from = existing.valid_from;
261 }
262 } else {
263 close.push(existing);
264 }
265 }
266
267 Ok(EdgeResolution { close, open })
268 }
269}
270
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// The one scope every test resolves within.
    fn scope() -> Scope {
        Scope {
            agent_id: String::from("agent"),
            org_id: String::from("org"),
            user_id: String::from("user"),
        }
    }

    /// Midnight UTC on `day` of June 2026.
    fn at(day: u32) -> DateTime<FixedOffset> {
        let stamp = format!("2026-06-{day:02}T00:00:00Z");
        DateTime::parse_from_rfc3339(&stamp).expect("valid test date")
    }

    /// An incoming edge with confidence 0.9 whose event time is `at(day)`.
    fn incoming(subject: &str, relation: &str, object: &str, day: u32) -> Edge {
        Edge {
            subject_key: subject.to_owned(),
            relation: relation.to_owned(),
            object_key: object.to_owned(),
            confidence: 0.9,
            valid_from: at(day),
        }
    }

    /// A current (not yet closed) graph edge that became true on `at(day)`.
    fn stored(subject: &str, relation: &str, object: &str, day: u32) -> ExistingEdge {
        ExistingEdge {
            subject_key: subject.to_owned(),
            relation: relation.to_owned(),
            object_key: object.to_owned(),
            valid_from: at(day),
            valid_to: None,
        }
    }

    /// In-memory [`EdgeCatalog`] returning only current edges (`valid_to == None`).
    #[derive(Default)]
    struct InMemoryEdgeCatalog {
        edges: Mutex<Vec<ExistingEdge>>,
    }

    impl InMemoryEdgeCatalog {
        /// Seeds the catalog with `edges`.
        fn with(edges: Vec<ExistingEdge>) -> Self {
            Self {
                edges: Mutex::new(edges),
            }
        }
    }

    impl EdgeCatalog for InMemoryEdgeCatalog {
        async fn current_edges(
            &self,
            _scope: &Scope,
            subject_key: &str,
            relation: &str,
        ) -> Result<Vec<ExistingEdge>, EdgeError> {
            // The lock guard is a temporary of this statement, so it is
            // released before the function returns.
            let matching: Vec<ExistingEdge> = self
                .edges
                .lock()
                .expect("edge catalog mutex poisoned")
                .iter()
                .filter(|candidate| {
                    candidate.valid_to.is_none()
                        && candidate.subject_key == subject_key
                        && candidate.relation == relation
                })
                .cloned()
                .collect();
            Ok(matching)
        }
    }

    /// A temporal resolver whose policy treats only "works at" as
    /// single-valued, backed by an in-memory catalog seeded with `edges`.
    fn works_at_resolver(edges: Vec<ExistingEdge>) -> TemporalEdgeResolver<InMemoryEdgeCatalog> {
        let catalog = InMemoryEdgeCatalog::with(edges);
        let policy = CardinalityPolicy::with_single_valued(["works at"]);
        TemporalEdgeResolver::new(catalog, policy)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_append_without_closing_under_naive_resolver() {
        let resolver = NaiveAppendResolver::new();
        let new_edge = incoming("alice", "works at", "globex", 2);

        let resolution = resolver.resolve(&scope(), new_edge).await.unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_conflicting_single_valued_edge() {
        let resolver = works_at_resolver(vec![stored("alice", "works at", "acme", 1)]);
        let new_edge = incoming("alice", "works at", "globex", 2);

        let resolution = resolver.resolve(&scope(), new_edge).await.unwrap();

        assert_eq!(resolution.close, vec![stored("alice", "works at", "acme", 1)]);
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_multi_valued_edges() {
        // The "deploy" case: deploy events to distinct objects are all true
        // and coexist; a multi-valued relation never supersedes.
        let resolver = works_at_resolver(vec![
            stored("team", "deployed", "weekend", 1),
            stored("team", "deployed", "monday", 3),
        ]);
        let new_edge = incoming("team", "deployed", "today", 6);

        let resolution = resolver.resolve(&scope(), new_edge).await.unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "today");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_when_same_object_restated() {
        // Restating the current fact must not close it against itself.
        let resolver = works_at_resolver(vec![stored("alice", "works at", "acme", 1)]);
        let restated = incoming("alice", "works at", "acme", 2);

        let resolution = resolver.resolve(&scope(), restated).await.unwrap();

        assert!(resolution.close.is_empty());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_fold_restated_fact_by_adopting_existing_valid_from() {
        // A restatement opens no parallel edge: the open edge adopts the
        // current edge's valid_from (its identity), so the commit MERGEs into it.
        let resolver = works_at_resolver(vec![stored("alice", "works at", "acme", 1)]);
        let restated = incoming("alice", "works at", "acme", 9);

        let resolution = resolver.resolve(&scope(), restated).await.unwrap();

        assert_eq!(resolution.open.valid_from, at(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_low_confidence_new_edge_over_high_confidence_old() {
        // Recency wins regardless of confidence (matches row supersession).
        let resolver = works_at_resolver(vec![stored("alice", "works at", "acme", 1)]);
        let hedged = Edge {
            confidence: 0.3,
            ..incoming("alice", "works at", "globex", 2)
        };

        let resolution = resolver.resolve(&scope(), hedged).await.unwrap();

        assert_eq!(resolution.close.len(), 1);
        assert_eq!(resolution.close[0].object_key, "acme");
    }

    #[test]
    fn should_default_unknown_relations_to_multi_valued() {
        let policy = CardinalityPolicy::with_single_valued(["works at"]);

        assert_eq!(policy.cardinality("knows"), RelationCardinality::MultiValued);
        assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
    }

    #[test]
    fn should_classify_cardinality_case_insensitively() {
        let policy = CardinalityPolicy::with_single_valued(["Works At"]);

        assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
    }

    #[test]
    fn should_treat_empty_policy_as_all_multi_valued() {
        let policy = CardinalityPolicy::default();

        assert_eq!(policy.cardinality("works at"), RelationCardinality::MultiValued);
    }

    #[test]
    fn should_carry_event_time_as_valid_from() {
        let backdated = incoming("alice", "works at", "acme", 1);
        let current = incoming("alice", "works at", "globex", 5);

        assert!(backdated.valid_from < current.valid_from);
    }
}