polypixel-memoir-core 0.4.0

Memoir memory substrate as an embeddable Rust library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
//! Reconciling a new relationship edge against the edges already in the graph.
//!
//! Once a triple's subject and object resolve to canonical nodes
//! ([`super::resolve`]), the *edge* between them must be reconciled against
//! existing edges. A new fact may **contradict** an existing one — "Alice works
//! at Acme" then later "Alice works at Globex" — and the graph must record the
//! change without losing history. [`EdgeResolver`] is the seam that decides what
//! happens to existing edges when a new one arrives.
//!
//! **Contradiction is not Forget.** A contradiction *invalidates* an edge (keeps
//! it, marks it closed) — it does not delete it. Deletion is the Forget path
//! (reference-counted, a separate ticket). These stay distinct operations.
//!
//! Two implementations ship: [`NaiveAppendResolver`] (the benchmark floor —
//! every edge is added, nothing invalidated) and [`TemporalEdgeResolver`] (the
//! production impl — a conflicting edge is closed and the new one opened,
//! preserving the chain). The temporal model echoes the row-level supersession
//! model ([`crate::memory::SupersessionInfo`]: a newer fact won, a normal
//! lifecycle event, not an error) at the edge level — it is not a reuse of that
//! type, because edges are graph elements, not memory rows.

use std::collections::HashSet;
use std::future::Future;

use chrono::{DateTime, FixedOffset};

use crate::memory::Scope;

/// Whether a subject may hold one relation to many objects at the same time.
///
/// The axis that decides whether a new edge *contradicts* an existing one or
/// merely *adds* to it. A subject works at one employer at a time
/// ([`Self::SingleValued`] — a new `works_at` supersedes the old), but deploys
/// to many environments over time and knows many people at once
/// ([`Self::MultiValued`] — every `deployed`/`knows` edge coexists).
///
/// This is *simultaneous* cardinality, not "does the relation ever repeat":
/// "we deployed last weekend" and "we deployed Monday" are two true events, so
/// `deployed` is multi-valued and neither supersedes the other.
///
/// Produced by [`CardinalityPolicy::cardinality`] and consulted by
/// [`TemporalEdgeResolver`], which returns early — closing nothing and without
/// reading its catalog — when a relation is [`Self::MultiValued`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationCardinality {
    /// One live object at a time; a newer edge supersedes the older.
    SingleValued,

    /// Many live objects at once; every edge coexists, none supersedes.
    ///
    /// This is what [`CardinalityPolicy`] reports for every relation that was
    /// not explicitly registered as single-valued.
    MultiValued,
}

/// Maps relation labels to their [`RelationCardinality`] for conflict detection.
///
/// Only the relations declared single-valued are stored; any label that is
/// absent is reported as [`RelationCardinality::MultiValued`]. Appending is the
/// safe fallback: a duplicate edge can be deduplicated later, whereas
/// superseding by mistake would close a fact that is still true.
#[derive(Debug, Clone, Default)]
pub struct CardinalityPolicy {
    /// Lowercased labels of the relations treated as single-valued.
    single_valued: HashSet<String>,
}

impl CardinalityPolicy {
    /// Creates a policy in which every label in `relations` is single-valued
    /// and every other relation is multi-valued.
    ///
    /// Labels are lowercased before they are stored, and lookups lowercase
    /// their argument as well, so `"works at"` and `"Works At"` classify alike.
    pub fn with_single_valued<I, S>(relations: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut single_valued = HashSet::new();
        for relation in relations {
            let label: String = relation.into();
            single_valued.insert(label.to_lowercase());
        }
        Self { single_valued }
    }

    /// Looks up the cardinality of `relation`, ignoring letter case.
    ///
    /// Returns [`RelationCardinality::SingleValued`] only for labels registered
    /// through [`Self::with_single_valued`]; everything else is multi-valued.
    pub fn cardinality(&self, relation: &str) -> RelationCardinality {
        let label = relation.to_lowercase();
        if !self.single_valued.contains(label.as_str()) {
            return RelationCardinality::MultiValued;
        }
        RelationCardinality::SingleValued
    }
}

/// A new relationship edge to reconcile against the graph.
///
/// Subject and object are the *resolved* node keys ([`super::Resolution`]), not
/// raw entity strings. `valid_from` is the source memory's event time (when the
/// fact became true), so "newer" orders by when facts held, not when they were
/// processed — a backdated memory does not wrongly win over a current one.
///
/// An `Edge` is handed to [`EdgeResolver::resolve`] by value and comes back as
/// [`EdgeResolution::open`]. Only `PartialEq` is derived (not `Eq`) because
/// `confidence` is an `f32`.
#[derive(Debug, Clone, PartialEq)]
pub struct Edge {
    /// Resolved key of the subject node within the scope.
    pub subject_key: String,
    /// The relation label (open vocabulary, as the extractor produced it).
    ///
    /// [`TemporalEdgeResolver`] classifies it through [`CardinalityPolicy`]
    /// (case-insensitively) and passes it unchanged to the [`EdgeCatalog`].
    pub relation: String,
    /// Resolved key of the object node within the scope.
    pub object_key: String,
    /// The extractor's confidence in this edge, on the 0.0-1.0 scale.
    ///
    /// Carried through untouched: neither resolver in this module reads it.
    pub confidence: f32,
    /// When the fact became true (the source memory's event time).
    ///
    /// [`TemporalEdgeResolver`] may replace this with an earlier timestamp
    /// taken from a current edge that states the same fact (a restatement).
    pub valid_from: DateTime<FixedOffset>,
}

/// An edge already in the graph, as the resolver sees it for conflict checks.
///
/// Self-describing: the tuple `(subject_key, relation, object_key, valid_from)`
/// *is* the edge's identity within a scope — the same tuple the commit path
/// `MERGE`s on — so a resolver's close decision carries everything needed to
/// find the edge again, with no backend-internal id. `valid_to` is `None` while
/// the edge is current and `Some(t)` once it was superseded at `t`. Only
/// current edges take part in conflict resolution; closed edges are history.
///
/// Unlike [`Edge`], this view carries no confidence. [`TemporalEdgeResolver`]
/// inspects only `object_key` (same fact or conflicting fact?) and `valid_from`
/// (to fold a restatement); the remaining fields ride along so an entry in
/// [`EdgeResolution::close`] identifies its edge.
#[derive(Debug, Clone, PartialEq)]
pub struct ExistingEdge {
    /// Resolved key of the subject node.
    pub subject_key: String,
    /// The relation label.
    pub relation: String,
    /// Resolved key of the object node.
    pub object_key: String,
    /// When the fact became true — part of the edge's identity.
    pub valid_from: DateTime<FixedOffset>,
    /// `None` while the edge is current; `Some(t)` once superseded at `t`.
    ///
    /// Per the [`EdgeCatalog::current_edges`] contract, edges handed to a
    /// resolver always have `None` here.
    pub valid_to: Option<DateTime<FixedOffset>>,
}

/// The resolver's decision: which existing edges to close, and the edge to open.
///
/// Echoes the row-level supersession model at the edge level — closing an edge
/// records "a newer fact won," a normal lifecycle event, not an extraction
/// error. `close` carries the edges to close *fully described* (see
/// [`ExistingEdge`]'s identity tuple), so the commit path can match each by its
/// own properties; their `valid_to` is set to the new edge's `valid_from`.
/// `open` is the new edge to add as current.
///
/// The resolver only *decides*; it does not mutate the graph. Applying the
/// decision is left to the caller.
#[derive(Debug, Clone, PartialEq)]
pub struct EdgeResolution {
    /// Current edges to close (mark superseded), self-describing.
    ///
    /// Always empty for [`NaiveAppendResolver`] and for multi-valued relations
    /// under [`TemporalEdgeResolver`]. The entries are returned exactly as the
    /// catalog supplied them (their `valid_to` is still `None`).
    pub close: Vec<ExistingEdge>,
    /// The new edge to open as current.
    ///
    /// Usually the submitted edge unchanged; [`TemporalEdgeResolver`] may move
    /// its `valid_from` back to that of a current edge restating the same fact.
    pub open: Edge,
}

/// Yields the current edges a resolver must reconcile a new edge against.
///
/// A focused retrieval seam mirroring [`super::EntityCatalog`]: the resolver
/// needs only the *current* edges (those with `valid_to == None`) that share the
/// new edge's subject and relation within one [`Scope`], not the whole graph.
/// This is also where a FalkorDB-backed lookup later slots in behind the same
/// trait, without changing the resolvers.
///
/// [`TemporalEdgeResolver`] is the only caller in this module. It passes the new
/// edge's `subject_key` and `relation` through verbatim, and only for relations
/// its policy classifies as single-valued.
///
/// NOTE(review): [`CardinalityPolicy`] classifies relations case-insensitively,
/// but nothing here states whether `relation` should be matched that way as
/// well — confirm the expected normalization with implementors.
pub trait EdgeCatalog: Send + Sync + 'static {
    /// Returns the current edges in `scope` with `subject_key` and `relation`.
    ///
    /// Implementations return only edges whose `valid_to` is `None` (closed
    /// edges are history and never reconsidered), confined to `scope`.
    ///
    /// The signature spells out `impl Future + Send` so that the returned
    /// future is required to be `Send`; an implementation may still be written
    /// as a plain `async fn`.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::Catalog`] when the backing store cannot be read.
    fn current_edges(
        &self,
        scope: &Scope,
        subject_key: &str,
        relation: &str,
    ) -> impl Future<Output = Result<Vec<ExistingEdge>, EdgeError>> + Send;
}

/// Reconciles a new edge against the graph's existing edges.
///
/// Implementations decide which existing edges a new one invalidates (if any)
/// and return the [`EdgeResolution`] the commit path applies. Swapping one
/// implementation for another (naive-append, temporal-invalidate) requires no
/// caller change, which is what lets the benchmark compare them.
///
/// Two implementations live in this module: [`NaiveAppendResolver`] and
/// [`TemporalEdgeResolver`].
pub trait EdgeResolver: Send + Sync + 'static {
    /// Resolves `edge` within `scope` against existing edges.
    ///
    /// `edge` is taken by value; both implementations in this module move it
    /// into the returned resolution's `open` field. The returned future is
    /// required to be `Send`, though implementations may be written as a plain
    /// `async fn`.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::Catalog`] when reading existing edges fails.
    fn resolve(&self, scope: &Scope, edge: Edge) -> impl Future<Output = Result<EdgeResolution, EdgeError>> + Send;
}

/// Failure modes for [`EdgeResolver`] implementations.
///
/// Currently a single variant: the only fallible step in this module's
/// resolvers is the [`EdgeCatalog`] read, whose error is propagated unchanged.
#[derive(Debug, thiserror::Error)]
pub enum EdgeError {
    /// Reading existing edges from the [`EdgeCatalog`] failed.
    ///
    /// The payload is a free-form description of the failure; it follows the
    /// `edge catalog read failed: ` prefix when the error is displayed.
    #[error("edge catalog read failed: {0}")]
    Catalog(String),
}

/// Resolver that opens every incoming edge and never closes an existing one.
///
/// This is the benchmark floor. When a new fact contradicts an old one, both
/// edges stay current and the graph simply accumulates conflicting facts —
/// the gap [`TemporalEdgeResolver`] is measured against. The resolver holds no
/// state and never reads the graph: its resolution is the new edge alone.
#[derive(Debug, Default, Clone, Copy)]
pub struct NaiveAppendResolver;

impl NaiveAppendResolver {
    /// Returns the (stateless) naive-append resolver.
    pub fn new() -> Self {
        NaiveAppendResolver
    }
}

impl EdgeResolver for NaiveAppendResolver {
    /// Opens `edge` exactly as given and closes nothing.
    ///
    /// The scope is ignored and no catalog is consulted, so this never fails.
    async fn resolve(&self, _scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
        let resolution = EdgeResolution {
            open: edge,
            close: vec![],
        };
        Ok(resolution)
    }
}

/// Resolver that closes conflicting edges rather than deleting them, so the
/// graph keeps its history.
///
/// This is the production implementation. A new edge conflicts with a current
/// edge when both share subject and relation, the relation is
/// [`RelationCardinality::SingleValued`] (one live object at a time), and the
/// objects differ. Every such edge is listed in [`EdgeResolution::close`] and
/// the new edge opens as current. Multi-valued relations never conflict, so
/// all of their edges coexist.
///
/// Confidence plays no part in the decision: it rides on the edge for the read
/// path but never gates invalidation, matching the row-level supersession
/// model where "a newer fact won" is purely temporal.
///
/// A *restatement* — the same single-valued fact observed again later — folds
/// into the current edge instead of opening a parallel one. The resolution's
/// `open` adopts the existing edge's `valid_from` (the edge's identity), so the
/// commit's `MERGE` matches it and appends this source's pid. `valid_from`
/// therefore means "when the fact first became true," not "when it was last
/// restated."
#[derive(Debug, Clone)]
pub struct TemporalEdgeResolver<C> {
    /// Source of the current edges to reconcile against.
    catalog: C,
    /// Decides which relations are single-valued and can therefore conflict.
    policy: CardinalityPolicy,
}

impl<C: EdgeCatalog> TemporalEdgeResolver<C> {
    /// Creates a temporal resolver that reads current edges from `catalog` and
    /// classifies relations with `policy`.
    pub fn new(catalog: C, policy: CardinalityPolicy) -> Self {
        TemporalEdgeResolver { catalog, policy }
    }
}

impl<C: EdgeCatalog> EdgeResolver for TemporalEdgeResolver<C> {
    async fn resolve(&self, scope: &Scope, edge: Edge) -> Result<EdgeResolution, EdgeError> {
        if self.policy.cardinality(&edge.relation) == RelationCardinality::MultiValued {
            return Ok(EdgeResolution {
                close: Vec::new(),
                open: edge,
            });
        }

        let current = self.catalog.current_edges(scope, &edge.subject_key, &edge.relation).await?;
        let mut open = edge;
        let mut close = Vec::new();
        for existing in current {
            if existing.object_key == open.object_key {
                if existing.valid_from < open.valid_from {
                    open.valid_from = existing.valid_from;
                }
            } else {
                close.push(existing);
            }
        }

        Ok(EdgeResolution { close, open })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The single scope every test resolves within.
    fn scope() -> Scope {
        Scope {
            agent_id: "agent".to_string(),
            org_id: "org".to_string(),
            user_id: "user".to_string(),
        }
    }

    /// Midnight UTC on the given day of June 2026; a larger `day` is later.
    fn at(day: u32) -> DateTime<FixedOffset> {
        DateTime::parse_from_rfc3339(&format!("2026-06-{day:02}T00:00:00Z")).expect("valid test date")
    }

    /// A new edge with a fixed 0.9 confidence that became true on `day`.
    fn edge(subject: &str, relation: &str, object: &str, day: u32) -> Edge {
        Edge {
            subject_key: subject.to_string(),
            relation: relation.to_string(),
            object_key: object.to_string(),
            confidence: 0.9,
            valid_from: at(day),
        }
    }

    /// A *current* graph edge (`valid_to == None`) that became true on `day`.
    fn existing(subject: &str, relation: &str, object: &str, day: u32) -> ExistingEdge {
        ExistingEdge {
            subject_key: subject.to_string(),
            relation: relation.to_string(),
            object_key: object.to_string(),
            valid_from: at(day),
            valid_to: None,
        }
    }

    /// The policy shared by the tests: only `"works at"` is single-valued.
    fn works_at_policy() -> CardinalityPolicy {
        CardinalityPolicy::with_single_valued(["works at"])
    }

    /// A temporal resolver over `edges` using [`works_at_policy`].
    fn works_at_resolver(edges: Vec<ExistingEdge>) -> TemporalEdgeResolver<InMemoryEdgeCatalog> {
        TemporalEdgeResolver::new(InMemoryEdgeCatalog::with(edges), works_at_policy())
    }

    /// In-memory [`EdgeCatalog`] returning only current edges (`valid_to == None`).
    ///
    /// The fixture is never mutated after construction, so a plain `Vec` is
    /// enough; no lock is needed to satisfy the trait's `Send + Sync` bound.
    #[derive(Default)]
    struct InMemoryEdgeCatalog {
        edges: Vec<ExistingEdge>,
    }

    impl InMemoryEdgeCatalog {
        fn with(edges: Vec<ExistingEdge>) -> Self {
            Self { edges }
        }
    }

    impl EdgeCatalog for InMemoryEdgeCatalog {
        async fn current_edges(
            &self,
            _scope: &Scope,
            subject_key: &str,
            relation: &str,
        ) -> Result<Vec<ExistingEdge>, EdgeError> {
            Ok(self
                .edges
                .iter()
                .filter(|existing| {
                    existing.valid_to.is_none() && existing.subject_key == subject_key && existing.relation == relation
                })
                .cloned()
                .collect())
        }
    }

    /// An [`EdgeCatalog`] whose every read fails, for error-path tests.
    struct FailingCatalog;

    impl EdgeCatalog for FailingCatalog {
        async fn current_edges(
            &self,
            _scope: &Scope,
            _subject_key: &str,
            _relation: &str,
        ) -> Result<Vec<ExistingEdge>, EdgeError> {
            Err(EdgeError::Catalog("store offline".to_string()))
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_append_without_closing_under_naive_resolver() {
        let resolver = NaiveAppendResolver::new();

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "globex", 2))
            .await
            .unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_conflicting_single_valued_edge() {
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "globex", 2))
            .await
            .unwrap();

        assert_eq!(resolution.close, vec![existing("alice", "works at", "acme", 1)]);
        assert_eq!(resolution.open.object_key, "globex");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_every_conflicting_current_edge() {
        // If earlier appends left several current edges, one new fact closes
        // them all, in the order the catalog returned them.
        let resolver = works_at_resolver(vec![
            existing("alice", "works at", "acme", 1),
            existing("alice", "works at", "initech", 2),
        ]);

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "globex", 3))
            .await
            .unwrap();

        assert_eq!(
            resolution.close,
            vec![
                existing("alice", "works at", "acme", 1),
                existing("alice", "works at", "initech", 2),
            ]
        );
        assert_eq!(resolution.open, edge("alice", "works at", "globex", 3));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_ignore_already_closed_edges() {
        // Closed edges are history: the catalog never offers them, so they are
        // never closed a second time.
        let mut closed = existing("alice", "works at", "acme", 1);
        closed.valid_to = Some(at(2));
        let resolver = works_at_resolver(vec![closed]);

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "globex", 3))
            .await
            .unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open, edge("alice", "works at", "globex", 3));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_multi_valued_edges() {
        // The "deploy" case: three deploy events to distinct objects are all
        // true and coexist; a multi-valued relation never supersedes.
        let resolver = works_at_resolver(vec![
            existing("team", "deployed", "weekend", 1),
            existing("team", "deployed", "monday", 3),
        ]);

        let resolution = resolver
            .resolve(&scope(), edge("team", "deployed", "today", 6))
            .await
            .unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open.object_key, "today");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_read_catalog_for_multi_valued_relation() {
        // A multi-valued relation resolves without touching the catalog, so a
        // failing store cannot break it.
        let resolver = TemporalEdgeResolver::new(FailingCatalog, works_at_policy());

        let resolution = resolver
            .resolve(&scope(), edge("team", "deployed", "today", 6))
            .await
            .unwrap();

        assert!(resolution.close.is_empty());
        assert_eq!(resolution.open, edge("team", "deployed", "today", 6));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_propagate_catalog_read_failure() {
        let resolver = TemporalEdgeResolver::new(FailingCatalog, works_at_policy());

        let error = resolver
            .resolve(&scope(), edge("alice", "works at", "globex", 2))
            .await
            .unwrap_err();

        assert!(matches!(error, EdgeError::Catalog(ref reason) if reason == "store offline"));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_not_close_when_same_object_restated() {
        // Restating the current fact must not close it against itself.
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "acme", 2))
            .await
            .unwrap();

        assert!(resolution.close.is_empty());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_fold_restated_fact_by_adopting_existing_valid_from() {
        // A restatement opens no parallel edge: the open edge adopts the current
        // edge's valid_from (its identity), so the commit MERGEs into it.
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);

        let resolution = resolver
            .resolve(&scope(), edge("alice", "works at", "acme", 9))
            .await
            .unwrap();

        assert_eq!(resolution.open.valid_from, at(1));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_close_low_confidence_new_edge_over_high_confidence_old() {
        // Recency wins regardless of confidence (matches row supersession).
        let resolver = works_at_resolver(vec![existing("alice", "works at", "acme", 1)]);

        let mut hedged = edge("alice", "works at", "globex", 2);
        hedged.confidence = 0.3;
        let resolution = resolver.resolve(&scope(), hedged).await.unwrap();

        assert_eq!(resolution.close.len(), 1);
        assert_eq!(resolution.close[0].object_key, "acme");
    }

    #[test]
    fn should_default_unknown_relations_to_multi_valued() {
        let policy = works_at_policy();
        assert_eq!(policy.cardinality("knows"), RelationCardinality::MultiValued);
        assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
    }

    #[test]
    fn should_classify_cardinality_case_insensitively() {
        let policy = CardinalityPolicy::with_single_valued(["Works At"]);
        assert_eq!(policy.cardinality("works at"), RelationCardinality::SingleValued);
    }

    #[test]
    fn should_treat_empty_policy_as_all_multi_valued() {
        let policy = CardinalityPolicy::default();
        assert_eq!(policy.cardinality("works at"), RelationCardinality::MultiValued);
    }

    #[test]
    fn should_carry_event_time_as_valid_from() {
        let backdated = edge("alice", "works at", "acme", 1);
        let current = edge("alice", "works at", "globex", 5);
        assert!(backdated.valid_from < current.valid_from);
    }
}