polypixel-memoir-core 0.4.0

Memoir memory substrate as an embeddable Rust library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! Integration tests for the knowledge-graph write/read/forget/reconcile path.
//!
//! Wires a `TestClient` against live Postgres + Qdrant + FalkorDB + Ollama (via
//! [`common::fresh_graph_client`]). The worker drains an episodic write through
//! `RelationalExtract` → synthesis → graph commit; tests poll the live graph
//! ([`common::wait_until_graph_committed`]) and assert on the committed nodes
//! and edges via `Client::inspect_graph`.
//!
//! Assertions test *structural shape*, not exact model output: extraction runs a
//! real LLM, so the contract under test is "a relationship between Alice and
//! Acme was committed", never "the relation label is exactly `WORKS_AT`". Each
//! test mints its scope via [`common::TestClient::fresh_scope`] so its graph
//! data is wiped on drop — the suite shares one FalkorDB graph name.
//!
//! Requires `--features integration,knowledge-graph` and the env vars
//! `DATABASE_URL`, `QDRANT_URL`, `FALKOR_URL`, `OLLAMA_URL`, `OLLAMA_MODEL`.

#![cfg(all(feature = "integration", feature = "knowledge-graph"))]

mod common;

use std::time::Duration;

use memoir_core::graph::GraphSnapshot;

/// Upper bound on how long a test waits for one graph commit to become visible.
///
/// A commit sits behind two LLM passes (semantic extract + relational extract)
/// followed by synthesis, and a cold large model can spend tens of seconds on
/// each call — hence a deliberately roomy two minutes. Mirrors the
/// lease/timeout discipline the harness documents.
const GRAPH_COMMIT_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// Reports whether any edge in the snapshot connects the two named entities.
///
/// The check ignores both edge direction and relation label: the LLM picks the
/// relation phrasing and which entity is the subject, so tests only assert that
/// the pair is related at all. Each name is compared as a case-insensitive
/// substring, which lets "Alice" match a committed "Alice Smith".
fn has_edge_between(snapshot: &GraphSnapshot, a: &str, b: &str) -> bool {
    let (first, second) = (a.to_lowercase(), b.to_lowercase());
    for edge in snapshot.edges.iter() {
        let subject = edge.subject.to_lowercase();
        let object = edge.object.to_lowercase();
        let forward = subject.contains(&first) && object.contains(&second);
        let reverse = subject.contains(&second) && object.contains(&first);
        if forward || reverse {
            return true;
        }
    }
    false
}

/// Reports whether some node's name contains `name`, ignoring case.
fn has_entity(snapshot: &GraphSnapshot, name: &str) -> bool {
    let needle = name.to_lowercase();
    snapshot
        .nodes
        .iter()
        .map(|node| node.name.to_lowercase())
        .any(|candidate| candidate.contains(&needle))
}

/// Tallies the nodes whose name contains `name` as a case-insensitive substring.
fn count_entities(snapshot: &GraphSnapshot, name: &str) -> usize {
    let needle = name.to_lowercase();
    let mut matches = 0;
    for node in snapshot.nodes.iter() {
        if node.name.to_lowercase().contains(&needle) {
            matches += 1;
        }
    }
    matches
}

/// Collects every edge that links the two named entities, in either direction.
///
/// Uses the same case-insensitive substring match on subject and object as the
/// boolean edge check; the returned references borrow from `snapshot` and keep
/// the snapshot's edge order.
fn edges_between<'a>(
    snapshot: &'a GraphSnapshot,
    a: &str,
    b: &str,
) -> Vec<&'a memoir_core::graph::GraphEdge> {
    let (first, second) = (a.to_lowercase(), b.to_lowercase());
    let mut spanning = Vec::new();
    for edge in snapshot.edges.iter() {
        let subject = edge.subject.to_lowercase();
        let object = edge.object.to_lowercase();
        let forward = subject.contains(&first) && object.contains(&second);
        let reverse = subject.contains(&second) && object.contains(&first);
        if forward || reverse {
            spanning.push(edge);
        }
    }
    spanning
}

// ─── c3: write → commit ──────────────────────────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_commit_entities_and_edge_when_episodic_memory_is_written() -> anyhow::Result<()> {
    // One episodic write should surface in the graph as both entities plus an
    // edge joining them.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    harness.remember("Alice works at Acme", scope.clone()).await?;

    // Poll the live graph until the Alice<->Acme edge is visible (or time out).
    let committed =
        common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
            has_edge_between(graph, "Alice", "Acme")
        })
        .await?;

    let alice_present = has_entity(&committed, "Alice");
    assert!(
        alice_present,
        "expected an Alice entity, got {:?}",
        committed.nodes
    );

    let acme_present = has_entity(&committed, "Acme");
    assert!(
        acme_present,
        "expected an Acme entity, got {:?}",
        committed.nodes
    );

    let pair_linked = has_edge_between(&committed, "Alice", "Acme");
    assert!(
        pair_linked,
        "expected an Alice<->Acme edge, got {:?}",
        committed.edges,
    );
    Ok(())
}

// ─── c4: synthesis is idempotent — one source commits one set of triples ─────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_commit_a_single_node_per_entity_when_one_source_is_synthesized() -> anyhow::Result<()> {
    // Synthesis is the fan-in of two parents (extract ∥ relational → synthesize).
    // Its "fires exactly once" guarantee shows up in committed state: a single
    // episodic source must leave exactly one node per entity and one edge, since
    // a second synthesis pass would double-write. The complementary case — no
    // synthesis at all when a parent *fails* — needs job-failure injection and is
    // left to a focused follow-up; this test covers only the idempotency that a
    // live single write can prove.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    harness.remember("Alice works at Acme", scope.clone()).await?;
    let committed =
        common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
            has_edge_between(graph, "Alice", "Acme")
        })
        .await?;

    let alice_nodes = count_entities(&committed, "Alice");
    assert_eq!(
        alice_nodes,
        1,
        "Alice must be one node, got {:?}",
        committed.nodes
    );

    let acme_nodes = count_entities(&committed, "Acme");
    assert_eq!(
        acme_nodes,
        1,
        "Acme must be one node, got {:?}",
        committed.nodes
    );

    let linking_edges = edges_between(&committed, "Alice", "Acme").len();
    assert_eq!(
        linking_edges,
        1,
        "exactly one Alice<->Acme edge, got {:?}",
        committed.edges,
    );
    Ok(())
}

// ─── c5: entity resolution dedup ─────────────────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_resolve_to_one_entity_when_a_later_memory_names_it_more_fully() -> anyhow::Result<()> {
    // "Alice" and "Alice Smith" have to clear MIN_ENTITY_SIMILARITY and collapse
    // into one node. That is the embedding resolver's job and cannot be proven
    // against a staged store.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    // First mention: the short name.
    harness.remember("Alice works at Acme", scope.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Alice", "Acme")
    })
    .await?;

    // Second mention: the fuller name, in an unrelated fact.
    harness
        .remember("Alice Smith leads the platform team", scope.clone())
        .await?;

    // The second fact counts as committed once any edge mentions the team or the
    // platform on either end; the exact relation wording is up to the model.
    let committed =
        common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
            graph.edges.iter().any(|edge| {
                let endpoints = format!("{} {}", edge.subject, edge.object).to_lowercase();
                ["team", "platform"]
                    .iter()
                    .any(|keyword| endpoints.contains(*keyword))
            })
        })
        .await?;

    let alice_nodes = count_entities(&committed, "Alice");
    assert_eq!(
        alice_nodes,
        1,
        "Alice and Alice Smith must dedup to one node, got {:?}",
        committed.nodes,
    );
    Ok(())
}

// ─── c6: contradiction = temporal invalidation, not delete ───────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_invalidate_old_edge_and_preserve_history_when_a_fact_is_superseded() -> anyhow::Result<()> {
    // The inspect snapshot intentionally keeps superseded edges, so after the
    // contradiction a closed Acme edge (valid_to set) and a current Globex edge
    // must both be present.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    // Original fact.
    harness.remember("Alice works at Acme", scope.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Alice", "Acme")
    })
    .await?;

    // Superseding fact.
    harness.remember("Alice now works at Globex", scope.clone()).await?;
    let committed =
        common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
            has_edge_between(graph, "Alice", "Globex")
        })
        .await?;

    let globex_linked = has_edge_between(&committed, "Alice", "Globex");
    assert!(
        globex_linked,
        "new Globex edge must exist, got {:?}",
        committed.edges,
    );

    let acme_edges = edges_between(&committed, "Alice", "Acme");
    let history_kept = !acme_edges.is_empty();
    assert!(
        history_kept,
        "old Acme edge must be preserved as history, not deleted, got {:?}",
        committed.edges,
    );

    let history_closed = acme_edges.iter().any(|edge| edge.valid_to.is_some());
    assert!(
        history_closed,
        "old Acme edge must be closed (valid_to set), got {:?}",
        acme_edges,
    );
    Ok(())
}

// ─── c7: search enrichment ───────────────────────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_enrich_search_results_with_graph_context_when_requested() -> anyhow::Result<()> {
    // A search issued with `with_graph()` after the graph commit must return the
    // hit together with at least one graph relationship.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    harness.remember("Alice works at Acme", scope.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Alice", "Acme")
    })
    .await?;

    let results = harness
        .search("Alice", scope.clone())
        .with_graph()
        .limit(10)
        .await?;

    let found_hit = !results.list().is_empty();
    assert!(found_hit, "search must return the hit");

    let found_relationships = !results.graph().relationships.is_empty();
    assert!(
        found_relationships,
        "with_graph() must surface graph relationships, got {:?}",
        results.graph(),
    );
    Ok(())
}

// ─── c8: forget ref-counts ───────────────────────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_remove_graph_nodes_when_their_only_source_memory_is_forgotten() -> anyhow::Result<()> {
    use memoir_core::memory::ForgetTarget;

    // Write one memory, let it reach the graph, forget it by pid, then expect the
    // edge it contributed to disappear.
    let mut harness = common::fresh_graph_client().await?;
    let scope = harness.fresh_scope();

    let source = harness.remember("Alice works at Acme", scope.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Alice", "Acme")
    })
    .await?;

    let target = ForgetTarget::Pid(source.pid.clone());
    harness.forget(target).await?;

    // Same poller, inverted predicate: wait for the edge to be gone.
    let after_forget =
        common::wait_until_graph_committed(&harness, &scope, GRAPH_COMMIT_TIMEOUT, |graph| {
            !has_edge_between(graph, "Alice", "Acme")
        })
        .await?;

    let edge_gone = !has_edge_between(&after_forget, "Alice", "Acme");
    assert!(
        edge_gone,
        "forgetting the only source must remove the edge, got {:?}",
        after_forget.edges,
    );
    Ok(())
}

// ─── inherited 0012: live cross-scope inspect ────────────────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_return_whole_org_when_inspect_scope_omits_agent_and_user() -> anyhow::Result<()> {
    // Two scopes share one org; an inspect filtered by org alone has to cover both.
    // This is the cross-scope read that no staged-store unit test could prove.
    let mut harness = common::fresh_graph_client().await?;
    let org = format!("org_{}", nanoid::nanoid!(8));
    let scope_a = harness.fresh_scope_in_org(&org);
    let scope_b = harness.fresh_scope_in_org(&org);

    // Seed scope_a and wait for its commit.
    harness.remember("Alice works at Acme", scope_a.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope_a, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Alice", "Acme")
    })
    .await?;

    // Seed scope_b and wait for its commit.
    harness.remember("Bob works at Globex", scope_b.clone()).await?;
    common::wait_until_graph_committed(&harness, &scope_b, GRAPH_COMMIT_TIMEOUT, |graph| {
        has_edge_between(graph, "Bob", "Globex")
    })
    .await?;

    // Org-only inspect: no agent or user filter is supplied.
    let org_view = harness.inspect_graph().org(org.clone()).await?;

    let sees_alice = has_entity(&org_view, "Alice");
    assert!(
        sees_alice,
        "org-wide inspect must see scope_a's Alice, got {:?}",
        org_view.nodes
    );

    let sees_bob = has_entity(&org_view, "Bob");
    assert!(
        sees_bob,
        "org-wide inspect must see scope_b's Bob, got {:?}",
        org_view.nodes
    );
    Ok(())
}

// ─── c9 + inherited 0013: reconcile rebuild + same-created_at page boundary ──

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_rebuild_every_memory_exactly_once_across_a_same_timestamp_page_boundary() -> anyhow::Result<()> {
    use sea_orm::{ConnectionTrait, Statement, Value};

    // Five distinct episodic facts → five distinct entities. Three of them are then
    // clamped to one created_at and the graph is rebuilt with page_size = 2.
    // Correct pagination re-derives ALL five exactly once: a skip at the seam
    // drops an entity; a double counts one twice.
    //
    // Why three rows and not two: the clamp timestamp is a fixed date, so the tied
    // rows sort either before or after every other row depending on the wall
    // clock and on the cursor's sort direction. A tied *pair* at the front of a
    // two-row-per-page scan fills page one exactly and never touches a seam. Any
    // three consecutive rows always contain a seam at page_size = 2, so the tie
    // group is cut no matter where it sorts.
    let mut client = common::fresh_graph_client().await?;
    let scope = client.fresh_scope();

    let people = ["Alice", "Bob", "Carol", "Dave", "Erin"];
    let mut pids = Vec::with_capacity(people.len());
    for person in people {
        let written = client
            .remember(format!("{person} works at Acme"), scope.clone())
            .await?;
        pids.push(written.pid);
    }

    // Wait for every write to reach the graph, not just the last one: nothing
    // visible here guarantees the worker finishes jobs in submission order.
    common::wait_until_graph_committed(&client, &scope, GRAPH_COMMIT_TIMEOUT, |s| {
        people.iter().all(|p| has_edge_between(s, p, "Acme"))
    })
    .await?;

    // Clamp the middle three pids to one shared created_at (test-crate raw SQL — no
    // production path can set created_at; the reconcile cursor pages by it).
    let shared_pids: Vec<_> = pids[1..=3].to_vec();
    let raw = client.raw_db().await?;
    let clamp = Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        "UPDATE memories SET created_at = '2026-01-01T00:00:00+00:00' WHERE pid = ANY($1)",
        [Value::Array(
            sea_orm::sea_query::ArrayType::String,
            Some(Box::new(
                shared_pids.into_iter().map(|p| Value::String(Some(p))).collect(),
            )),
        )],
    );
    raw.execute_raw(clamp).await?;

    // page_size 2 against a three-row tie group forces a page seam inside the
    // shared timestamp.
    let summary = client
        .reconcile()
        .rebuild_graph(scope.clone())
        .rebuild_page_size(2)
        .await?;
    assert_eq!(
        summary.graph_rebuild_enqueued, 5,
        "all five episodic memories must re-enqueue exactly once"
    );

    // NOTE(review): if rebuild_graph does not clear the scope's existing graph
    // first, this predicate is already true from the initial writes and the
    // per-person counts below may be read before the rebuild jobs finish —
    // confirm against the reconcile implementation.
    let snapshot = common::wait_until_graph_committed(&client, &scope, GRAPH_COMMIT_TIMEOUT, |s| {
        people.iter().all(|p| has_entity(s, p))
    })
    .await?;

    for person in people {
        assert_eq!(
            count_entities(&snapshot, person),
            1,
            "{person} must rebuild exactly once (no skip, no dup), got {:?}",
            snapshot.nodes,
        );
    }
    Ok(())
}

// ─── cleanup: the teardown guard actually wipes a scope ──────────────────────

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_leave_no_graph_residue_after_a_tracked_scope_is_dropped() -> anyhow::Result<()> {
    // Seed a scope through a short-lived client, keeping only the scope itself.
    let seeded_scope = {
        let mut writer = common::fresh_graph_client().await?;
        let tracked = writer.fresh_scope();
        writer.remember("Alice works at Acme", tracked.clone()).await?;
        common::wait_until_graph_committed(&writer, &tracked, GRAPH_COMMIT_TIMEOUT, |graph| {
            has_edge_between(graph, "Alice", "Acme")
        })
        .await?;
        tracked
        // `writer` goes out of scope here → its Drop forgets `tracked` from the
        // shared graph
    };

    // Inspect the exact same agent/org/user triple through a second client.
    let reader = common::fresh_graph_client().await?;
    let leftover = reader
        .inspect_graph()
        .agent(seeded_scope.agent_id.clone())
        .org(seeded_scope.org_id.clone())
        .user(seeded_scope.user_id.clone())
        .await?;

    let wiped = leftover.is_empty();
    assert!(
        wiped,
        "dropped TestClient must wipe its scope's graph, found {:?} / {:?}",
        leftover.nodes,
        leftover.edges,
    );
    Ok(())
}