Skip to main content

ai_memory/
db.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use rusqlite::{Connection, params};
7use std::collections::HashMap;
8use std::path::Path;
9
10use crate::models::{
11    AGENTS_NAMESPACE, AgentRegistration, Approval, ApproverType, DuplicateCheck, DuplicateMatch,
12    GovernanceDecision, GovernanceLevel, GovernancePolicy, GovernedAction, MAX_NAMESPACE_DEPTH,
13    Memory, MemoryLink, NamespaceCount, PROMOTION_THRESHOLD, PendingAction, Stats, Taxonomy,
14    TaxonomyNode, Tier, TierCount, namespace_ancestors,
15};
16
/// Visibility prefixes derived from an agent's namespace position (Task 1.5),
/// ordered from narrowest to broadest: `.0` private (the agent's own
/// namespace), `.1` team (parent), `.2` unit (grandparent), `.3` org
/// (great-grandparent). Levels that do not exist are `None`.
type VisibilityPrefixes = (Option<String>, Option<String>, Option<String>, Option<String>);
26
27fn compute_visibility_prefixes(as_agent: Option<&str>) -> VisibilityPrefixes {
28    let Some(ns) = as_agent else {
29        return (None, None, None, None);
30    };
31    let ancestors = namespace_ancestors(ns);
32    let p = ancestors.first().cloned();
33    let t = ancestors.get(1).cloned();
34    let u = ancestors.get(2).cloned();
35    let o = ancestors.get(3).cloned();
36    (p, t, u, o)
37}
38
39/// Rust-side visibility check for paths that can't easily attach SQL
40/// visibility (the HNSW branch of `recall_hybrid` iterates memories loaded
41/// via `get()`). Returns `true` when `as_agent` is unset (no filter) or
42/// when the memory's scope + namespace grant visibility to the caller.
43fn is_visible(mem: &Memory, prefixes: &VisibilityPrefixes) -> bool {
44    let (p, t, u, o) = prefixes;
45    if p.is_none() {
46        return true;
47    }
48    let scope = mem
49        .metadata
50        .get("scope")
51        .and_then(|v| v.as_str())
52        .unwrap_or("private");
53    match scope {
54        "collective" => true,
55        "private" => p.as_ref().is_some_and(|ns| &mem.namespace == ns),
56        "team" => matches_subtree(&mem.namespace, t.as_deref()),
57        "unit" => matches_subtree(&mem.namespace, u.as_deref()),
58        "org" => matches_subtree(&mem.namespace, o.as_deref()),
59        _ => false,
60    }
61}
62
/// `true` when `namespace` equals `prefix` or lies strictly beneath it
/// (`prefix/...`); always `false` when there is no prefix to match.
fn matches_subtree(namespace: &str, prefix: Option<&str>) -> bool {
    prefix.is_some_and(|root| {
        namespace
            .strip_prefix(root)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    })
}
69
/// Generate the visibility WHERE-clause fragment starting at placeholder `start`.
/// Uses placeholders `?start .. ?start+3` for private/team/unit/org prefixes.
/// See `compute_visibility_prefixes` for the bind order. The fragment begins
/// with `AND (` and is meant to be appended to an existing WHERE clause; every
/// column is qualified with `table_alias`.
///
/// Semantics:
/// - private prefix bound to NULL → no filtering (everything visible);
/// - `collective` scope → always visible;
/// - `private` scope → namespace must equal the caller's own namespace;
/// - `team` / `unit` / `org` scope → namespace must equal the bound ancestor
///   prefix or start with `<prefix>/`.
///
/// Performance (v0.6.0 GA): each scope branch compares against the indexed
/// generated column `scope_idx` (schema v10) rather than re-evaluating
/// `json_extract(metadata, '$.scope')` per row. See
/// `docs/ARCHITECTURAL_LIMITS.md` for which `SQLite` limits remain structural.
///
/// Correctness / security (issue #217 and follow-up): the sub-tree test is an
/// exact, case-sensitive prefix comparison,
/// `substr(namespace, 1, length(p) + 1) = p || '/'`, rather than `LIKE`.
/// `SQLite`'s `LIKE` is ASCII case-insensitive by default, which let
/// namespaces differing only in letter case leak into each other's
/// team/unit/org views. `LIKE` also treats `%`/`_` as wildcards, so it needed
/// escaping. `substr`/`length` count characters for TEXT values, so this
/// agrees with the Rust-side `str::starts_with(format!("{p}/"))` check used
/// by the in-memory visibility path, and no escaping is required.
fn visibility_clause(start: usize, table_alias: &str) -> String {
    let private_ph = start;
    let team_ph = start + 1;
    let unit_ph = start + 2;
    let org_ph = start + 3;
    let ta = table_alias;
    // One branch per hierarchical scope: exact match on the ancestor prefix,
    // or a strict descendant (`<prefix>/...`), compared case-sensitively.
    let subtree = |scope: &str, ph: usize| -> String {
        format!(
            "OR ({ta}.scope_idx = '{scope}' AND ?{ph} IS NOT NULL AND \
             ({ta}.namespace = ?{ph} OR \
             substr({ta}.namespace, 1, length(?{ph}) + 1) = ?{ph} || '/'))"
        )
    };
    format!(
        "AND (?{private_ph} IS NULL \
         OR {ta}.scope_idx = 'collective' \
         OR ({ta}.scope_idx = 'private' AND {ta}.namespace = ?{private_ph}) \
         {team} {unit} {org})",
        team = subtree("team", team_ph),
        unit = subtree("unit", unit_ph),
        org = subtree("org", org_ph),
    )
}
109
/// Base DDL executed by `open` on every connection via `execute_batch`.
///
/// Every statement uses `IF NOT EXISTS`, so running it against an existing
/// database is a no-op. Columns and tables introduced later (embedding,
/// `scope_idx`, `agent_id_idx`, archive/governance/sync tables, …) are added
/// by `migrate`, not here.
///
/// `memories_fts` is an external-content FTS5 table (`content=memories`) kept
/// in sync by the `memories_ai` / `memories_ad` / `memories_au` triggers.
/// NOTE(review): `memories_au` has no `OF <columns>` list, so it rewrites the
/// FTS entry on every UPDATE of `memories`, including updates that touch only
/// non-indexed columns; narrowing it would need a migration because
/// `CREATE TRIGGER IF NOT EXISTS` never replaces an existing trigger.
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS memories (
    id               TEXT PRIMARY KEY,
    tier             TEXT NOT NULL,
    namespace        TEXT NOT NULL DEFAULT 'global',
    title            TEXT NOT NULL,
    content          TEXT NOT NULL,
    tags             TEXT NOT NULL DEFAULT '[]',
    priority         INTEGER NOT NULL DEFAULT 5,
    confidence       REAL NOT NULL DEFAULT 1.0,
    source           TEXT NOT NULL DEFAULT 'api',
    access_count     INTEGER NOT NULL DEFAULT 0,
    created_at       TEXT NOT NULL,
    updated_at       TEXT NOT NULL,
    last_accessed_at TEXT,
    expires_at       TEXT,
    metadata         TEXT NOT NULL DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_memories_tier ON memories(tier);
CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace);
CREATE INDEX IF NOT EXISTS idx_memories_priority ON memories(priority DESC);
CREATE INDEX IF NOT EXISTS idx_memories_expires ON memories(expires_at);
CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_title_ns ON memories(title, namespace);

CREATE TABLE IF NOT EXISTS memory_links (
    source_id   TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
    target_id   TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
    relation    TEXT NOT NULL DEFAULT 'related_to',
    created_at  TEXT NOT NULL,
    PRIMARY KEY (source_id, target_id, relation)
);

CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
    title,
    content,
    tags,
    content=memories,
    content_rowid=rowid
);

CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
    INSERT INTO memories_fts(rowid, title, content, tags)
    VALUES (new.rowid, new.title, new.content, new.tags);
END;

CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
    VALUES ('delete', old.rowid, old.title, old.content, old.tags);
END;

CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
    VALUES ('delete', old.rowid, old.title, old.content, old.tags);
    INSERT INTO memories_fts(rowid, title, content, tags)
    VALUES (new.rowid, new.title, new.content, new.tags);
END;

CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER NOT NULL
);
";

/// Schema version produced by `migrate`. It is written to `schema_version`
/// after a successful migration; databases already at (or above) this
/// version skip migration entirely.
const CURRENT_SCHEMA_VERSION: i64 = 16;
174
175pub fn open(path: &Path) -> Result<Connection> {
176    let conn = Connection::open(path).context("failed to open database")?;
177    apply_sqlcipher_key(&conn)?;
178    conn.pragma_update(None, "journal_mode", "WAL")?;
179    conn.pragma_update(None, "busy_timeout", 5000)?;
180    conn.pragma_update(None, "synchronous", "NORMAL")?;
181    conn.pragma_update(None, "foreign_keys", "ON")?;
182    conn.execute_batch(SCHEMA)
183        .context("failed to initialize schema")?;
184    migrate(&conn)?;
185    Ok(conn)
186}
187
/// v0.6.0.0 — apply the SQLCipher passphrase (PRAGMA key) when the
/// `sqlcipher` cargo feature is built in. In such builds the passphrase is
/// mandatory: it is read from the `AI_MEMORY_DB_PASSPHRASE` env var, and if
/// that variable is unset this function returns an error instead of opening
/// the database unencrypted. The recommended
/// way to set the env var is via the `--db-passphrase-file <path>`
/// CLI flag, which reads the passphrase from a root-readable file
/// and exports the env for the daemon's lifetime only. Passing the
/// passphrase directly as an env var works but leaks to the process
/// list (`ps -E`, `/proc/<pid>/environ`).
///
/// After keying, a cheap `sqlite_master` query verifies that the key
/// actually decrypts the database, so a wrong passphrase fails here rather
/// than on the first real statement.
///
/// When the `sqlcipher` feature is NOT enabled, this function is a
/// no-op — standard SQLite does not implement `PRAGMA key`, so there is
/// nothing to apply.
#[cfg(feature = "sqlcipher")]
fn apply_sqlcipher_key(conn: &Connection) -> Result<()> {
    let Ok(passphrase) = std::env::var("AI_MEMORY_DB_PASSPHRASE") else {
        anyhow::bail!(
            "sqlcipher build requires AI_MEMORY_DB_PASSPHRASE (set via --db-passphrase-file <path>)"
        );
    };
    // PRAGMA key must be the FIRST operation on a new connection. The
    // passphrase is quoted with SQL string-literal quoting rules.
    // NOTE(review): rusqlite's `pragma_update` already renders a text value
    // as a quoted SQL string literal, so pre-quoting here may make the
    // effective key include the surrounding single quotes (and doubled inner
    // quotes). Databases created by this code were keyed the same way, so
    // confirm compatibility before changing it.
    let escaped = passphrase.replace('\'', "''");
    conn.pragma_update(None, "key", format!("'{escaped}'"))
        .context("PRAGMA key failed (wrong passphrase or unencrypted DB?)")?;
    // Verify the key opened the database by running a cheap query.
    conn.query_row("SELECT count(*) FROM sqlite_master", [], |r| {
        r.get::<_, i64>(0)
    })
    .context("SQLCipher unlock verification failed — wrong passphrase?")?;
    Ok(())
}

/// Non-SQLCipher build: nothing to apply.
#[cfg(not(feature = "sqlcipher"))]
// Keeps the same `Result` signature as the sqlcipher variant so `open` can
// call `?` on either build without cfg-specific code.
#[allow(clippy::unnecessary_wraps)]
fn apply_sqlcipher_key(_conn: &Connection) -> Result<()> {
    Ok(())
}
219#[cfg(not(feature = "sqlcipher"))]
220#[allow(clippy::unnecessary_wraps)]
221fn apply_sqlcipher_key(_conn: &Connection) -> Result<()> {
222    Ok(())
223}
224
225const MIGRATION_V15_SQLITE: &str = include_str!("../migrations/sqlite/0010_v063_hierarchy_kg.sql");
226
227#[allow(clippy::too_many_lines)]
228fn migrate(conn: &Connection) -> Result<()> {
229    let version: i64 = conn
230        .query_row(
231            "SELECT COALESCE(MAX(version), 0) FROM schema_version",
232            [],
233            |r| r.get(0),
234        )
235        .unwrap_or(0);
236
237    if version >= CURRENT_SCHEMA_VERSION {
238        return Ok(());
239    }
240
241    conn.execute_batch("BEGIN EXCLUSIVE")?;
242    let result = (|| -> Result<()> {
243        if version < 2 {
244            let mut has_confidence = false;
245            let mut has_source = false;
246            let mut stmt = conn.prepare("PRAGMA table_info(memories)")?;
247            let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
248            for col in cols {
249                match col?.as_str() {
250                    "confidence" => has_confidence = true,
251                    "source" => has_source = true,
252                    _ => {}
253                }
254            }
255            drop(stmt);
256            if !has_confidence {
257                conn.execute(
258                    "ALTER TABLE memories ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0",
259                    [],
260                )?;
261            }
262            if !has_source {
263                conn.execute(
264                    "ALTER TABLE memories ADD COLUMN source TEXT NOT NULL DEFAULT 'api'",
265                    [],
266                )?;
267            }
268        }
269
270        if version < 3 {
271            // Add embedding column for semantic search (Phase 1+2)
272            let mut has_embedding = false;
273            let mut stmt = conn.prepare("PRAGMA table_info(memories)")?;
274            let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
275            for col in cols {
276                if col?.as_str() == "embedding" {
277                    has_embedding = true;
278                }
279            }
280            drop(stmt);
281            if !has_embedding {
282                conn.execute("ALTER TABLE memories ADD COLUMN embedding BLOB", [])?;
283            }
284        }
285        if version < 4 {
286            conn.execute_batch(
287                "CREATE TABLE IF NOT EXISTS archived_memories (
288                    id               TEXT PRIMARY KEY,
289                    tier             TEXT NOT NULL,
290                    namespace        TEXT NOT NULL DEFAULT 'global',
291                    title            TEXT NOT NULL,
292                    content          TEXT NOT NULL,
293                    tags             TEXT NOT NULL DEFAULT '[]',
294                    priority         INTEGER NOT NULL DEFAULT 5,
295                    confidence       REAL NOT NULL DEFAULT 1.0,
296                    source           TEXT NOT NULL DEFAULT 'api',
297                    access_count     INTEGER NOT NULL DEFAULT 0,
298                    created_at       TEXT NOT NULL,
299                    updated_at       TEXT NOT NULL,
300                    last_accessed_at TEXT,
301                    expires_at       TEXT,
302                    archived_at      TEXT NOT NULL,
303                    archive_reason   TEXT NOT NULL DEFAULT 'ttl_expired',
304                    metadata         TEXT NOT NULL DEFAULT '{}'
305                );
306                CREATE INDEX IF NOT EXISTS idx_archived_namespace ON archived_memories(namespace);
307                CREATE INDEX IF NOT EXISTS idx_archived_at ON archived_memories(archived_at);",
308            )?;
309        }
310        if version < 5 {
311            conn.execute_batch(
312                "CREATE TABLE IF NOT EXISTS namespace_meta (
313                    namespace    TEXT PRIMARY KEY,
314                    standard_id  TEXT,
315                    updated_at   TEXT NOT NULL
316                );",
317            )?;
318        }
319        if version < 6 {
320            // Add parent_namespace column for rule layering
321            let has_parent: bool = conn
322                .prepare("SELECT parent_namespace FROM namespace_meta LIMIT 0")
323                .is_ok();
324            if !has_parent {
325                conn.execute_batch("ALTER TABLE namespace_meta ADD COLUMN parent_namespace TEXT;")?;
326            }
327        }
328        if version < 7 {
329            // Add metadata JSON column to memories and archived_memories tables
330            let has_metadata: bool = conn
331                .prepare("SELECT metadata FROM memories LIMIT 0")
332                .is_ok();
333            if !has_metadata {
334                conn.execute(
335                    "ALTER TABLE memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
336                    [],
337                )?;
338            }
339            let has_archive_metadata: bool = conn
340                .prepare("SELECT metadata FROM archived_memories LIMIT 0")
341                .is_ok();
342            if !has_archive_metadata {
343                conn.execute(
344                    "ALTER TABLE archived_memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
345                    [],
346                )?;
347            }
348        }
349        if version < 8 {
350            // Task 1.9: pending_actions table for governance-queued operations
351            conn.execute_batch(
352                "CREATE TABLE IF NOT EXISTS pending_actions (
353                    id            TEXT PRIMARY KEY,
354                    action_type   TEXT NOT NULL,
355                    memory_id     TEXT,
356                    namespace     TEXT NOT NULL,
357                    payload       TEXT NOT NULL DEFAULT '{}',
358                    requested_by  TEXT NOT NULL,
359                    requested_at  TEXT NOT NULL,
360                    status        TEXT NOT NULL DEFAULT 'pending',
361                    decided_by    TEXT,
362                    decided_at    TEXT
363                );
364                CREATE INDEX IF NOT EXISTS idx_pending_status    ON pending_actions(status);
365                CREATE INDEX IF NOT EXISTS idx_pending_namespace ON pending_actions(namespace);",
366            )?;
367        }
368        if version < 9 {
369            // Task 1.10: approvals JSON array for consensus approver type
370            let has_approvals: bool = conn
371                .prepare("SELECT approvals FROM pending_actions LIMIT 0")
372                .is_ok();
373            if !has_approvals {
374                conn.execute(
375                    "ALTER TABLE pending_actions ADD COLUMN approvals TEXT NOT NULL DEFAULT '[]'",
376                    [],
377                )?;
378            }
379        }
380
381        if version < 10 {
382            // v0.6.0 GA: index `scope` so visibility filtering isn't a
383            // JSON scan. Uses a VIRTUAL generated column (no row bytes
384            // spent) plus a conventional B-tree index. The `visibility_clause`
385            // SQL compares against the generated column directly — SQLite's
386            // query planner picks the index because the comparison is on a
387            // real column, not a repeated expression.
388            //
389            // The expression is guarded by `json_valid(metadata)` so rows
390            // with legacy / corrupt metadata (we test this path explicitly
391            // in `metadata_corrupt_column_falls_back_to_empty`) are still
392            // writable — SQLite evaluates generated-column expressions on
393            // every write that touches the source column, and an uncaught
394            // `json_extract` failure would turn every corrupt-row write
395            // into a constraint error.
396            let has_scope_idx: bool = conn
397                .prepare("SELECT scope_idx FROM memories LIMIT 0")
398                .is_ok();
399            if !has_scope_idx {
400                conn.execute(
401                    "ALTER TABLE memories ADD COLUMN scope_idx TEXT \
402                     GENERATED ALWAYS AS (\
403                         CASE WHEN json_valid(metadata) \
404                         THEN COALESCE(json_extract(metadata, '$.scope'), 'private') \
405                         ELSE 'private' END\
406                     ) VIRTUAL",
407                    [],
408                )?;
409            }
410            conn.execute(
411                "CREATE INDEX IF NOT EXISTS idx_memories_scope_idx ON memories(scope_idx)",
412                [],
413            )?;
414        }
415
416        if version < 11 {
417            // Phase 3 foundation (issue #224): vector-clock sync state.
418            // Stores the latest `updated_at` timestamp this peer has seen
419            // from each known remote peer. Used by the future CRDT-lite
420            // merge to skip memories the caller has already seen and to
421            // emit incremental `GET /api/v1/sync/since?...` responses.
422            //
423            // The table is additive — it does NOT change any existing
424            // sync behaviour in v0.6.0 GA. Entries are created lazily by
425            // the HTTP sync endpoints and by `sync --dry-run` telemetry.
426            conn.execute_batch(
427                "CREATE TABLE IF NOT EXISTS sync_state (
428                    agent_id       TEXT NOT NULL,
429                    peer_id        TEXT NOT NULL,
430                    last_seen_at   TEXT NOT NULL,
431                    last_pulled_at TEXT NOT NULL,
432                    PRIMARY KEY (agent_id, peer_id)
433                );
434                CREATE INDEX IF NOT EXISTS idx_sync_state_agent ON sync_state(agent_id);",
435            )?;
436        }
437
438        if version < 12 {
439            // Phase 3 Task 3b.1 (issue #224): track the high-watermark of
440            // local memories this agent has successfully pushed to each
441            // peer. The daemon uses it to stream only deltas on the next
442            // push cycle. Null for rows from v11 that predate this column.
443            let has_last_pushed: bool = conn
444                .prepare("SELECT last_pushed_at FROM sync_state LIMIT 0")
445                .is_ok();
446            if !has_last_pushed {
447                conn.execute("ALTER TABLE sync_state ADD COLUMN last_pushed_at TEXT", [])?;
448            }
449        }
450
451        if version < 13 {
452            // v0.6.0.0 — webhook subscriptions. Events fire on memory_store
453            // (and, in v0.6.1, delete/promote/link) and are dispatched as
454            // HMAC-SHA256-signed POSTs to subscriber URLs. `events` is a
455            // comma-separated whitelist; `*` = all current + future events.
456            // `secret_hash` stores a SHA-256 of the operator-supplied
457            // shared secret — the plaintext never lands in the DB.
458            conn.execute(
459                "CREATE TABLE IF NOT EXISTS subscriptions (
460                    id TEXT PRIMARY KEY,
461                    url TEXT NOT NULL,
462                    events TEXT NOT NULL DEFAULT '*',
463                    secret_hash TEXT,
464                    namespace_filter TEXT,
465                    agent_filter TEXT,
466                    created_by TEXT,
467                    created_at TEXT NOT NULL,
468                    last_dispatched_at TEXT,
469                    dispatch_count INTEGER NOT NULL DEFAULT 0,
470                    failure_count INTEGER NOT NULL DEFAULT 0
471                )",
472                [],
473            )?;
474            conn.execute(
475                "CREATE INDEX IF NOT EXISTS idx_subscriptions_url ON subscriptions(url)",
476                [],
477            )?;
478        }
479
480        if version < 14 {
481            // Ultrareview #342: list / search / recall queries filter by
482            // `json_extract(metadata, '$.agent_id') = ?`, which SQLite
483            // cannot index. On large mesh peers this degenerates to a
484            // full table scan per request and a DoS vector — a single
485            // authenticated client hitting `/memories?agent_id=X` in a
486            // loop pegs CPU and blocks other queries on the shared
487            // connection. Add a VIRTUAL generated column so the
488            // comparison becomes a real column lookup the query planner
489            // can serve from an index.
490            //
491            // Ultrareview #353: also add `created_at` index so export
492            // and snapshot queries stop scanning + sorting full table.
493            let has_agent_id_idx: bool = conn
494                .prepare("SELECT agent_id_idx FROM memories LIMIT 0")
495                .is_ok();
496            if !has_agent_id_idx {
497                conn.execute(
498                    "ALTER TABLE memories ADD COLUMN agent_id_idx TEXT \
499                     GENERATED ALWAYS AS (\
500                         CASE WHEN json_valid(metadata) \
501                         THEN json_extract(metadata, '$.agent_id') \
502                         ELSE NULL END\
503                     ) VIRTUAL",
504                    [],
505                )?;
506            }
507            conn.execute(
508                "CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id_idx)",
509                [],
510            )?;
511            conn.execute(
512                "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at)",
513                [],
514            )?;
515        }
516
517        if version < 15 {
518            // v0.6.3 Stream B — Temporal-Validity KG schema additions.
519            // Charter §"Critical Schema Reference" (lines 686–723):
520            // four temporal columns on `memory_links`, three temporal
521            // indexes for KG traversal queries, and an `entity_aliases`
522            // side table for the upcoming entity registry. Pure additive
523            // — no existing column or index is dropped or renamed, so
524            // existing `link()` / `links_for()` paths keep working with
525            // the new columns NULL on legacy rows. The `valid_from`
526            // backfill matches the charter pre-flight default
527            // (charter line 428): set to the source memory's
528            // `created_at` to avoid null-handling complexity in v0.6.3
529            // KG query code.
530            //
531            // Type note: charter said `TIMESTAMP` for `valid_from` and
532            // `valid_until`. SQLite has no native TIMESTAMP type — it
533            // stores timestamps as TEXT (ISO-8601), REAL (Julian), or
534            // INTEGER (unix). The codebase uses TEXT throughout (matches
535            // every other timestamp column in this schema and matches
536            // chrono's `to_rfc3339()` output). The Postgres adapter at
537            // `src/store/postgres_schema.sql` uses `TIMESTAMPTZ` —
538            // semantically equivalent across both backends.
539            //
540            // The DDL itself lives in migrations/sqlite/0010_v063_hierarchy_kg.sql
541            // (and migrations/postgres/0010_v063_hierarchy_kg.sql for the
542            // Postgres adapter). Loaded via include_str! at compile time
543            // and executed below via execute_batch. The column-existence
544            // checks remain inline here because SQLite cannot do
545            // ALTER TABLE ADD COLUMN IF NOT EXISTS.
546            let has_valid_from = conn
547                .prepare("SELECT valid_from FROM memory_links LIMIT 0")
548                .is_ok();
549            if !has_valid_from {
550                conn.execute("ALTER TABLE memory_links ADD COLUMN valid_from TEXT", [])?;
551            }
552            let has_valid_until = conn
553                .prepare("SELECT valid_until FROM memory_links LIMIT 0")
554                .is_ok();
555            if !has_valid_until {
556                conn.execute("ALTER TABLE memory_links ADD COLUMN valid_until TEXT", [])?;
557            }
558            let has_observed_by = conn
559                .prepare("SELECT observed_by FROM memory_links LIMIT 0")
560                .is_ok();
561            if !has_observed_by {
562                conn.execute("ALTER TABLE memory_links ADD COLUMN observed_by TEXT", [])?;
563            }
564            let has_signature = conn
565                .prepare("SELECT signature FROM memory_links LIMIT 0")
566                .is_ok();
567            if !has_signature {
568                conn.execute("ALTER TABLE memory_links ADD COLUMN signature BLOB", [])?;
569            }
570
571            // All INDEX and TABLE statements are idempotent; batch-run the migration
572            conn.execute_batch(MIGRATION_V15_SQLITE)?;
573        }
574
575        if version < 16 {
576            // v0.6.4 prep: explicitly document that the existing
577            // idx_memories_namespace already supports prefix LIKE under
578            // SQLite's default BINARY collation. Bump version so Postgres
579            // peers' text_pattern_ops index is part of the same migration
580            // generation.
581            // No DDL needed for SQLite — index already prefix-friendly.
582        }
583
584        conn.execute("DELETE FROM schema_version", [])?;
585        conn.execute(
586            "INSERT INTO schema_version (version) VALUES (?1)",
587            params![CURRENT_SCHEMA_VERSION],
588        )?;
589        Ok(())
590    })();
591
592    match result {
593        Ok(()) => {
594            conn.execute_batch("COMMIT")?;
595            Ok(())
596        }
597        Err(e) => {
598            let _ = conn.execute_batch("ROLLBACK");
599            Err(e)
600        }
601    }
602}
603
604fn row_to_memory(row: &rusqlite::Row) -> rusqlite::Result<Memory> {
605    let tags_json: String = row.get("tags")?;
606    let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
607    let tier_str: String = row.get("tier")?;
608    let tier = Tier::from_str(&tier_str).unwrap_or(Tier::Mid);
609    let metadata_str: String = row
610        .get::<_, String>("metadata")
611        .unwrap_or_else(|_| "{}".to_string());
612    let metadata: serde_json::Value = serde_json::from_str(&metadata_str).unwrap_or_else(|e| {
613        tracing::warn!("corrupt metadata in DB row, defaulting to {{}}: {e}");
614        serde_json::json!({})
615    });
616    Ok(Memory {
617        id: row.get("id")?,
618        tier,
619        namespace: row.get("namespace")?,
620        title: row.get("title")?,
621        content: row.get("content")?,
622        tags,
623        priority: row.get("priority")?,
624        confidence: row.get("confidence").unwrap_or(1.0),
625        source: row.get("source").unwrap_or_else(|_| "api".to_string()),
626        access_count: row.get("access_count")?,
627        created_at: row.get("created_at")?,
628        updated_at: row.get("updated_at")?,
629        last_accessed_at: row.get("last_accessed_at")?,
630        expires_at: row.get("expires_at")?,
631        metadata,
632    })
633}
634
635/// Insert with upsert on title+namespace. Returns the ID (existing or new).
636///
637/// Ultrareview #352: collapses the previous `INSERT`/`ON CONFLICT` +
638/// separate `SELECT` into a single `INSERT ... RETURNING id`. Another
639/// concurrent writer could otherwise slot in between the two statements
640/// and the `SELECT` would return the wrong row id. `SQLite` 3.35+
641/// supports `RETURNING`; it executes atomically within the `INSERT`.
642pub fn insert(conn: &Connection, mem: &Memory) -> Result<String> {
643    let tags_json = serde_json::to_string(&mem.tags)?;
644    let metadata_json = serde_json::to_string(&mem.metadata)?;
645    let actual_id: String = conn.query_row(
646        "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, last_accessed_at, expires_at, metadata)
647         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
648         ON CONFLICT(title, namespace) DO UPDATE SET
649            content = excluded.content,
650            tags = excluded.tags,
651            priority = MAX(memories.priority, excluded.priority),
652            confidence = MAX(memories.confidence, excluded.confidence),
653            source = excluded.source,
654            tier = CASE WHEN excluded.tier = 'long' THEN 'long'
655                        WHEN memories.tier = 'long' THEN 'long'
656                        WHEN excluded.tier = 'mid' THEN 'mid'
657                        ELSE memories.tier END,
658            updated_at = excluded.updated_at,
659            expires_at = CASE WHEN excluded.tier = 'long' OR memories.tier = 'long' THEN NULL
660                              ELSE COALESCE(excluded.expires_at, memories.expires_at) END,
661            -- Preserve metadata.agent_id across upsert (NHI provenance is immutable).
662            metadata = CASE
663                WHEN json_extract(memories.metadata, '$.agent_id') IS NOT NULL
664                THEN json_set(
665                    excluded.metadata,
666                    '$.agent_id',
667                    json_extract(memories.metadata, '$.agent_id')
668                )
669                ELSE excluded.metadata
670            END
671         RETURNING id",
672        params![
673            mem.id, mem.tier.as_str(), mem.namespace, mem.title, mem.content,
674            tags_json, mem.priority, mem.confidence, mem.source, mem.access_count,
675            mem.created_at, mem.updated_at, mem.last_accessed_at, mem.expires_at,
676            metadata_json,
677        ],
678        |r| r.get(0),
679    )?;
680    Ok(actual_id)
681}
682
683pub fn get(conn: &Connection, id: &str) -> Result<Option<Memory>> {
684    let mut stmt = conn.prepare("SELECT * FROM memories WHERE id = ?1")?;
685    let mut rows = stmt.query_map(params![id], row_to_memory)?;
686    match rows.next() {
687        Some(Ok(m)) => Ok(Some(m)),
688        Some(Err(e)) => Err(e.into()),
689        None => Ok(None),
690    }
691}
692
693/// Look up a memory by ID prefix. Returns the memory if exactly one match is found.
694/// Returns `Ok(None)` if no matches. Returns an error if the prefix is ambiguous (>1 match).
695pub fn get_by_prefix(conn: &Connection, prefix: &str) -> Result<Option<Memory>> {
696    // Escape SQL LIKE wildcards in the prefix to prevent % and _ from matching broadly
697    let escaped = prefix.replace('%', "\\%").replace('_', "\\_");
698    let pattern = format!("{escaped}%");
699    let mut stmt = conn.prepare("SELECT * FROM memories WHERE id LIKE ?1 ESCAPE '\\'")?;
700    let rows: Vec<Memory> = stmt
701        .query_map(params![pattern], row_to_memory)?
702        .filter_map(Result::ok)
703        .collect();
704    match rows.len() {
705        0 => Ok(None),
706        1 => Ok(Some(rows.into_iter().next().expect("len checked"))),
707        n => {
708            let ids: Vec<String> = rows.iter().map(|m| m.id.clone()).collect();
709            anyhow::bail!(
710                "ambiguous ID prefix '{prefix}': {n} matches\n{}",
711                ids.join("\n")
712            );
713        }
714    }
715}
716
717/// Resolve an ID that may be a prefix. Tries exact match first, then prefix match.
718pub fn resolve_id(conn: &Connection, id: &str) -> Result<Option<Memory>> {
719    if let Some(mem) = get(conn, id)? {
720        return Ok(Some(mem));
721    }
722    get_by_prefix(conn, id)
723}
724
725/// Bump access count, extend TTL, auto-promote — atomic via transaction.
726pub fn touch(conn: &Connection, id: &str, short_extend: i64, mid_extend: i64) -> Result<()> {
727    let now = Utc::now();
728    let now_str = now.to_rfc3339();
729    let short_expires = (now + chrono::Duration::seconds(short_extend)).to_rfc3339();
730    let mid_expires = (now + chrono::Duration::seconds(mid_extend)).to_rfc3339();
731
732    conn.execute_batch("BEGIN IMMEDIATE")?;
733
734    let result = (|| -> Result<()> {
735        conn.execute(
736            "UPDATE memories SET
737                access_count = MIN(access_count + 1, 1000000),
738                last_accessed_at = ?1,
739                expires_at = CASE
740                    WHEN tier = 'long' THEN expires_at
741                    WHEN tier = 'short' AND expires_at IS NOT NULL THEN ?2
742                    WHEN tier = 'mid' AND expires_at IS NOT NULL THEN ?3
743                    ELSE expires_at
744                END
745             WHERE id = ?4",
746            params![now_str, short_expires, mid_expires, id],
747        )?;
748
749        conn.execute(
750            "UPDATE memories SET tier = 'long', expires_at = NULL, updated_at = ?1
751             WHERE id = ?2 AND tier = 'mid' AND access_count >= ?3",
752            params![now_str, id, PROMOTION_THRESHOLD],
753        )?;
754
755        conn.execute(
756            "UPDATE memories SET priority = MIN(priority + 1, 10)
757             WHERE id = ?1 AND access_count > 0 AND access_count % 10 = 0 AND priority < 10",
758            params![id],
759        )?;
760
761        Ok(())
762    })();
763
764    match result {
765        Ok(()) => {
766            conn.execute_batch("COMMIT")?;
767            Ok(())
768        }
769        Err(e) => {
770            if let Err(rb) = conn.execute_batch("ROLLBACK") {
771                tracing::error!("ROLLBACK failed in touch: {}", rb);
772            }
773            Err(e)
774        }
775    }
776}
777
778#[allow(clippy::too_many_arguments)]
779/// Update a memory by ID. Returns (found, `content_changed`) so callers can
780/// re-generate embeddings when the searchable text has changed.
781pub fn update(
782    conn: &Connection,
783    id: &str,
784    title: Option<&str>,
785    content: Option<&str>,
786    tier: Option<&Tier>,
787    namespace: Option<&str>,
788    tags: Option<&Vec<String>>,
789    priority: Option<i32>,
790    confidence: Option<f64>,
791    expires_at: Option<&str>,
792    metadata: Option<&serde_json::Value>,
793) -> Result<(bool, bool)> {
794    let mut stmt = conn.prepare("SELECT * FROM memories WHERE id = ?1")?;
795    let mut rows = stmt.query_map(params![id], row_to_memory)?;
796    let Some(Ok(existing)) = rows.next() else {
797        return Ok((false, false));
798    };
799    drop(rows);
800    drop(stmt);
801
802    let new_title = title.unwrap_or(&existing.title);
803    let new_content = content.unwrap_or(&existing.content);
804    let content_changed = new_title != existing.title || new_content != existing.content;
805
806    // Tier downgrade protection: never downgrade, consistent with insert path.
807    let effective_tier = match (tier, &existing.tier) {
808        (Some(requested), existing_tier) => match (existing_tier, requested) {
809            (Tier::Long, _) => &Tier::Long,         // long never downgrades
810            (Tier::Mid, Tier::Short) => &Tier::Mid, // mid never downgrades to short
811            (_, requested) => requested,            // upgrades and same-tier are fine
812        },
813        (None, existing_tier) => existing_tier,
814    };
815
816    let namespace = namespace.unwrap_or(&existing.namespace);
817    let tags = tags.unwrap_or(&existing.tags);
818    let priority = priority.unwrap_or(existing.priority);
819    let confidence = confidence.unwrap_or(existing.confidence);
820    // Treat empty string as None (clear expiry) — don't store "" in the DB
821    let expires_at = match expires_at {
822        Some("" | "null") => None,
823        Some(v) => Some(v),
824        None => existing.expires_at.as_deref(),
825    };
826    let metadata = metadata.unwrap_or(&existing.metadata);
827    let tags_json = serde_json::to_string(tags)?;
828    let metadata_json = serde_json::to_string(metadata)?;
829    let now = Utc::now().to_rfc3339();
830
831    // Ultrareview #354: rely on the UNIQUE INDEX on (title, namespace)
832    // to enforce collision atomically at the DB layer. The previous
833    // check-then-update sequence had a race — another transaction
834    // could insert a colliding row between the SELECT and the UPDATE,
835    // and the UPDATE would surface as a generic SQLite constraint
836    // error to the caller. Now the collision check is inline: the
837    // UPDATE fails with a well-scoped UniqueViolation, and we re-
838    // query the colliding row's id only on that specific error for
839    // the friendly message.
840    let update_res = conn.execute(
841        "UPDATE memories SET tier=?1, namespace=?2, title=?3, content=?4, tags=?5, priority=?6, confidence=?7, updated_at=?8, expires_at=?9, metadata=?10
842         WHERE id=?11",
843        params![effective_tier.as_str(), namespace, new_title, new_content, tags_json, priority, confidence, now, expires_at, metadata_json, id],
844    );
845    match update_res {
846        Ok(_) => Ok((true, content_changed)),
847        Err(rusqlite::Error::SqliteFailure(err, _))
848            if err.code == rusqlite::ErrorCode::ConstraintViolation =>
849        {
850            let other: Option<String> = conn
851                .query_row(
852                    "SELECT id FROM memories WHERE title = ?1 AND namespace = ?2 AND id != ?3",
853                    params![new_title, namespace, id],
854                    |r| r.get(0),
855                )
856                .ok();
857            if let Some(other_id) = other {
858                anyhow::bail!(
859                    "title '{new_title}' already exists in namespace '{namespace}' (memory {other_id})"
860                );
861            }
862            Err(anyhow::anyhow!("update failed with constraint violation"))
863        }
864        Err(e) => Err(e.into()),
865    }
866}
867
868pub fn delete(conn: &Connection, id: &str) -> Result<bool> {
869    // Clean up namespace_meta if this memory was a namespace standard
870    conn.execute(
871        "DELETE FROM namespace_meta WHERE standard_id = ?1",
872        params![id],
873    )?;
874    let changed = conn.execute("DELETE FROM memories WHERE id = ?1", params![id])?;
875    Ok(changed > 0)
876}
877
878/// Move a memory from `memories` to `archived_memories`. Used by the
879/// HTTP `/api/v1/archive` explicit-archive endpoint (S29) and by
880/// `sync_push` when a peer pushes an `archives: [id]` record.
881///
882/// Unlike `gc(archive=true)` this does not filter on `expires_at` — the
883/// caller is explicitly asking for the row to be archived right now.
884///
885/// Returns `true` if a row was moved, `false` if no live memory existed
886/// with this id (e.g. it was already archived or never written locally).
887/// A missing-on-peer id is expected during normal fanout and callers
888/// treat it as a no-op.
889///
890/// # Errors
891///
892/// Returns an error if the INSERT-SELECT or DELETE fails.
893pub fn archive_memory(conn: &Connection, id: &str, reason: Option<&str>) -> Result<bool> {
894    let now = Utc::now().to_rfc3339();
895    let reason = reason.unwrap_or("archive");
896    conn.execute_batch("BEGIN IMMEDIATE")?;
897    let result = (|| -> Result<bool> {
898        let exists: bool = conn
899            .query_row(
900                "SELECT COUNT(*) > 0 FROM memories WHERE id = ?1",
901                params![id],
902                |r| r.get(0),
903            )
904            .unwrap_or(false);
905        if !exists {
906            return Ok(false);
907        }
908        conn.execute(
909            "INSERT OR REPLACE INTO archived_memories
910             (id, tier, namespace, title, content, tags, priority, confidence,
911              source, access_count, created_at, updated_at, last_accessed_at,
912              expires_at, archived_at, archive_reason, metadata)
913             SELECT id, tier, namespace, title, content, tags, priority, confidence,
914                    source, access_count, created_at, updated_at, last_accessed_at,
915                    expires_at, ?1, ?2, metadata
916             FROM memories WHERE id = ?3",
917            params![now, reason, id],
918        )?;
919        // Clean up namespace_meta — mirrors `delete`'s cleanup so an archived
920        // row is not still referenced as the namespace standard.
921        conn.execute(
922            "DELETE FROM namespace_meta WHERE standard_id = ?1",
923            params![id],
924        )?;
925        let removed = conn.execute("DELETE FROM memories WHERE id = ?1", params![id])?;
926        Ok(removed > 0)
927    })();
928    match result {
929        Ok(moved) => {
930            conn.execute_batch("COMMIT")?;
931            Ok(moved)
932        }
933        Err(e) => {
934            let _ = conn.execute_batch("ROLLBACK");
935            Err(e)
936        }
937    }
938}
939
940/// Count memories that would be deleted by forget (for `dry_run`).
941pub fn forget_count(
942    conn: &Connection,
943    namespace: Option<&str>,
944    pattern: Option<&str>,
945    tier: Option<&Tier>,
946) -> Result<usize> {
947    if pattern.is_none() && namespace.is_none() && tier.is_none() {
948        anyhow::bail!("at least one of namespace, pattern, or tier is required");
949    }
950    if let Some(pat) = pattern {
951        let fts_query = sanitize_fts_query(pat, true);
952        let tier_str = tier.map(|t| t.as_str().to_string());
953        let count: i64 = conn.query_row(
954            "SELECT COUNT(*) FROM memories WHERE rowid IN (
955                SELECT m.rowid FROM memories_fts fts
956                JOIN memories m ON m.rowid = fts.rowid
957                WHERE memories_fts MATCH ?1
958                  AND (?2 IS NULL OR m.namespace = ?2)
959                  AND (?3 IS NULL OR m.tier = ?3)
960            )",
961            params![fts_query, namespace, tier_str],
962            |r| r.get(0),
963        )?;
964        return Ok(usize::try_from(count).unwrap_or(0));
965    }
966    let tier_str = tier.map(|t| t.as_str().to_string());
967    let count: i64 = conn.query_row(
968        "SELECT COUNT(*) FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
969        params![namespace, tier_str],
970        |r| r.get(0),
971    )?;
972    Ok(usize::try_from(count).unwrap_or(0))
973}
974
975/// Forget by pattern — delete memories matching namespace + FTS pattern + tier.
976/// If `archive` is true, archives memories before deletion.
977pub fn forget(
978    conn: &Connection,
979    namespace: Option<&str>,
980    pattern: Option<&str>,
981    tier: Option<&Tier>,
982    archive: bool,
983) -> Result<usize> {
984    if pattern.is_none() && namespace.is_none() && tier.is_none() {
985        anyhow::bail!("at least one of namespace, pattern, or tier is required");
986    }
987
988    if archive {
989        // Archive matching memories before deletion
990        let now = Utc::now().to_rfc3339();
991        if let Some(pat) = pattern {
992            let fts_query = sanitize_fts_query(pat, true);
993            let tier_str = tier.map(|t| t.as_str().to_string());
994            conn.execute(
995                "INSERT OR REPLACE INTO archived_memories
996                 (id, tier, namespace, title, content, tags, priority, confidence,
997                  source, access_count, created_at, updated_at, last_accessed_at,
998                  expires_at, archived_at, archive_reason)
999                 SELECT id, tier, namespace, title, content, tags, priority, confidence,
1000                        source, access_count, created_at, updated_at, last_accessed_at,
1001                        expires_at, ?4, 'forget'
1002                 FROM memories WHERE rowid IN (
1003                    SELECT m.rowid FROM memories_fts fts
1004                    JOIN memories m ON m.rowid = fts.rowid
1005                    WHERE memories_fts MATCH ?1
1006                      AND (?2 IS NULL OR m.namespace = ?2)
1007                      AND (?3 IS NULL OR m.tier = ?3)
1008                 )",
1009                params![fts_query, namespace, tier_str, now],
1010            )?;
1011        } else {
1012            let tier_str = tier.map(|t| t.as_str().to_string());
1013            conn.execute(
1014                "INSERT OR REPLACE INTO archived_memories
1015                 (id, tier, namespace, title, content, tags, priority, confidence,
1016                  source, access_count, created_at, updated_at, last_accessed_at,
1017                  expires_at, archived_at, archive_reason)
1018                 SELECT id, tier, namespace, title, content, tags, priority, confidence,
1019                        source, access_count, created_at, updated_at, last_accessed_at,
1020                        expires_at, ?3, 'forget'
1021                 FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
1022                params![namespace, tier_str, now],
1023            )?;
1024        }
1025    }
1026
1027    // If pattern provided, use FTS to find matching IDs
1028    if let Some(pat) = pattern {
1029        let fts_query = sanitize_fts_query(pat, true);
1030        let tier_str = tier.map(|t| t.as_str().to_string());
1031        let deleted = conn.execute(
1032            "DELETE FROM memories WHERE rowid IN (
1033                SELECT m.rowid FROM memories_fts fts
1034                JOIN memories m ON m.rowid = fts.rowid
1035                WHERE memories_fts MATCH ?1
1036                  AND (?2 IS NULL OR m.namespace = ?2)
1037                  AND (?3 IS NULL OR m.tier = ?3)
1038            )",
1039            params![fts_query, namespace, tier_str],
1040        )?;
1041        return Ok(deleted);
1042    }
1043
1044    let tier_str = tier.map(|t| t.as_str().to_string());
1045    let deleted = conn.execute(
1046        "DELETE FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
1047        params![namespace, tier_str],
1048    )?;
1049    Ok(deleted)
1050}
1051
1052#[allow(clippy::too_many_arguments)]
1053pub fn list(
1054    conn: &Connection,
1055    namespace: Option<&str>,
1056    tier: Option<&Tier>,
1057    limit: usize,
1058    offset: usize,
1059    min_priority: Option<i32>,
1060    since: Option<&str>,
1061    until: Option<&str>,
1062    tags_filter: Option<&str>,
1063    agent_id: Option<&str>,
1064) -> Result<Vec<Memory>> {
1065    let now = Utc::now().to_rfc3339();
1066    let tier_str = tier.map(|t| t.as_str().to_string());
1067    let mut stmt = conn.prepare(
1068        "SELECT * FROM memories
1069         WHERE (?1 IS NULL OR namespace = ?1)
1070           AND (?2 IS NULL OR tier = ?2)
1071           AND (?3 IS NULL OR priority >= ?3)
1072           AND (expires_at IS NULL OR expires_at > ?4)
1073           AND (?5 IS NULL OR created_at >= ?5)
1074           AND (?6 IS NULL OR created_at <= ?6)
1075           AND (?7 IS NULL OR EXISTS (SELECT 1 FROM json_each(memories.tags) WHERE json_each.value = ?7))
1076           AND (?10 IS NULL OR agent_id_idx = ?10)
1077         ORDER BY priority DESC, updated_at DESC
1078         LIMIT ?8 OFFSET ?9",
1079    )?;
1080    let rows = stmt.query_map(
1081        params![
1082            namespace,
1083            tier_str,
1084            min_priority,
1085            now,
1086            since,
1087            until,
1088            tags_filter,
1089            limit,
1090            offset,
1091            agent_id,
1092        ],
1093        row_to_memory,
1094    )?;
1095    rows.collect::<rusqlite::Result<Vec<_>>>()
1096        .map_err(Into::into)
1097}
1098
1099#[allow(clippy::too_many_arguments)]
1100pub fn search(
1101    conn: &Connection,
1102    query: &str,
1103    namespace: Option<&str>,
1104    tier: Option<&Tier>,
1105    limit: usize,
1106    min_priority: Option<i32>,
1107    since: Option<&str>,
1108    until: Option<&str>,
1109    tags_filter: Option<&str>,
1110    agent_id: Option<&str>,
1111    as_agent: Option<&str>,
1112) -> Result<Vec<Memory>> {
1113    let now = Utc::now().to_rfc3339();
1114    let tier_str = tier.map(|t| t.as_str().to_string());
1115    let fts_query = sanitize_fts_query(query, false);
1116    let (vis_p, vis_t, vis_u, vis_o) = compute_visibility_prefixes(as_agent);
1117
1118    let sql = format!(
1119        "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1120                m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1121                m.last_accessed_at, m.expires_at, m.metadata
1122         FROM memories_fts fts
1123         JOIN memories m ON m.rowid = fts.rowid
1124         WHERE memories_fts MATCH ?1
1125           AND (?2 IS NULL OR m.namespace = ?2)
1126           AND (?3 IS NULL OR m.tier = ?3)
1127           AND (?4 IS NULL OR m.priority >= ?4)
1128           AND (m.expires_at IS NULL OR m.expires_at > ?5)
1129           AND (?6 IS NULL OR m.created_at >= ?6)
1130           AND (?7 IS NULL OR m.created_at <= ?7)
1131           AND (?8 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?8))
1132           AND (?10 IS NULL OR m.agent_id_idx = ?10)
1133           {vis}
1134         ORDER BY (fts.rank * -1)
1135           + (m.priority * 0.5)
1136           + (MIN(m.access_count, 50) * 0.1)
1137           + (m.confidence * 2.0)
1138           + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
1139           DESC
1140         LIMIT ?9",
1141        vis = visibility_clause(11, "m"),
1142    );
1143    let mut stmt = conn.prepare(&sql)?;
1144    let rows = stmt.query_map(
1145        params![
1146            fts_query,
1147            namespace,
1148            tier_str,
1149            min_priority,
1150            now,
1151            since,
1152            until,
1153            tags_filter,
1154            limit,
1155            agent_id,
1156            vis_p,
1157            vis_t,
1158            vis_u,
1159            vis_o,
1160        ],
1161        row_to_memory,
1162    )?;
1163    rows.collect::<rusqlite::Result<Vec<_>>>()
1164        .map_err(Into::into)
1165}
1166
1167/// Task 1.12 — proximity boost applied to a memory's score based on its
1168/// depth distance from the queried agent namespace. Uses the formula
1169/// `1 / (1 + depth_distance * 0.3)` per spec. Distance 0 = full strength
1170/// (1.0), each step up the hierarchy dampens linearly.
1171#[must_use]
1172pub fn proximity_boost(agent_ns: &str, memory_ns: &str) -> f64 {
1173    let agent_depth = crate::models::namespace_depth(agent_ns);
1174    let memory_depth = crate::models::namespace_depth(memory_ns);
1175    let distance = agent_depth.saturating_sub(memory_depth);
1176    #[allow(clippy::cast_precision_loss)]
1177    let d = distance as f64;
1178    1.0 / (1.0 + d * 0.3)
1179}
1180
1181/// Task 1.12 — SQL fragment + boolean indicating whether hierarchy
1182/// expansion is in play. When active the `namespace` SQL param binds
1183/// NULL (so `?N IS NULL OR m.namespace = ?N` passes trivially) and a
1184/// separate `AND m.namespace IN (<ancestors>)` clause narrows to the
1185/// hierarchy. When inactive the returned fragment is empty.
1186///
1187/// Ancestor strings are interpolated because `SQLite` `IN` with a
1188/// variable-length positional list is awkward, and the inputs come
1189/// from `namespace_ancestors()` → `validate_namespace`-approved
1190/// strings. Single-quote doubling is applied defensively.
1191fn hierarchy_in_clause(namespace: Option<&str>) -> (Option<String>, bool) {
1192    let Some(ns) = namespace else {
1193        return (None, false);
1194    };
1195    if !ns.contains('/') {
1196        return (None, false);
1197    }
1198    let ancestors = crate::models::namespace_ancestors(ns);
1199    if ancestors.is_empty() {
1200        return (None, false);
1201    }
1202    let quoted: Vec<String> = ancestors
1203        .iter()
1204        .map(|a| format!("'{}'", a.replace('\'', "''")))
1205        .collect();
1206    (
1207        Some(format!("AND m.namespace IN ({})", quoted.join(","))),
1208        true,
1209    )
1210}
1211
1212/// Task 1.12 — apply proximity boost to scored memories ranked against
1213/// an agent's hierarchical namespace. Re-sorts by boosted score.
1214fn apply_proximity_boost(scored: Vec<(Memory, f64)>, agent_ns: &str) -> Vec<(Memory, f64)> {
1215    let mut boosted: Vec<(Memory, f64)> = scored
1216        .into_iter()
1217        .map(|(mem, score)| {
1218            let boost = proximity_boost(agent_ns, &mem.namespace);
1219            (mem, score * boost)
1220        })
1221        .collect();
1222    boosted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1223    boosted
1224}
1225
1226/// Task 1.11 — rough token estimate for a memory. Uses the "~4 chars per
1227/// token" heuristic on `title + content`. Deliberately byte-length-based:
1228/// fast, deterministic, and correct enough for budget gating.
1229#[must_use]
1230pub fn estimate_memory_tokens(mem: &Memory) -> usize {
1231    (mem.title.len() + mem.content.len()) / 4
1232}
1233
1234/// Task 1.11 — truncate a scored recall list to fit within an optional
1235/// token budget. Iterates in rank order; stops at the first memory whose
1236/// inclusion would exceed the budget. Returns `(truncated, tokens_used)`.
1237/// When `budget_tokens` is `None` the list is returned untouched, still
1238/// with an accurate `tokens_used` tally so callers can surface it in
1239/// response metadata.
1240#[must_use]
1241pub fn apply_token_budget(
1242    scored: Vec<(Memory, f64)>,
1243    budget_tokens: Option<usize>,
1244) -> (Vec<(Memory, f64)>, usize) {
1245    let mut used: usize = 0;
1246    let mut out = Vec::with_capacity(scored.len());
1247    for (mem, score) in scored {
1248        let cost = estimate_memory_tokens(&mem);
1249        if let Some(budget) = budget_tokens
1250            && used.saturating_add(cost) > budget
1251        {
1252            break;
1253        }
1254        used = used.saturating_add(cost);
1255        out.push((mem, score));
1256    }
1257    (out, used)
1258}
1259
1260/// Recall — fuzzy OR search + touch + auto-promote + TTL extension.
1261/// Task 1.11: after ranking, applies optional `budget_tokens` cap.
1262/// Returns `(truncated_list, tokens_used)`.
1263#[allow(clippy::too_many_arguments)]
1264pub fn recall(
1265    conn: &Connection,
1266    context: &str,
1267    namespace: Option<&str>,
1268    limit: usize,
1269    tags_filter: Option<&str>,
1270    since: Option<&str>,
1271    until: Option<&str>,
1272    short_extend: i64,
1273    mid_extend: i64,
1274    as_agent: Option<&str>,
1275    budget_tokens: Option<usize>,
1276) -> Result<(Vec<(Memory, f64)>, usize)> {
1277    let now = Utc::now().to_rfc3339();
1278    let fts_query = sanitize_fts_query(context, true);
1279    let (vis_p, vis_t, vis_u, vis_o) = compute_visibility_prefixes(as_agent);
1280
1281    // Task 1.12: hierarchy expansion. If `namespace` is hierarchical (contains
1282    // `/`), broaden the filter to the full ancestor chain. Flat namespaces
1283    // keep exact-match semantics (backward compat).
1284    let (hierarchy_in, hierarchy_active) = hierarchy_in_clause(namespace);
1285    let hierarchy_fragment = hierarchy_in.unwrap_or_default();
1286    let effective_namespace = if hierarchy_active { None } else { namespace };
1287
1288    let sql = format!(
1289        "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1290                m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1291                m.last_accessed_at, m.expires_at, m.metadata,
1292                (fts.rank * -1)
1293                + (m.priority * 0.5)
1294                + (MIN(m.access_count, 50) * 0.1)
1295                + (m.confidence * 2.0)
1296                + (CASE m.tier WHEN 'long' THEN 3.0 WHEN 'mid' THEN 1.0 ELSE 0.0 END)
1297                + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
1298                AS score
1299         FROM memories_fts fts
1300         JOIN memories m ON m.rowid = fts.rowid
1301         WHERE memories_fts MATCH ?1
1302           AND (?2 IS NULL OR m.namespace = ?2)
1303           {hierarchy_fragment}
1304           AND (m.expires_at IS NULL OR m.expires_at > ?3)
1305           AND (?4 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?4))
1306           AND (?5 IS NULL OR m.created_at >= ?5)
1307           AND (?6 IS NULL OR m.created_at <= ?6)
1308           {vis}
1309         ORDER BY score DESC
1310         LIMIT ?7",
1311        vis = visibility_clause(8, "m"),
1312    );
1313    let mut stmt = conn.prepare(&sql)?;
1314    let rows = stmt.query_map(
1315        params![
1316            fts_query,
1317            effective_namespace,
1318            now,
1319            tags_filter,
1320            since,
1321            until,
1322            limit,
1323            vis_p,
1324            vis_t,
1325            vis_u,
1326            vis_o
1327        ],
1328        |row| {
1329            let mem = row_to_memory(row)?;
1330            let score: f64 = row.get(15)?;
1331            Ok((mem, score))
1332        },
1333    )?;
1334    let results: Vec<(Memory, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
1335
1336    // Task 1.12: proximity boost when hierarchy expansion is active.
1337    let boosted = if let (true, Some(anchor)) = (hierarchy_active, namespace) {
1338        apply_proximity_boost(results, anchor)
1339    } else {
1340        results
1341    };
1342
1343    // Task 1.11: apply optional token budget in rank order (AFTER proximity).
1344    let (budgeted, tokens_used) = apply_token_budget(boosted, budget_tokens);
1345
1346    // Touch all recalled memories that SURVIVED the budget cut — no sense
1347    // bumping access counts on memories the caller will never see.
1348    for (mem, _) in &budgeted {
1349        if let Err(e) = touch(conn, &mem.id, short_extend, mid_extend) {
1350            tracing::warn!("touch failed for memory {}: {}", &mem.id, e);
1351        }
1352    }
1353    Ok((budgeted, tokens_used))
1354}
1355
1356/// Task 1.7 — vertical memory promotion.
1357///
1358/// Clones `source_id` into `to_namespace`, which must be a proper `/`-derived
1359/// ancestor of the memory's current namespace. The original memory is
1360/// **untouched** (vertical promotion is a fan-out, not a move). A
1361/// `derived_from` link is created from the new clone back to the source so
1362/// the promotion trail is queryable.
1363///
1364/// Returns the clone's new ID.
1365///
1366/// Errors when:
1367/// - source doesn't exist
1368/// - `to_namespace` is empty, equal to the source namespace, or not an
1369///   ancestor of it (see `namespace_ancestors`)
1370pub fn promote_to_namespace(
1371    conn: &Connection,
1372    source_id: &str,
1373    to_namespace: &str,
1374) -> Result<String> {
1375    if to_namespace.is_empty() {
1376        anyhow::bail!("to_namespace cannot be empty");
1377    }
1378    let source = get(conn, source_id)?
1379        .ok_or_else(|| anyhow::anyhow!("source memory not found: {source_id}"))?;
1380    if to_namespace == source.namespace {
1381        anyhow::bail!(
1382            "to_namespace must be a proper ancestor of the memory's namespace (got self: {})",
1383            source.namespace
1384        );
1385    }
1386    let ancestors = namespace_ancestors(&source.namespace);
1387    if !ancestors.iter().any(|a| a == to_namespace) {
1388        anyhow::bail!(
1389            "to_namespace '{to_namespace}' is not an ancestor of '{}' (ancestors: {ancestors:?})",
1390            source.namespace
1391        );
1392    }
1393
1394    let now = Utc::now().to_rfc3339();
1395    let clone = Memory {
1396        id: uuid::Uuid::new_v4().to_string(),
1397        tier: source.tier.clone(),
1398        namespace: to_namespace.to_string(),
1399        title: source.title.clone(),
1400        content: source.content.clone(),
1401        tags: source.tags.clone(),
1402        priority: source.priority,
1403        confidence: source.confidence,
1404        source: source.source.clone(),
1405        access_count: 0,
1406        created_at: now.clone(),
1407        updated_at: now,
1408        last_accessed_at: None,
1409        expires_at: source.expires_at.clone(),
1410        metadata: source.metadata.clone(),
1411    };
1412    let actual_id = insert(conn, &clone)?;
1413    // Clone → source: derived_from. Safe to ignore if the link layer
1414    // short-circuits on self-link (impossible here — distinct IDs).
1415    create_link(conn, &actual_id, source_id, "derived_from")?;
1416    Ok(actual_id)
1417}
1418
1419/// Detect potential contradictions: memories in same namespace with similar titles.
1420pub fn find_contradictions(conn: &Connection, title: &str, namespace: &str) -> Result<Vec<Memory>> {
1421    let fts_query = sanitize_fts_query(title, true);
1422    let mut stmt = conn.prepare(
1423        "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1424                m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1425                m.last_accessed_at, m.expires_at, m.metadata
1426         FROM memories_fts fts
1427         JOIN memories m ON m.rowid = fts.rowid
1428         WHERE memories_fts MATCH ?1 AND m.namespace = ?2
1429         ORDER BY fts.rank
1430         LIMIT 5",
1431    )?;
1432    let rows = stmt.query_map(params![fts_query, namespace], row_to_memory)?;
1433    rows.collect::<rusqlite::Result<Vec<_>>>()
1434        .map_err(Into::into)
1435}
1436
1437// --- Links ---
1438
1439pub fn create_link(
1440    conn: &Connection,
1441    source_id: &str,
1442    target_id: &str,
1443    relation: &str,
1444) -> Result<()> {
1445    // Verify both IDs exist before creating link
1446    let source_exists: bool = conn
1447        .query_row(
1448            "SELECT EXISTS(SELECT 1 FROM memories WHERE id = ?1)",
1449            params![source_id],
1450            |r| r.get(0),
1451        )
1452        .unwrap_or(false);
1453    if !source_exists {
1454        anyhow::bail!("source memory not found: {source_id}");
1455    }
1456    let target_exists: bool = conn
1457        .query_row(
1458            "SELECT EXISTS(SELECT 1 FROM memories WHERE id = ?1)",
1459            params![target_id],
1460            |r| r.get(0),
1461        )
1462        .unwrap_or(false);
1463    if !target_exists {
1464        anyhow::bail!("target memory not found: {target_id}");
1465    }
1466    // Schema v15 (Pillar 2 / Stream B) added `valid_from` for temporal
1467    // KG queries. Backfill on migration handled legacy rows; here we
1468    // populate it on the insert path so newly created links are
1469    // visible to `memory_kg_timeline` without a downstream backfill.
1470    let now = Utc::now().to_rfc3339();
1471    conn.execute(
1472        "INSERT OR IGNORE INTO memory_links (source_id, target_id, relation, created_at, valid_from) VALUES (?1, ?2, ?3, ?4, ?4)",
1473        params![source_id, target_id, relation, now],
1474    )?;
1475    Ok(())
1476}
1477
1478pub fn get_links(conn: &Connection, id: &str) -> Result<Vec<MemoryLink>> {
1479    let mut stmt = conn.prepare(
1480        "SELECT source_id, target_id, relation, created_at FROM memory_links
1481         WHERE source_id = ?1 OR target_id = ?1",
1482    )?;
1483    let rows = stmt.query_map(params![id], |row| {
1484        Ok(MemoryLink {
1485            source_id: row.get(0)?,
1486            target_id: row.get(1)?,
1487            relation: row.get(2)?,
1488            created_at: row.get(3)?,
1489        })
1490    })?;
1491    rows.collect::<rusqlite::Result<Vec<_>>>()
1492        .map_err(Into::into)
1493}
1494
1495#[allow(dead_code)]
1496pub fn delete_link(conn: &Connection, source_id: &str, target_id: &str) -> Result<bool> {
1497    let changed = conn.execute(
1498        "DELETE FROM memory_links WHERE source_id = ?1 AND target_id = ?2",
1499        params![source_id, target_id],
1500    )?;
1501    Ok(changed > 0)
1502}
1503
1504// --- Consolidation ---
1505
1506/// Consolidate multiple memories into one. Returns the new memory ID.
1507/// Deletes the source memories and creates links from new → old (`derived_from`).
1508#[allow(clippy::too_many_arguments)]
1509pub fn consolidate(
1510    conn: &Connection,
1511    ids: &[String],
1512    title: &str,
1513    summary: &str,
1514    namespace: &str,
1515    tier: &Tier,
1516    source: &str,
1517    consolidator_agent_id: &str,
1518) -> Result<String> {
1519    let now = Utc::now().to_rfc3339();
1520    let new_id = uuid::Uuid::new_v4().to_string();
1521
1522    conn.execute_batch("BEGIN IMMEDIATE")?;
1523
1524    let result = (|| -> Result<String> {
1525        // Verify all IDs exist and collect metadata in one pass
1526        let mut max_priority = 5i32;
1527        let mut all_tags: Vec<String> = Vec::new();
1528        let mut total_access = 0i64;
1529        let mut merged_metadata = serde_json::Map::new();
1530        // Collect original agent_ids separately — they go into
1531        // `consolidated_from_agents` for forensic attribution.
1532        // The consolidator's own agent_id becomes `agent_id` on the result.
1533        let mut source_agent_ids: Vec<String> = Vec::new();
1534        for id in ids {
1535            match get(conn, id)? {
1536                Some(mem) => {
1537                    max_priority = max_priority.max(mem.priority);
1538                    all_tags.extend(mem.tags);
1539                    total_access = total_access.saturating_add(mem.access_count);
1540                    // Merge metadata: later values overwrite earlier ones on key conflict.
1541                    // Intentionally SKIP `agent_id` to avoid last-write-wins forgery;
1542                    // the consolidator's id is authoritative on the result.
1543                    if let serde_json::Value::Object(map) = mem.metadata {
1544                        for (k, v) in map {
1545                            if k == "agent_id" {
1546                                if let serde_json::Value::String(aid) = &v
1547                                    && !source_agent_ids.contains(aid)
1548                                {
1549                                    source_agent_ids.push(aid.clone());
1550                                }
1551                                continue;
1552                            }
1553                            if let Some(existing) = merged_metadata.get(&k)
1554                                && std::mem::discriminant(existing) != std::mem::discriminant(&v)
1555                            {
1556                                tracing::warn!(
1557                                    "consolidate: key '{}' type changed during merge",
1558                                    k
1559                                );
1560                            }
1561                            merged_metadata.insert(k, v);
1562                        }
1563                    } else {
1564                        tracing::warn!(
1565                            "memory {} has non-object metadata during consolidate, skipping",
1566                            id
1567                        );
1568                    }
1569                }
1570                None => anyhow::bail!("memory not found: {id}"),
1571            }
1572        }
1573        all_tags.sort();
1574        all_tags.dedup();
1575        let tags_json = serde_json::to_string(&all_tags)?;
1576        // Record source IDs in metadata for provenance (links would be CASCADE-deleted)
1577        merged_metadata.insert(
1578            "derived_from".to_string(),
1579            serde_json::Value::Array(
1580                ids.iter()
1581                    .map(|id| serde_json::Value::String(id.clone()))
1582                    .collect(),
1583            ),
1584        );
1585        // NHI: the consolidator owns the new memory (authoritative agent_id);
1586        // original authors are preserved as a separate array for forensics.
1587        merged_metadata.insert(
1588            "agent_id".to_string(),
1589            serde_json::Value::String(consolidator_agent_id.to_string()),
1590        );
1591        if !source_agent_ids.is_empty() {
1592            merged_metadata.insert(
1593                "consolidated_from_agents".to_string(),
1594                serde_json::Value::Array(
1595                    source_agent_ids
1596                        .into_iter()
1597                        .map(serde_json::Value::String)
1598                        .collect(),
1599                ),
1600            );
1601        }
1602        let merged_metadata_value = serde_json::Value::Object(merged_metadata);
1603        crate::validate::validate_metadata(&merged_metadata_value)
1604            .context("merged metadata exceeds size limit")?;
1605        let metadata_json = serde_json::to_string(&merged_metadata_value)?;
1606
1607        conn.execute(
1608            "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, metadata)
1609             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1.0, ?8, ?9, ?10, ?10, ?11)",
1610            params![new_id, tier.as_str(), namespace, title, summary, tags_json, max_priority, source, total_access, now, metadata_json],
1611        )?;
1612
1613        // Delete source memories first. Note: we intentionally do NOT create
1614        // derived_from links before deletion because ON DELETE CASCADE would
1615        // immediately remove them. Instead, source IDs are recorded in the
1616        // consolidated memory's metadata for provenance.
1617        for id in ids {
1618            delete(conn, id)?;
1619        }
1620
1621        Ok(new_id.clone())
1622    })();
1623
1624    match result {
1625        Ok(id) => {
1626            conn.execute_batch("COMMIT")?;
1627            Ok(id)
1628        }
1629        Err(e) => {
1630            if let Err(rb) = conn.execute_batch("ROLLBACK") {
1631                tracing::error!("ROLLBACK failed in consolidate: {}", rb);
1632            }
1633            Err(e)
1634        }
1635    }
1636}
1637
/// Strip zero-width and other invisible Unicode characters that could be
/// used to split or disguise words so they slip past FTS search.
///
/// Removed:
/// - zero-width space/non-joiner/joiner and BOM (U+200B–U+200D, U+FEFF);
/// - soft hyphen, combining grapheme joiner, Arabic letter mark;
/// - Mongolian vowel separator;
/// - word joiner and invisible math operators (U+2060–U+2064);
/// - variation selectors (U+FE00–U+FE0F and the supplementary block
///   U+E0100–U+E01EF);
/// - LRM/RLM and bidi embeddings/overrides/isolates;
/// - interlinear annotation controls (U+FFF9–U+FFFB);
/// - Unicode tag characters (U+E0000–U+E007F), which render as nothing and
///   can carry hidden ASCII text.
fn strip_invisible(s: &str) -> String {
    s.chars()
        .filter(|c| {
            !matches!(*c,
                '\u{200B}' | '\u{200C}' | '\u{200D}' | '\u{FEFF}' |
                '\u{00AD}' | '\u{034F}' | '\u{061C}' |
                '\u{180E}' | '\u{2060}' | '\u{2061}'..='\u{2064}' |
                '\u{FE00}'..='\u{FE0F}' | '\u{200E}' | '\u{200F}' |
                '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}' |
                '\u{FFF9}'..='\u{FFFB}' |
                '\u{E0000}'..='\u{E007F}' | '\u{E0100}'..='\u{E01EF}'
            )
        })
        .collect()
}
1652
1653fn sanitize_fts_query(input: &str, use_or: bool) -> String {
1654    let joiner = if use_or { " OR " } else { " " };
1655    let cleaned = strip_invisible(input);
1656    let tokens: Vec<String> = cleaned
1657        .split_whitespace()
1658        .filter(|t| !t.is_empty())
1659        .filter(|t| {
1660            // Filter out FTS5 boolean operators as standalone tokens
1661            let upper = t.to_uppercase();
1662            upper != "AND" && upper != "OR" && upper != "NOT" && upper != "NEAR"
1663        })
1664        .map(|token| {
1665            // Strip FTS5 special characters to prevent injection.
1666            // Hyphens are allowed inside words (e.g. "well-known"): the
1667            // unicode61 tokenizer treats `-` as a separator when indexing,
1668            // so `foo-bar` indexes as `foo` + `bar`. Keeping the hyphen in
1669            // the per-token phrase (below we wrap each token in `"…"`)
1670            // produces a phrase query that FTS5 evaluates by matching the
1671            // hyphen-split component terms in order — which is exactly
1672            // what callers expect when searching for hyphenated content.
1673            // Dropping the `'-'` filter here fixes scenario S28 without
1674            // reopening the `+`/`-` exclusion-injection hole (every token
1675            // is already phrase-quoted before being joined, so `-` cannot
1676            // reach FTS5 as a prefix operator).
1677            let clean: String = token
1678                .chars()
1679                .filter(|c| {
1680                    *c != '"'
1681                        && *c != '*'
1682                        && *c != '^'
1683                        && *c != '{'
1684                        && *c != '}'
1685                        && *c != '('
1686                        && *c != ')'
1687                        && *c != ':'
1688                        && *c != '|'
1689                        && *c != '+'
1690                })
1691                .collect();
1692            if clean.is_empty() {
1693                return String::new();
1694            }
1695            format!("\"{clean}\"")
1696        })
1697        .filter(|t| !t.is_empty())
1698        .collect();
1699    if tokens.is_empty() {
1700        return "\"_empty_\"".to_string();
1701    }
1702    tokens.join(joiner)
1703}
1704
1705pub fn list_namespaces(conn: &Connection) -> Result<Vec<NamespaceCount>> {
1706    let now = Utc::now().to_rfc3339();
1707    let mut stmt = conn.prepare(
1708        "SELECT namespace, COUNT(*) FROM memories WHERE expires_at IS NULL OR expires_at > ?1 GROUP BY namespace ORDER BY COUNT(*) DESC",
1709    )?;
1710    let rows = stmt.query_map(params![now], |row| {
1711        Ok(NamespaceCount {
1712            namespace: row.get(0)?,
1713            count: row.get(1)?,
1714        })
1715    })?;
1716    rows.collect::<rusqlite::Result<Vec<_>>>()
1717        .map_err(Into::into)
1718}
1719
/// Upper bound on the number of `(namespace, count)` groups a taxonomy
/// build will walk, whatever `limit` the caller asks for. This keeps memory
/// use and running time bounded for oversized requests.
const TAXONOMY_MAX_LIMIT: usize = 10_000;
1724
1725/// Build a hierarchical namespace taxonomy (Pillar 1 / Stream A).
1726///
1727/// Groups live (non-expired) memories by `namespace`, splits each on
1728/// `/`, and folds them into a `TaxonomyNode` tree. The returned root
1729/// represents `namespace_prefix` (or the synthetic empty-string root if
1730/// no prefix is supplied); each child level descends one segment.
1731///
1732/// `max_depth` is interpreted as "show at most N levels *below the
1733/// prefix*". Memories whose namespace would have required descending
1734/// past the cutoff still contribute to the `subtree_count` of the
1735/// boundary ancestor (their counts are not lost — only the leaf
1736/// rendering is suppressed).
1737///
1738/// `limit` caps the number of input `(namespace, count)` rows we walk
1739/// — when truncated, `total_count` still reflects the full prefix
1740/// total (a separate aggregation), and `truncated` is set so callers
1741/// can warn the user. Hard ceiling: [`TAXONOMY_MAX_LIMIT`].
1742// Body is intentionally one logical pipeline (SQL aggregation → tree
1743// assembly → root materialisation); pulling helpers out hurts
1744// readability more than it helps.
1745#[allow(clippy::too_many_lines)]
1746pub fn get_taxonomy(
1747    conn: &Connection,
1748    namespace_prefix: Option<&str>,
1749    max_depth: usize,
1750    limit: usize,
1751) -> Result<Taxonomy> {
1752    let now = Utc::now().to_rfc3339();
1753    let effective_limit = limit.min(TAXONOMY_MAX_LIMIT);
1754    // Clamp depth so callers asking for "everything" can't construct a
1755    // pathological deep walk; the namespace validator already rejects
1756    // depths > MAX_NAMESPACE_DEPTH on writes.
1757    let effective_depth = max_depth.min(MAX_NAMESPACE_DEPTH);
1758
1759    let prefix = namespace_prefix.unwrap_or("");
1760
1761    // Total count for the prefix is computed independently of the
1762    // truncated row walk so the caller-visible total stays honest even
1763    // when `limit` drops rows from the tree.
1764    let total_count: usize = if prefix.is_empty() {
1765        let v: i64 = conn.query_row(
1766            "SELECT COUNT(*) FROM memories WHERE expires_at IS NULL OR expires_at > ?1",
1767            params![now],
1768            |row| row.get(0),
1769        )?;
1770        usize::try_from(v).unwrap_or(0)
1771    } else {
1772        let v: i64 = conn.query_row(
1773            "SELECT COUNT(*) FROM memories
1774             WHERE (expires_at IS NULL OR expires_at > ?1)
1775               AND (namespace = ?2 OR namespace LIKE ?2 || '/%')",
1776            params![now, prefix],
1777            |row| row.get(0),
1778        )?;
1779        usize::try_from(v).unwrap_or(0)
1780    };
1781
1782    // Group rows ordered by count DESC so a small `limit` keeps the
1783    // densest namespaces, then alphabetic for stable tie-breaking.
1784    let groups: Vec<(String, usize)> = if prefix.is_empty() {
1785        let mut stmt = conn.prepare(
1786            "SELECT namespace, COUNT(*) FROM memories
1787             WHERE expires_at IS NULL OR expires_at > ?1
1788             GROUP BY namespace
1789             ORDER BY COUNT(*) DESC, namespace ASC
1790             LIMIT ?2",
1791        )?;
1792        let rows = stmt.query_map(
1793            params![now, i64::try_from(effective_limit).unwrap_or(i64::MAX)],
1794            |row| {
1795                let ns: String = row.get(0)?;
1796                let c: i64 = row.get(1)?;
1797                Ok((ns, usize::try_from(c).unwrap_or(0)))
1798            },
1799        )?;
1800        rows.collect::<rusqlite::Result<Vec<_>>>()?
1801    } else {
1802        let mut stmt = conn.prepare(
1803            "SELECT namespace, COUNT(*) FROM memories
1804             WHERE (expires_at IS NULL OR expires_at > ?1)
1805               AND (namespace = ?2 OR namespace LIKE ?2 || '/%')
1806             GROUP BY namespace
1807             ORDER BY COUNT(*) DESC, namespace ASC
1808             LIMIT ?3",
1809        )?;
1810        let rows = stmt.query_map(
1811            params![
1812                now,
1813                prefix,
1814                i64::try_from(effective_limit).unwrap_or(i64::MAX)
1815            ],
1816            |row| {
1817                let ns: String = row.get(0)?;
1818                let c: i64 = row.get(1)?;
1819                Ok((ns, usize::try_from(c).unwrap_or(0)))
1820            },
1821        )?;
1822        rows.collect::<rusqlite::Result<Vec<_>>>()?
1823    };
1824
1825    let walked_count: usize = groups.iter().map(|(_, c)| *c).sum();
1826    let truncated = walked_count < total_count;
1827
1828    // Synthesize the root node. `name` is the trailing segment of the
1829    // prefix (or empty for the global root) so renderers can label it.
1830    let root_name = prefix.rsplit('/').next().unwrap_or("").to_string();
1831    let mut root = TaxonomyNode {
1832        namespace: prefix.to_string(),
1833        name: root_name,
1834        count: 0,
1835        subtree_count: 0,
1836        children: Vec::new(),
1837    };
1838
1839    for (ns, c) in groups {
1840        // Compute path segments below the prefix. When prefix is empty,
1841        // the whole namespace becomes the suffix; when ns == prefix
1842        // exactly, segments is empty and the count lands on the root.
1843        let suffix: &str = if prefix.is_empty() {
1844            ns.as_str()
1845        } else if ns == prefix {
1846            ""
1847        } else if ns.len() > prefix.len() + 1
1848            && ns.starts_with(prefix)
1849            && ns.as_bytes()[prefix.len()] == b'/'
1850        {
1851            &ns[prefix.len() + 1..]
1852        } else {
1853            // Defensive: SQL filter shouldn't return this, but skip rather
1854            // than panic if it ever does (e.g. a stray match like
1855            // "alphaone-sibling" matching prefix "alphaone").
1856            continue;
1857        };
1858        let all_segments: Vec<&str> = if suffix.is_empty() {
1859            Vec::new()
1860        } else {
1861            suffix.split('/').collect()
1862        };
1863        let take = all_segments.len().min(effective_depth);
1864        let used = &all_segments[..take];
1865        let exact_match_in_view = take == all_segments.len();
1866
1867        // Walk into the tree. Every ancestor's subtree_count grows by c
1868        // — including the root — and only the deepest visible node's
1869        // `count` does, and only when it represents the exact namespace
1870        // (not a clamped boundary).
1871        root.subtree_count += c;
1872        if used.is_empty() {
1873            root.count += c;
1874            continue;
1875        }
1876
1877        let mut path_so_far = prefix.to_string();
1878        let mut node = &mut root;
1879        for (i, seg) in used.iter().enumerate() {
1880            if !path_so_far.is_empty() {
1881                path_so_far.push('/');
1882            }
1883            path_so_far.push_str(seg);
1884            let pos = node.children.iter().position(|ch| ch.name == *seg);
1885            let idx = if let Some(p) = pos {
1886                p
1887            } else {
1888                node.children.push(TaxonomyNode {
1889                    namespace: path_so_far.clone(),
1890                    name: (*seg).to_string(),
1891                    count: 0,
1892                    subtree_count: 0,
1893                    children: Vec::new(),
1894                });
1895                node.children.len() - 1
1896            };
1897            node = &mut node.children[idx];
1898            node.subtree_count += c;
1899            let is_leaf = i + 1 == used.len();
1900            if is_leaf && exact_match_in_view {
1901                node.count += c;
1902            }
1903        }
1904    }
1905
1906    sort_taxonomy(&mut root);
1907
1908    Ok(Taxonomy {
1909        tree: root,
1910        total_count,
1911        truncated,
1912    })
1913}
1914
1915fn sort_taxonomy(node: &mut TaxonomyNode) {
1916    node.children.sort_by(|a, b| a.name.cmp(&b.name));
1917    for child in &mut node.children {
1918        sort_taxonomy(child);
1919    }
1920}
1921
/// Lowest similarity threshold the duplicate check will honor.
///
/// With cut-offs below this, unrelated content starts to qualify as a
/// "duplicate". A smaller caller-supplied threshold is therefore raised to
/// this floor instead of producing meaningless merge suggestions.
pub const DUPLICATE_THRESHOLD_MIN: f32 = 0.5;
1926
/// Default cosine-similarity cut-off for calling a candidate a duplicate;
/// callers may pass their own value instead.
///
/// Tuned empirically for the local MiniLM-L6-v2 embedder. Near-paraphrases
/// of the same memory typically score 0.88 or higher, while loosely related
/// content stays well below 0.85.
pub const DUPLICATE_THRESHOLD_DEFAULT: f32 = 0.85;
1932
1933/// Find the nearest-neighbor live memory by cosine similarity (Pillar 2 /
1934/// Stream D — `memory_check_duplicate`).
1935///
1936/// Linear scan over `memories.embedding` rows that pass the live-row
1937/// (non-expired) gate and the optional namespace filter. The chosen
1938/// candidate is the highest-cosine match across the pool; the
1939/// caller-supplied `threshold` is used purely to set `is_duplicate` on
1940/// the response — the nearest neighbor is always returned (when the
1941/// pool is non-empty) so callers can show "closest existing memory was
1942/// X at similarity Y" even on a not-quite-duplicate.
1943///
1944/// Threshold is clamped at [`DUPLICATE_THRESHOLD_MIN`] so that wildly
1945/// permissive thresholds can't be used to dress unrelated content as a
1946/// merge suggestion.
1947///
1948/// Returns `(check, scanned)` where `scanned` is the count of embedded
1949/// candidates compared (useful for diagnostics).
1950pub fn check_duplicate(
1951    conn: &Connection,
1952    query_embedding: &[f32],
1953    namespace: Option<&str>,
1954    threshold: f32,
1955) -> Result<DuplicateCheck> {
1956    let effective_threshold = threshold.max(DUPLICATE_THRESHOLD_MIN);
1957    let now = Utc::now().to_rfc3339();
1958
1959    // SQL filter handles the live-row + optional namespace gate; the
1960    // cosine pass happens in Rust because SQLite has no native vector
1961    // op. We only pull rows with non-NULL embeddings — anything missing
1962    // an embedding can't be a near-duplicate by this definition.
1963    let rows: Vec<(String, String, String, Vec<u8>)> = if let Some(ns) = namespace {
1964        let mut stmt = conn.prepare(
1965            "SELECT id, title, namespace, embedding FROM memories
1966             WHERE embedding IS NOT NULL
1967               AND (expires_at IS NULL OR expires_at > ?1)
1968               AND namespace = ?2",
1969        )?;
1970        let mapped = stmt.query_map(params![now, ns], |row| {
1971            Ok((
1972                row.get::<_, String>(0)?,
1973                row.get::<_, String>(1)?,
1974                row.get::<_, String>(2)?,
1975                row.get::<_, Vec<u8>>(3)?,
1976            ))
1977        })?;
1978        mapped.collect::<rusqlite::Result<Vec<_>>>()?
1979    } else {
1980        let mut stmt = conn.prepare(
1981            "SELECT id, title, namespace, embedding FROM memories
1982             WHERE embedding IS NOT NULL
1983               AND (expires_at IS NULL OR expires_at > ?1)",
1984        )?;
1985        let mapped = stmt.query_map(params![now], |row| {
1986            Ok((
1987                row.get::<_, String>(0)?,
1988                row.get::<_, String>(1)?,
1989                row.get::<_, String>(2)?,
1990                row.get::<_, Vec<u8>>(3)?,
1991            ))
1992        })?;
1993        mapped.collect::<rusqlite::Result<Vec<_>>>()?
1994    };
1995
1996    let mut best: Option<DuplicateMatch> = None;
1997    let mut scanned: usize = 0;
1998    for (id, title, ns, bytes) in rows {
1999        if bytes.is_empty() {
2000            continue;
2001        }
2002        // Skip blobs whose length is not a multiple of 4 (corrupted /
2003        // truncated embedding column). chunks_exact silently drops a
2004        // trailing partial chunk; we explicitly bail on the row so a
2005        // bad blob doesn't compute against a shorter candidate vector
2006        // (which would produce a wrong cosine score).
2007        if !bytes.len().is_multiple_of(4) {
2008            tracing::warn!(
2009                memory_id = %id,
2010                blob_len = bytes.len(),
2011                "skipping duplicate-check candidate with malformed embedding length"
2012            );
2013            continue;
2014        }
2015        let candidate: Vec<f32> = bytes
2016            .chunks_exact(4)
2017            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
2018            .collect();
2019        // Vectors of mismatched dimension would compute against a
2020        // truncated query (Embedder::cosine_similarity zips). Skip
2021        // rather than report a misleading similarity score.
2022        if candidate.len() != query_embedding.len() {
2023            tracing::warn!(
2024                memory_id = %id,
2025                expected = query_embedding.len(),
2026                got = candidate.len(),
2027                "skipping duplicate-check candidate with dimension mismatch"
2028            );
2029            continue;
2030        }
2031        let similarity =
2032            crate::embeddings::Embedder::cosine_similarity(query_embedding, &candidate);
2033        scanned += 1;
2034        let is_better = best.as_ref().is_none_or(|m| similarity > m.similarity);
2035        if is_better {
2036            best = Some(DuplicateMatch {
2037                id,
2038                title,
2039                namespace: ns,
2040                similarity,
2041            });
2042        }
2043    }
2044
2045    let is_duplicate = best
2046        .as_ref()
2047        .is_some_and(|m| m.similarity >= effective_threshold);
2048    Ok(DuplicateCheck {
2049        is_duplicate,
2050        threshold: effective_threshold,
2051        nearest: best,
2052        candidates_scanned: scanned,
2053    })
2054}
2055
2056/// Register an entity (canonical name + aliases) under a namespace
2057/// (Pillar 2 / Stream B).
2058///
2059/// An entity is stored as a long-tier memory:
2060/// - `title = canonical_name`
2061/// - `namespace = namespace`
2062/// - `tags` includes [`ENTITY_TAG`]
2063/// - `metadata.kind = "entity"` (so the resolver can never confuse an
2064///   entity with a regular memory that happens to share a title)
2065///
2066/// Aliases live in the `entity_aliases` side table keyed by
2067/// `(entity_id, alias)`.
2068///
2069/// **Idempotency:** if an entity with this `(canonical_name, namespace)`
2070/// already exists, its ID is reused and `aliases` are merged with
2071/// `INSERT OR IGNORE`. The returned [`EntityRegistration::created`] is
2072/// `false` in that case.
2073///
2074/// **Collision detection:** if a non-entity memory already occupies
2075/// `(title=canonical_name, namespace=namespace)`, the call errors
2076/// rather than silently upgrading it (the upsert path on `insert`
2077/// would otherwise overwrite the existing row's content/tags). Callers
2078/// must rename the entity or its colliding memory.
2079///
2080/// `extra_metadata` is merged into the entity memory's metadata; any
2081/// caller-supplied `kind` field is overwritten with `"entity"` and
2082/// `agent_id` is stamped from the caller (NHI provenance) when
2083/// `extra_metadata` does not already specify one.
2084pub fn entity_register(
2085    conn: &Connection,
2086    canonical_name: &str,
2087    namespace: &str,
2088    aliases: &[String],
2089    extra_metadata: &serde_json::Value,
2090    agent_id: Option<&str>,
2091) -> Result<crate::models::EntityRegistration> {
2092    use crate::models::{ENTITY_KIND, ENTITY_TAG, EntityRegistration};
2093
2094    // Look up an existing entity in this namespace by canonical_name +
2095    // metadata.kind. If a non-entity memory occupies the same
2096    // (title, namespace), surface a hard error instead of upserting.
2097    let existing_id: Option<String> = match conn.query_row(
2098        "SELECT id FROM memories
2099         WHERE namespace = ?1 AND title = ?2
2100           AND COALESCE(json_extract(metadata, '$.kind'), '') = ?3",
2101        params![namespace, canonical_name, ENTITY_KIND],
2102        |r| r.get::<_, String>(0),
2103    ) {
2104        Ok(id) => Some(id),
2105        Err(rusqlite::Error::QueryReturnedNoRows) => None,
2106        Err(e) => return Err(e.into()),
2107    };
2108
2109    let (entity_id, created) = if let Some(id) = existing_id {
2110        (id, false)
2111    } else {
2112        let collision: Option<String> = match conn.query_row(
2113            "SELECT id FROM memories
2114             WHERE namespace = ?1 AND title = ?2
2115               AND COALESCE(json_extract(metadata, '$.kind'), '') != ?3",
2116            params![namespace, canonical_name, ENTITY_KIND],
2117            |r| r.get::<_, String>(0),
2118        ) {
2119            Ok(id) => Some(id),
2120            Err(rusqlite::Error::QueryReturnedNoRows) => None,
2121            Err(e) => return Err(e.into()),
2122        };
2123        if collision.is_some() {
2124            anyhow::bail!(
2125                "entity_register: title '{canonical_name}' in namespace '{namespace}' is already used by a non-entity memory"
2126            );
2127        }
2128
2129        // Build metadata: caller-supplied object merged, kind forced
2130        // to "entity", agent_id preserved from caller when not set.
2131        let mut meta_map = match extra_metadata {
2132            serde_json::Value::Object(m) => m.clone(),
2133            _ => serde_json::Map::new(),
2134        };
2135        meta_map.insert(
2136            "kind".to_string(),
2137            serde_json::Value::String(ENTITY_KIND.to_string()),
2138        );
2139        if let Some(a) = agent_id {
2140            meta_map
2141                .entry("agent_id".to_string())
2142                .or_insert(serde_json::Value::String(a.to_string()));
2143        }
2144        let metadata = serde_json::Value::Object(meta_map);
2145
2146        let now = Utc::now().to_rfc3339();
2147        let mem = Memory {
2148            id: uuid::Uuid::new_v4().to_string(),
2149            tier: Tier::Long,
2150            namespace: namespace.to_string(),
2151            title: canonical_name.to_string(),
2152            content: canonical_name.to_string(),
2153            tags: vec![ENTITY_TAG.to_string()],
2154            priority: 7,
2155            confidence: 1.0,
2156            source: "api".to_string(),
2157            access_count: 0,
2158            created_at: now.clone(),
2159            updated_at: now,
2160            last_accessed_at: None,
2161            expires_at: None,
2162            metadata,
2163        };
2164        let id = insert(conn, &mem).context("insert entity memory")?;
2165        (id, true)
2166    };
2167
2168    let now = Utc::now().to_rfc3339();
2169    {
2170        let mut stmt = conn.prepare(
2171            "INSERT OR IGNORE INTO entity_aliases (entity_id, alias, created_at)
2172             VALUES (?1, ?2, ?3)",
2173        )?;
2174        for alias in aliases {
2175            let trimmed = alias.trim();
2176            if trimmed.is_empty() {
2177                continue;
2178            }
2179            stmt.execute(params![entity_id, trimmed, now])?;
2180        }
2181    }
2182
2183    let aliases_out = list_entity_aliases(conn, &entity_id)?;
2184
2185    Ok(EntityRegistration {
2186        entity_id,
2187        canonical_name: canonical_name.to_string(),
2188        namespace: namespace.to_string(),
2189        aliases: aliases_out,
2190        created,
2191    })
2192}
2193
2194/// Resolve an alias to its registered entity (Pillar 2 / Stream B).
2195///
2196/// When `namespace` is `Some`, only entities in that namespace are
2197/// considered. When `None`, all namespaces are searched and the
2198/// most-recently-created matching entity wins (deterministic
2199/// disambiguation when the same alias was registered in multiple
2200/// namespaces).
2201///
2202/// Returns `Ok(None)` if no entity claims this alias under the given
2203/// filter. Returns the full alias set for the resolved entity.
2204pub fn entity_get_by_alias(
2205    conn: &Connection,
2206    alias: &str,
2207    namespace: Option<&str>,
2208) -> Result<Option<crate::models::EntityRecord>> {
2209    use crate::models::{ENTITY_KIND, EntityRecord};
2210
2211    let trimmed = alias.trim();
2212    if trimmed.is_empty() {
2213        return Ok(None);
2214    }
2215
2216    let row: std::result::Result<(String, String, String), rusqlite::Error> =
2217        if let Some(ns) = namespace {
2218            conn.query_row(
2219                "SELECT m.id, m.title, m.namespace
2220                 FROM entity_aliases ea
2221                 JOIN memories m ON m.id = ea.entity_id
2222                 WHERE ea.alias = ?1
2223                   AND m.namespace = ?2
2224                   AND COALESCE(json_extract(m.metadata, '$.kind'), '') = ?3
2225                 ORDER BY m.created_at DESC
2226                 LIMIT 1",
2227                params![trimmed, ns, ENTITY_KIND],
2228                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2229            )
2230        } else {
2231            conn.query_row(
2232                "SELECT m.id, m.title, m.namespace
2233                 FROM entity_aliases ea
2234                 JOIN memories m ON m.id = ea.entity_id
2235                 WHERE ea.alias = ?1
2236                   AND COALESCE(json_extract(m.metadata, '$.kind'), '') = ?2
2237                 ORDER BY m.created_at DESC
2238                 LIMIT 1",
2239                params![trimmed, ENTITY_KIND],
2240                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2241            )
2242        };
2243
2244    let (entity_id, canonical_name, ns) = match row {
2245        Ok(t) => t,
2246        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
2247        Err(e) => return Err(e.into()),
2248    };
2249
2250    let aliases = list_entity_aliases(conn, &entity_id)?;
2251    Ok(Some(EntityRecord {
2252        entity_id,
2253        canonical_name,
2254        namespace: ns,
2255        aliases,
2256    }))
2257}
2258
2259/// Default cap on rows returned by `kg_timeline` when the caller does
2260/// not specify one (Pillar 2 / Stream C). Sized to fit a reasonable
2261/// agent context window without paging — callers needing more should
2262/// pass an explicit limit.
2263pub const KG_TIMELINE_DEFAULT_LIMIT: usize = 200;
2264
2265/// Hard ceiling on `kg_timeline` rows. Matches the existing list/recall
2266/// caps to keep the timeline bounded against pathological entities.
2267pub const KG_TIMELINE_MAX_LIMIT: usize = 1000;
2268
2269/// Ordered fact timeline for an entity (Pillar 2 / Stream C —
2270/// `memory_kg_timeline`). Returns outbound assertions from
2271/// `source_id`, ordered by `valid_from ASC` and tie-broken by
2272/// `created_at ASC` for deterministic display.
2273///
2274/// Filters:
2275/// - `since` (RFC3339, inclusive): drop events with `valid_from < since`
2276/// - `until` (RFC3339, inclusive): drop events with `valid_from > until`
2277/// - `limit`: row cap, clamped to [1, [`KG_TIMELINE_MAX_LIMIT`]]
2278///
2279/// Rows with NULL `valid_from` are excluded — a link without a
2280/// valid-from anchor cannot be ordered on the timeline. The schema-v15
2281/// migration backfilled legacy rows to `created_at`, and the `link()`
2282/// path stamps the column on every new insert, so this is a hard
2283/// guarantee for current code; the explicit `IS NOT NULL` guard exists
2284/// to keep external writes (`store/sqlite.rs`, custom migrations) from
2285/// silently producing invisible links.
2286///
2287/// Cross-namespace by design: timelines often span the same canonical
2288/// entity asserted by agents in different namespaces. Callers can
2289/// post-filter by `target_namespace` if they need a namespace-scoped
2290/// view.
2291///
2292/// v0.7 AGE acceleration onramp (charter §"Stream C" bullet 4). When
2293/// the v0.7 SAL ships with Apache AGE, the equivalent property-graph
2294/// query is:
2295///
2296/// ```cypher
2297/// MATCH (s {id: $source_id})-[r {valid_from IS NOT NULL,
2298///        valid_from >= $since, valid_from <= $until}]->(t)
2299/// WHERE t.id <> s.id  // exclude self-loops
2300/// RETURN t.id, r.relation, r.valid_from, r.valid_until, r.observed_by
2301/// ORDER BY r.valid_from ASC, r.created_at ASC
2302/// LIMIT $limit
2303/// ```
2304///
2305/// Stub left here per charter intent so the v0.7 migration has a 1:1
2306/// reference query.
2307pub fn kg_timeline(
2308    conn: &Connection,
2309    source_id: &str,
2310    since: Option<&str>,
2311    until: Option<&str>,
2312    limit: Option<usize>,
2313) -> Result<Vec<crate::models::KgTimelineEvent>> {
2314    use crate::models::KgTimelineEvent;
2315
2316    let cap = limit
2317        .unwrap_or(KG_TIMELINE_DEFAULT_LIMIT)
2318        .clamp(1, KG_TIMELINE_MAX_LIMIT);
2319
2320    // Compose the predicate dynamically for `since` / `until`. Bind
2321    // values are appended in the same order so the placeholders line up.
2322    let mut sql = String::from(
2323        "SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until,
2324                ml.observed_by, m.title, m.namespace, ml.created_at
2325         FROM memory_links ml
2326         JOIN memories m ON m.id = ml.target_id
2327         WHERE ml.source_id = ?1
2328           AND ml.valid_from IS NOT NULL",
2329    );
2330    let mut binds: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(source_id.to_string())];
2331    if let Some(s) = since {
2332        sql.push_str(" AND ml.valid_from >= ?");
2333        sql.push_str(&(binds.len() + 1).to_string());
2334        binds.push(Box::new(s.to_string()));
2335    }
2336    if let Some(u) = until {
2337        sql.push_str(" AND ml.valid_from <= ?");
2338        sql.push_str(&(binds.len() + 1).to_string());
2339        binds.push(Box::new(u.to_string()));
2340    }
2341    sql.push_str(" ORDER BY ml.valid_from ASC, ml.created_at ASC LIMIT ?");
2342    sql.push_str(&(binds.len() + 1).to_string());
2343    binds.push(Box::new(i64::try_from(cap).unwrap_or(i64::MAX)));
2344
2345    let mut stmt = conn.prepare(&sql)?;
2346    let bind_refs: Vec<&dyn rusqlite::ToSql> = binds.iter().map(AsRef::as_ref).collect();
2347    let rows = stmt.query_map(rusqlite::params_from_iter(bind_refs), |row| {
2348        Ok(KgTimelineEvent {
2349            target_id: row.get(0)?,
2350            relation: row.get(1)?,
2351            valid_from: row.get(2)?,
2352            valid_until: row.get(3)?,
2353            observed_by: row.get(4)?,
2354            title: row.get(5)?,
2355            target_namespace: row.get(6)?,
2356        })
2357    })?;
2358    rows.collect::<rusqlite::Result<Vec<_>>>()
2359        .map_err(Into::into)
2360}
2361
/// Result of a successful [`invalidate_link`] call (Pillar 2 / Stream C —
/// `memory_kg_invalidate`).
///
/// * `valid_until` — the supersession timestamp now stored on the link.
/// * `previous_valid_until` — what the column held before the write, or
///   `None` if the link had never been invalidated.
///
/// Comparing the two lets callers tell a first-time supersession apart
/// from a retry that overwrote an earlier stamp.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InvalidateResult {
    pub valid_until: String,
    pub previous_valid_until: Option<String>,
}
2372
2373/// Mark a KG link as superseded by setting its `valid_until` column
2374/// (Pillar 2 / Stream C — `memory_kg_invalidate`). Returns `Ok(None)`
2375/// when the `(source_id, target_id, relation)` triple does not match an
2376/// existing link. The supplied `valid_until` defaults to the current
2377/// wall-clock time in RFC3339 form when omitted; callers needing
2378/// historical or future supersession can pass an explicit value.
2379///
2380/// Idempotent: calling repeatedly overwrites the prior `valid_until`
2381/// (the prior value is returned in `previous_valid_until` so callers
2382/// can detect the overwrite). The schema does not yet carry an audit
2383/// column for the supersession reason; that arrives with v0.7
2384/// attestation. Until then, callers should record the rationale in
2385/// their own logs or a paired memory.
2386pub fn invalidate_link(
2387    conn: &Connection,
2388    source_id: &str,
2389    target_id: &str,
2390    relation: &str,
2391    valid_until: Option<&str>,
2392) -> Result<Option<InvalidateResult>> {
2393    let stamp = valid_until.map_or_else(|| Utc::now().to_rfc3339(), str::to_string);
2394
2395    let prior = match conn.query_row(
2396        "SELECT valid_until FROM memory_links \
2397         WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
2398        params![source_id, target_id, relation],
2399        |r| r.get::<_, Option<String>>(0),
2400    ) {
2401        Ok(v) => v,
2402        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
2403        Err(e) => return Err(e.into()),
2404    };
2405
2406    conn.execute(
2407        "UPDATE memory_links SET valid_until = ?4 \
2408         WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
2409        params![source_id, target_id, relation, &stamp],
2410    )?;
2411
2412    Ok(Some(InvalidateResult {
2413        valid_until: stamp,
2414        previous_valid_until: prior,
2415    }))
2416}
2417
/// Default cap on rows returned by `kg_query` when the caller does not
/// specify one (Pillar 2 / Stream C). Mirrors `kg_timeline`'s default so
/// the two traversal tools behave consistently for agents driving them.
pub const KG_QUERY_DEFAULT_LIMIT: usize = 200;

/// Hard ceiling on `kg_query` rows returned to the caller. Matches
/// `kg_timeline` and the existing list/recall caps.
///
/// NOTE(review): this bounds the *result size*, not the traversal work.
/// The final `ORDER BY` in `kg_query` has to see every row the recursive
/// CTE produces before `LIMIT` applies. On high fan-out graphs the
/// path count can therefore grow far beyond this cap. Depth is the only
/// hard bound on that work.
pub const KG_QUERY_MAX_LIMIT: usize = 1000;

/// Maximum traversal depth supported by [`kg_query`]. The recursive-CTE
/// implementation enforces an explicit ceiling so a crafted call cannot
/// run an unbounded traversal; the charter (`v0.6.3-grand-slam.md`
/// § Performance Budgets) sets the published budget at depth ≤ 5.
pub const KG_QUERY_MAX_SUPPORTED_DEPTH: usize = 5;

/// Outbound KG traversal from a source memory (Pillar 2 / Stream C —
/// `memory_kg_query`). Returns one row per acyclic *path* of length
/// `1..=max_depth` starting at `source_id`. The recursion uses
/// `UNION ALL`, so a link reachable along two different paths appears
/// once per path, each with its own `path` string. Filters:
///
/// - `valid_at` (RFC3339, optional): only links valid at that instant —
///   `valid_from <= valid_at AND (valid_until IS NULL OR valid_until > valid_at)`.
///   When omitted, the temporal filter is skipped and rows with NULL
///   `valid_from` are also returned (legacy / un-anchored links).
///   Comparisons are textual, so `valid_at` should use the same RFC3339
///   form as the stored timestamps.
/// - `allowed_agents` (optional): when provided, only links with
///   `observed_by` in the set are returned. An **empty** allowlist
///   returns zero rows by design — callers signaling "no agents are
///   trusted" must get an empty traversal, not the unfiltered fallback.
///   When omitted entirely (`None`), the agent filter is skipped.
/// - `limit`: row cap, clamped to [1, [`KG_QUERY_MAX_LIMIT`]].
///
/// `max_depth` must be in `[1, KG_QUERY_MAX_SUPPORTED_DEPTH]`. Passing 0
/// or a larger value yields an explicit error rather than a silent
/// truncation, so callers learn they hit the ceiling instead of
/// receiving a partial graph.
///
/// Multi-hop traversal uses a recursive CTE with cycle detection on
/// the accumulated path. A hop is skipped when its target already
/// appears in the path, so cycles cannot loop the traversal. A direct
/// self-loop on the source itself still yields one depth-1 row. Each hop
/// reapplies the same temporal / agent filters as the anchor, so a chain
/// only extends through links that pass every filter on every hop.
///
/// Rows whose target memory is missing from `memories` are dropped from
/// the output by the final join. Traversal through them still continues,
/// because the recursive member joins `memory_links` only.
///
/// Ordering is `depth ASC, COALESCE(valid_from, created_at) ASC,
/// created_at ASC` — shallower hops first, then time-ordered within
/// each level. For depth=1 callers this collapses to the original
/// time ordering. The `depth` field reflects the actual hop count and
/// `path` is the full `src->mid->target` chain.
///
/// # Errors
/// Fails when `max_depth` is out of range, and propagates SQLite errors
/// from preparing or stepping the query.
pub fn kg_query(
    conn: &Connection,
    source_id: &str,
    max_depth: usize,
    valid_at: Option<&str>,
    allowed_agents: Option<&[String]>,
    limit: Option<usize>,
) -> Result<Vec<crate::models::KgQueryNode>> {
    use crate::models::KgQueryNode;

    if max_depth == 0 {
        anyhow::bail!("max_depth must be >= 1");
    }
    if max_depth > KG_QUERY_MAX_SUPPORTED_DEPTH {
        anyhow::bail!(
            "max_depth={max_depth} exceeds supported depth={KG_QUERY_MAX_SUPPORTED_DEPTH}"
        );
    }

    // Empty allowlist == "no agents are trusted" — short-circuit so we
    // don't have to invent a SQL `IN ()` clause (which is invalid).
    if let Some(agents) = allowed_agents
        && agents.is_empty()
    {
        return Ok(Vec::new());
    }

    let cap = limit
        .unwrap_or(KG_QUERY_DEFAULT_LIMIT)
        .clamp(1, KG_QUERY_MAX_LIMIT);

    // Build the per-hop predicate once; the anchor and recursive members
    // both apply it to a row aliased `ml`. Bind values are appended in
    // resolution order so positional placeholders line up. Placeholders
    // are numbered (`?N`), so the same bind is reused when `hop_filter`
    // is spliced into both CTE members below.
    let mut binds: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    let mut hop_filter = String::new();
    if let Some(t) = valid_at {
        // `valid_at` is bound twice: once for the lower bound on
        // valid_from and once for the upper bound on valid_until.
        hop_filter.push_str(" AND ml.valid_from IS NOT NULL AND ml.valid_from <= ?");
        binds.push(Box::new(t.to_string()));
        hop_filter.push_str(&binds.len().to_string());
        hop_filter.push_str(" AND (ml.valid_until IS NULL OR ml.valid_until > ?");
        binds.push(Box::new(t.to_string()));
        hop_filter.push_str(&binds.len().to_string());
        hop_filter.push(')');
    }
    if let Some(agents) = allowed_agents {
        // Already short-circuited the empty case above.
        hop_filter.push_str(" AND ml.observed_by IN (");
        for (i, a) in agents.iter().enumerate() {
            binds.push(Box::new(a.clone()));
            if i > 0 {
                hop_filter.push_str(", ");
            }
            hop_filter.push('?');
            hop_filter.push_str(&binds.len().to_string());
        }
        hop_filter.push(')');
    }

    // Anchor binds source_id, recursive member binds max_depth, final
    // SELECT binds the row cap. Order matters — placeholders are
    // resolved by the position they occupy in the assembled string.
    binds.push(Box::new(source_id.to_string()));
    let source_ph = binds.len();
    binds.push(Box::new(i64::try_from(max_depth).unwrap_or(i64::MAX)));
    let max_depth_ph = binds.len();
    binds.push(Box::new(i64::try_from(cap).unwrap_or(i64::MAX)));
    let limit_ph = binds.len();

    // v0.7 AGE acceleration onramp (charter §"Stream C — KG Query Layer"
    // bullet 4). The recursive CTE below is the v0.6.3 implementation.
    // NOTE(review): it relies on SQLite JSON1 functions (`json_array`,
    // `json_insert`, `json_array_length`, `json_each`) and `group_concat`.
    // A Postgres port would need equivalents; confirm before reusing it
    // verbatim. When the v0.7 SAL ships with Apache AGE wired in, the
    // equivalent property-graph query will look roughly like:
    //
    //   MATCH (s {id: $source_id})-[r*1..$max_depth {valid_from <= $t,
    //          observed_by IN $allowed_agents}]->(t)
    //   WHERE NONE(n IN nodes(path) WHERE n.id = t.id)  -- cycle prune
    //   RETURN t.id, last(r).relation, t.title, length(r) AS depth,
    //          [n IN nodes(path) | n.id] AS path
    //   ORDER BY depth, last(r).valid_from
    //   LIMIT $limit
    //
    // Stub left here per charter intent so the v0.7 migration to AGE
    // has a reference query alongside the SQL implementation.
    //
    // CTE shape: the anchor emits depth-1 rows with path [source, target].
    // The recursive member appends the next target to the path (index
    // `json_array_length`, i.e. the end), stops at `max_depth`, and
    // refuses any hop whose target is already on the path.

    let sql = format!(
        "WITH RECURSIVE traversal(\
            target_id, relation, valid_from, valid_until, observed_by, \
            link_created_at, depth, path\
         ) AS (\
            SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until, \
                   ml.observed_by, ml.created_at, 1, \
                   json_array(ml.source_id, ml.target_id) \
            FROM memory_links ml \
            WHERE ml.source_id = ?{source_ph}{hop_filter} \
            UNION ALL \
            SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until, \
                   ml.observed_by, ml.created_at, t.depth + 1, \
                   json_insert(t.path, '$[' || json_array_length(t.path) || ']', ml.target_id) \
            FROM memory_links ml \
            JOIN traversal t ON ml.source_id = t.target_id \
            WHERE t.depth < ?{max_depth_ph} \
              AND NOT EXISTS (SELECT 1 FROM json_each(t.path) WHERE value = ml.target_id)\
              {hop_filter}\
         ) \
         SELECT t.target_id, t.relation, t.valid_from, t.valid_until, \
                t.observed_by, m.title, m.namespace, t.depth, \
                (SELECT group_concat(value, '->') FROM json_each(t.path)) \
         FROM traversal t \
         JOIN memories m ON m.id = t.target_id \
         ORDER BY t.depth ASC, COALESCE(t.valid_from, t.link_created_at) ASC, \
                  t.link_created_at ASC \
         LIMIT ?{limit_ph}",
    );

    let mut stmt = conn.prepare(&sql)?;
    let bind_refs: Vec<&dyn rusqlite::ToSql> = binds.iter().map(AsRef::as_ref).collect();
    let rows = stmt.query_map(rusqlite::params_from_iter(bind_refs), |row| {
        let target_id: String = row.get(0)?;
        // Depth comes back as a SQLite INTEGER; it is always in
        // 1..=max_depth, so the fallback to 0 should never trigger.
        let depth: i64 = row.get(7)?;
        Ok(KgQueryNode {
            target_id,
            relation: row.get(1)?,
            valid_from: row.get(2)?,
            valid_until: row.get(3)?,
            observed_by: row.get(4)?,
            title: row.get(5)?,
            target_namespace: row.get(6)?,
            depth: usize::try_from(depth).unwrap_or(0),
            path: row.get(8)?,
        })
    })?;
    rows.collect::<rusqlite::Result<Vec<_>>>()
        .map_err(Into::into)
}
2600
2601/// List all aliases registered for an entity, ordered by registration
2602/// time then alphabetical for stable display.
2603fn list_entity_aliases(conn: &Connection, entity_id: &str) -> Result<Vec<String>> {
2604    let mut stmt = conn.prepare(
2605        "SELECT alias FROM entity_aliases
2606         WHERE entity_id = ?1
2607         ORDER BY created_at ASC, alias ASC",
2608    )?;
2609    let aliases: Vec<String> = stmt
2610        .query_map(params![entity_id], |r| r.get::<_, String>(0))?
2611        .collect::<rusqlite::Result<Vec<_>>>()?;
2612    Ok(aliases)
2613}
2614
2615/// Register or refresh an agent in the reserved `_agents` namespace.
2616///
2617/// Each agent is stored as a long-tier memory with `title = "agent:<agent_id>"`.
2618/// Duplicate registration for the same `agent_id` refreshes `last_seen_at` and
2619/// overwrites `agent_type` + `capabilities`, while preserving the original
2620/// `registered_at` timestamp (caller-observable provenance).
2621///
2622/// Returns the stored memory ID.
2623pub fn register_agent(
2624    conn: &Connection,
2625    agent_id: &str,
2626    agent_type: &str,
2627    capabilities: &[String],
2628) -> Result<String> {
2629    let title = format!("agent:{agent_id}");
2630    let now = Utc::now().to_rfc3339();
2631
2632    // Preserve original registered_at across re-registration.
2633    let registered_at = conn
2634        .query_row(
2635            "SELECT json_extract(metadata, '$.registered_at') FROM memories
2636             WHERE namespace = ?1 AND title = ?2",
2637            params![AGENTS_NAMESPACE, &title],
2638            |row| row.get::<_, Option<String>>(0),
2639        )
2640        .ok()
2641        .flatten()
2642        .unwrap_or_else(|| now.clone());
2643
2644    let caps_json: Vec<serde_json::Value> = capabilities
2645        .iter()
2646        .map(|c| serde_json::Value::String(c.clone()))
2647        .collect();
2648
2649    let metadata = serde_json::json!({
2650        "agent_id": agent_id,
2651        "agent_type": agent_type,
2652        "capabilities": caps_json,
2653        "registered_at": registered_at,
2654        "last_seen_at": now,
2655    });
2656
2657    let content = serde_json::to_string(&metadata)
2658        .context("failed to serialize agent registration content")?;
2659
2660    let mem = Memory {
2661        id: uuid::Uuid::new_v4().to_string(),
2662        tier: Tier::Long,
2663        namespace: AGENTS_NAMESPACE.to_string(),
2664        title,
2665        content,
2666        tags: vec!["agent-registration".to_string()],
2667        priority: 5,
2668        confidence: 1.0,
2669        source: "system".to_string(),
2670        access_count: 0,
2671        created_at: now.clone(),
2672        updated_at: now,
2673        last_accessed_at: None,
2674        expires_at: None,
2675        metadata,
2676    };
2677
2678    insert(conn, &mem)
2679}
2680
2681/// List every registered agent. Rows are drawn from the `_agents` namespace
2682/// and parsed out of each memory's metadata.
2683pub fn list_agents(conn: &Connection) -> Result<Vec<AgentRegistration>> {
2684    let now = Utc::now().to_rfc3339();
2685    let mut stmt = conn.prepare(
2686        "SELECT metadata FROM memories
2687         WHERE namespace = ?1
2688           AND (expires_at IS NULL OR expires_at > ?2)
2689         ORDER BY json_extract(metadata, '$.registered_at') ASC",
2690    )?;
2691    let rows = stmt.query_map(params![AGENTS_NAMESPACE, now], |row| {
2692        row.get::<_, String>(0)
2693    })?;
2694
2695    let mut agents = Vec::new();
2696    for r in rows {
2697        let raw = r?;
2698        let meta: serde_json::Value =
2699            serde_json::from_str(&raw).context("failed to parse agent metadata as JSON")?;
2700        let agent_id = meta
2701            .get("agent_id")
2702            .and_then(serde_json::Value::as_str)
2703            .unwrap_or_default()
2704            .to_string();
2705        let agent_type = meta
2706            .get("agent_type")
2707            .and_then(serde_json::Value::as_str)
2708            .unwrap_or_default()
2709            .to_string();
2710        let capabilities: Vec<String> = meta
2711            .get("capabilities")
2712            .and_then(serde_json::Value::as_array)
2713            .map(|arr| {
2714                arr.iter()
2715                    .filter_map(|v| v.as_str().map(String::from))
2716                    .collect()
2717            })
2718            .unwrap_or_default();
2719        let registered_at = meta
2720            .get("registered_at")
2721            .and_then(serde_json::Value::as_str)
2722            .unwrap_or_default()
2723            .to_string();
2724        let last_seen_at = meta
2725            .get("last_seen_at")
2726            .and_then(serde_json::Value::as_str)
2727            .unwrap_or_default()
2728            .to_string();
2729        agents.push(AgentRegistration {
2730            agent_id,
2731            agent_type,
2732            capabilities,
2733            registered_at,
2734            last_seen_at,
2735        });
2736    }
2737    Ok(agents)
2738}
2739
2740pub fn stats(conn: &Connection, db_path: &Path) -> Result<Stats> {
2741    let total: usize = conn.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))?;
2742
2743    let mut stmt =
2744        conn.prepare("SELECT tier, COUNT(*) FROM memories GROUP BY tier ORDER BY COUNT(*) DESC")?;
2745    let by_tier = stmt
2746        .query_map([], |row| {
2747            Ok(TierCount {
2748                tier: row.get(0)?,
2749                count: row.get(1)?,
2750            })
2751        })?
2752        .collect::<rusqlite::Result<Vec<_>>>()?;
2753
2754    let mut stmt = conn.prepare(
2755        "SELECT namespace, COUNT(*) FROM memories GROUP BY namespace ORDER BY COUNT(*) DESC",
2756    )?;
2757    let by_namespace = stmt
2758        .query_map([], |row| {
2759            Ok(NamespaceCount {
2760                namespace: row.get(0)?,
2761                count: row.get(1)?,
2762            })
2763        })?
2764        .collect::<rusqlite::Result<Vec<_>>>()?;
2765
2766    let now = Utc::now().to_rfc3339();
2767    let one_hour = (Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
2768    let expiring_soon: usize = conn.query_row(
2769        "SELECT COUNT(*) FROM memories WHERE expires_at IS NOT NULL AND expires_at > ?1 AND expires_at <= ?2",
2770        params![now, one_hour], |r| r.get(0),
2771    )?;
2772
2773    let links_count: usize = conn
2774        .query_row("SELECT COUNT(*) FROM memory_links", [], |r| r.get(0))
2775        .unwrap_or(0);
2776    let db_size_bytes = std::fs::metadata(db_path).map_or(0, |m| m.len());
2777
2778    Ok(Stats {
2779        total,
2780        by_tier,
2781        by_namespace,
2782        expiring_soon,
2783        links_count,
2784        db_size_bytes,
2785    })
2786}
2787
2788/// Run GC if there are any expired memories. Lightweight check first.
2789pub fn gc_if_needed(conn: &Connection, archive: bool) -> Result<usize> {
2790    let now = Utc::now().to_rfc3339();
2791    let has_expired: bool = conn
2792        .query_row(
2793            "SELECT EXISTS(SELECT 1 FROM memories WHERE expires_at IS NOT NULL AND expires_at < ?1)",
2794            params![now],
2795            |r| r.get(0),
2796        )
2797        .unwrap_or(false);
2798    if has_expired {
2799        gc(conn, archive)
2800    } else {
2801        Ok(0)
2802    }
2803}
2804
2805/// Purge old archives if `archive_max_days` is configured.
2806pub fn auto_purge_archive(conn: &Connection, max_days: Option<i64>) -> Result<usize> {
2807    match max_days {
2808        Some(days) if days > 0 => purge_archive(conn, Some(days)),
2809        _ => Ok(0),
2810    }
2811}
2812
2813pub fn gc(conn: &Connection, archive: bool) -> Result<usize> {
2814    let now = Utc::now().to_rfc3339();
2815    conn.execute_batch("BEGIN IMMEDIATE")?;
2816    let result = (|| -> Result<usize> {
2817        if archive {
2818            conn.execute(
2819                "INSERT OR REPLACE INTO archived_memories
2820                 (id, tier, namespace, title, content, tags, priority, confidence,
2821                  source, access_count, created_at, updated_at, last_accessed_at,
2822                  expires_at, archived_at, archive_reason, metadata)
2823                 SELECT id, tier, namespace, title, content, tags, priority, confidence,
2824                        source, access_count, created_at, updated_at, last_accessed_at,
2825                        expires_at, ?1, 'ttl_expired', metadata
2826                 FROM memories
2827                 WHERE expires_at IS NOT NULL AND expires_at < ?1",
2828                params![now],
2829            )?;
2830        }
2831        let deleted = conn.execute(
2832            "DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at < ?1",
2833            params![now],
2834        )?;
2835        Ok(deleted)
2836    })();
2837    match result {
2838        Ok(n) => {
2839            conn.execute_batch("COMMIT")?;
2840            // Clean up namespace_meta rows pointing to deleted memories
2841            let _ = conn.execute(
2842                "DELETE FROM namespace_meta WHERE standard_id NOT IN (SELECT id FROM memories)",
2843                [],
2844            );
2845            Ok(n)
2846        }
2847        Err(e) => {
2848            let _ = conn.execute_batch("ROLLBACK");
2849            Err(e)
2850        }
2851    }
2852}
2853
2854// ---------------------------------------------------------------------------
2855// Archive operations
2856// ---------------------------------------------------------------------------
2857
2858pub fn list_archived(
2859    conn: &Connection,
2860    namespace: Option<&str>,
2861    limit: usize,
2862    offset: usize,
2863) -> Result<Vec<serde_json::Value>> {
2864    let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match namespace {
2865        Some(ns) => (
2866            "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
2867             source, access_count, created_at, updated_at, last_accessed_at, \
2868             expires_at, archived_at, archive_reason, metadata \
2869             FROM archived_memories WHERE namespace = ?1 \
2870             ORDER BY archived_at DESC LIMIT ?2 OFFSET ?3"
2871                .to_string(),
2872            vec![Box::new(ns.to_string()), Box::new(limit), Box::new(offset)],
2873        ),
2874        None => (
2875            "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
2876             source, access_count, created_at, updated_at, last_accessed_at, \
2877             expires_at, archived_at, archive_reason, metadata \
2878             FROM archived_memories \
2879             ORDER BY archived_at DESC LIMIT ?1 OFFSET ?2"
2880                .to_string(),
2881            vec![Box::new(limit), Box::new(offset)],
2882        ),
2883    };
2884    let params_refs: Vec<&dyn rusqlite::types::ToSql> =
2885        params_vec.iter().map(std::convert::AsRef::as_ref).collect();
2886    let mut stmt = conn.prepare(&sql)?;
2887    let rows = stmt.query_map(params_refs.as_slice(), |row| {
2888        let metadata_str = row
2889            .get::<_, String>(16)
2890            .unwrap_or_else(|_| "{}".to_string());
2891        let metadata: serde_json::Value =
2892            serde_json::from_str(&metadata_str).unwrap_or_else(|_| serde_json::json!({}));
2893        Ok(serde_json::json!({
2894            "id": row.get::<_, String>(0)?,
2895            "tier": row.get::<_, String>(1)?,
2896            "namespace": row.get::<_, String>(2)?,
2897            "title": row.get::<_, String>(3)?,
2898            "content": row.get::<_, String>(4)?,
2899            "tags": row.get::<_, String>(5)?,
2900            "priority": row.get::<_, i32>(6)?,
2901            "confidence": row.get::<_, f64>(7)?,
2902            "source": row.get::<_, String>(8)?,
2903            "access_count": row.get::<_, i64>(9)?,
2904            "created_at": row.get::<_, String>(10)?,
2905            "updated_at": row.get::<_, String>(11)?,
2906            "last_accessed_at": row.get::<_, Option<String>>(12)?,
2907            "expires_at": row.get::<_, Option<String>>(13)?,
2908            "archived_at": row.get::<_, String>(14)?,
2909            "archive_reason": row.get::<_, String>(15)?,
2910            "metadata": metadata,
2911        }))
2912    })?;
2913    rows.collect::<rusqlite::Result<Vec<_>>>()
2914        .map_err(Into::into)
2915}
2916
2917pub fn restore_archived(conn: &Connection, id: &str) -> Result<bool> {
2918    let now = Utc::now().to_rfc3339();
2919    conn.execute_batch("BEGIN IMMEDIATE")?;
2920    let result = (|| -> Result<bool> {
2921        let exists: bool = conn
2922            .query_row(
2923                "SELECT COUNT(*) > 0 FROM archived_memories WHERE id = ?1",
2924                params![id],
2925                |r| r.get(0),
2926            )
2927            .unwrap_or(false);
2928        if !exists {
2929            return Ok(false);
2930        }
2931        // Check if ID already exists in active memories to prevent silent overwrite
2932        let active_exists: bool = conn
2933            .query_row(
2934                "SELECT COUNT(*) > 0 FROM memories WHERE id = ?1",
2935                params![id],
2936                |r| r.get(0),
2937            )
2938            .unwrap_or(false);
2939        if active_exists {
2940            anyhow::bail!(
2941                "cannot restore: memory {id} already exists in active table (would overwrite)"
2942            );
2943        }
2944        // Validate archived metadata before restoring
2945        let archived_metadata: String = conn
2946            .query_row(
2947                "SELECT metadata FROM archived_memories WHERE id = ?1",
2948                params![id],
2949                |r| r.get(0),
2950            )
2951            .unwrap_or_else(|_| "{}".to_string());
2952        let meta_value: serde_json::Value =
2953            serde_json::from_str(&archived_metadata).unwrap_or_else(|_| serde_json::json!({}));
2954        if let Err(e) = crate::validate::validate_metadata(&meta_value) {
2955            tracing::warn!("archived memory {id} has invalid metadata, resetting to {{}}: {e}");
2956            conn.execute(
2957                "UPDATE archived_memories SET metadata = '{}' WHERE id = ?1",
2958                params![id],
2959            )?;
2960        }
2961
2962        conn.execute(
2963            "INSERT INTO memories
2964             (id, tier, namespace, title, content, tags, priority, confidence,
2965              source, access_count, created_at, updated_at, last_accessed_at, expires_at, metadata)
2966             SELECT id, 'long', namespace, title, content, tags, priority, confidence,
2967                    source, access_count, created_at, ?1, last_accessed_at, NULL, metadata
2968             FROM archived_memories WHERE id = ?2",
2969            params![now, id],
2970        )?;
2971        conn.execute("DELETE FROM archived_memories WHERE id = ?1", params![id])?;
2972        Ok(true)
2973    })();
2974    match result {
2975        Ok(v) => {
2976            conn.execute_batch("COMMIT")?;
2977            Ok(v)
2978        }
2979        Err(e) => {
2980            let _ = conn.execute_batch("ROLLBACK");
2981            Err(e)
2982        }
2983    }
2984}
2985
2986pub fn purge_archive(conn: &Connection, older_than_days: Option<i64>) -> Result<usize> {
2987    match older_than_days {
2988        Some(days) if days < 0 => {
2989            anyhow::bail!("older_than_days must be non-negative (got {days})");
2990        }
2991        Some(days) => {
2992            let cutoff = (Utc::now() - chrono::Duration::days(days)).to_rfc3339();
2993            let deleted = conn.execute(
2994                "DELETE FROM archived_memories WHERE archived_at < ?1",
2995                params![cutoff],
2996            )?;
2997            Ok(deleted)
2998        }
2999        None => {
3000            let deleted = conn.execute("DELETE FROM archived_memories", [])?;
3001            Ok(deleted)
3002        }
3003    }
3004}
3005
3006pub fn archive_stats(conn: &Connection) -> Result<serde_json::Value> {
3007    let total: i64 = conn.query_row("SELECT COUNT(*) FROM archived_memories", [], |r| r.get(0))?;
3008    let mut stmt = conn.prepare(
3009        "SELECT namespace, COUNT(*) FROM archived_memories GROUP BY namespace ORDER BY COUNT(*) DESC",
3010    )?;
3011    let by_ns: Vec<serde_json::Value> = stmt
3012        .query_map([], |row| {
3013            Ok(serde_json::json!({
3014                "namespace": row.get::<_, String>(0)?,
3015                "count": row.get::<_, i64>(1)?,
3016            }))
3017        })?
3018        .collect::<rusqlite::Result<Vec<_>>>()?;
3019    Ok(serde_json::json!({
3020        "archived_total": total,
3021        "by_namespace": by_ns,
3022    }))
3023}
3024
3025pub fn export_all(conn: &Connection) -> Result<Vec<Memory>> {
3026    let now = Utc::now().to_rfc3339();
3027    let mut stmt = conn.prepare(
3028        "SELECT * FROM memories WHERE expires_at IS NULL OR expires_at > ?1 ORDER BY created_at ASC",
3029    )?;
3030    let rows = stmt.query_map(params![now], row_to_memory)?;
3031    rows.collect::<rusqlite::Result<Vec<_>>>()
3032        .map_err(Into::into)
3033}
3034
3035pub fn export_links(conn: &Connection) -> Result<Vec<MemoryLink>> {
3036    let now = Utc::now().to_rfc3339();
3037    let mut stmt = conn.prepare(
3038        "SELECT ml.source_id, ml.target_id, ml.relation, ml.created_at
3039         FROM memory_links ml
3040         JOIN memories ms ON ms.id = ml.source_id AND (ms.expires_at IS NULL OR ms.expires_at > ?1)
3041         JOIN memories mt ON mt.id = ml.target_id AND (mt.expires_at IS NULL OR mt.expires_at > ?1)",
3042    )?;
3043    let rows = stmt.query_map(params![now], |row| {
3044        Ok(MemoryLink {
3045            source_id: row.get(0)?,
3046            target_id: row.get(1)?,
3047            relation: row.get(2)?,
3048            created_at: row.get(3)?,
3049        })
3050    })?;
3051    rows.collect::<rusqlite::Result<Vec<_>>>()
3052        .map_err(Into::into)
3053}
3054
/// Insert with timestamp-aware conflict resolution for sync.
/// Only overwrites if the incoming memory is newer (by `updated_at`,
/// tiebroken by memory.id for a total order across peers —
/// ultrareview #344, #345).
///
/// Rationale: ISO 8601 / RFC 3339 strings compare lexicographically
/// as long as all timestamps carry consistent precision + Z suffix.
/// Equal timestamps (common when two nodes edit in the same ms, or
/// when NTP aligns clocks) previously produced non-deterministic
/// winners per peer, causing permanent mesh divergence. Adding the
/// memory.id tiebreaker yields a total order every peer agrees on.
///
/// Merge rules on a `(title, namespace)` conflict, as encoded in the SQL
/// ("incoming newer" = incoming `updated_at` greater, or equal with a
/// greater incoming `id`):
/// - newer-wins columns: `content`, `tags`, `source`, `metadata`.
/// - merged regardless of which side is newer: `priority`, `confidence`,
///   `access_count` and `updated_at` take the max of both sides; `tier` is
///   `long` if either side is `long`, else `mid` if the incoming tier is
///   `mid`, else the existing tier; `expires_at` is NULL if either side is
///   `long`, otherwise the incoming value falling back to the existing one.
/// - an existing `metadata.agent_id` is written back into whichever
///   metadata wins, so provenance is never replaced by a sync.
/// - not in the SET list, hence left untouched: `id`, `created_at`,
///   `last_accessed_at`.
///
/// Returns the `id` of the row that now holds `(title, namespace)`; on
/// conflict that is the existing row's id, which may differ from `mem.id`.
///
/// NOTE(review): only the `(title, namespace)` uniqueness conflict is
/// handled; a collision on any other unique key (presumably `id`) would
/// surface as an error — confirm against the schema.
pub fn insert_if_newer(conn: &Connection, mem: &Memory) -> Result<String> {
    // Vec/map fields are stored as JSON text columns.
    let tags_json = serde_json::to_string(&mem.tags)?;
    let metadata_json = serde_json::to_string(&mem.metadata)?;
    // Upsert; `RETURNING id` reports the id of the surviving row.
    let actual_id: String = conn.query_row(
        "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, last_accessed_at, expires_at, metadata)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
         ON CONFLICT(title, namespace) DO UPDATE SET
            content = CASE WHEN excluded.updated_at > memories.updated_at
                             OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                           THEN excluded.content ELSE memories.content END,
            tags = CASE WHEN excluded.updated_at > memories.updated_at
                          OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                        THEN excluded.tags ELSE memories.tags END,
            priority = MAX(memories.priority, excluded.priority),
            confidence = MAX(memories.confidence, excluded.confidence),
            source = CASE WHEN excluded.updated_at > memories.updated_at
                            OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                          THEN excluded.source ELSE memories.source END,
            tier = CASE WHEN excluded.tier = 'long' THEN 'long'
                        WHEN memories.tier = 'long' THEN 'long'
                        WHEN excluded.tier = 'mid' THEN 'mid'
                        ELSE memories.tier END,
            updated_at = MAX(memories.updated_at, excluded.updated_at),
            access_count = MAX(memories.access_count, excluded.access_count),
            expires_at = CASE WHEN excluded.tier = 'long' OR memories.tier = 'long' THEN NULL
                              ELSE COALESCE(excluded.expires_at, memories.expires_at) END,
            -- Preserve metadata.agent_id across newer-wins merge (NHI provenance immutable).
            metadata = CASE
                WHEN json_extract(memories.metadata, '$.agent_id') IS NOT NULL
                THEN json_set(
                    CASE WHEN excluded.updated_at > memories.updated_at
                              OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                         THEN excluded.metadata
                         ELSE memories.metadata END,
                    '$.agent_id',
                    json_extract(memories.metadata, '$.agent_id')
                )
                ELSE CASE WHEN excluded.updated_at > memories.updated_at
                               OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                          THEN excluded.metadata
                          ELSE memories.metadata END
            END
         RETURNING id",
        params![
            mem.id, mem.tier.as_str(), mem.namespace, mem.title, mem.content,
            tags_json, mem.priority, mem.confidence, mem.source, mem.access_count,
            mem.created_at, mem.updated_at, mem.last_accessed_at, mem.expires_at,
            metadata_json,
        ],
        |r| r.get(0),
    )?;
    Ok(actual_id)
}
3119
3120// --- Embedding support ---
3121
3122/// Store an embedding vector for a memory.
3123pub fn set_embedding(conn: &Connection, id: &str, embedding: &[f32]) -> Result<()> {
3124    let bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
3125    conn.execute(
3126        "UPDATE memories SET embedding = ?1 WHERE id = ?2",
3127        params![bytes, id],
3128    )?;
3129    Ok(())
3130}
3131
3132/// Load an embedding vector for a memory. Returns None if not set.
3133#[allow(clippy::unnecessary_wraps)]
3134pub fn get_embedding(conn: &Connection, id: &str) -> Result<Option<Vec<f32>>> {
3135    let result: Option<Vec<u8>> = conn
3136        .query_row(
3137            "SELECT embedding FROM memories WHERE id = ?1",
3138            params![id],
3139            |row| row.get(0),
3140        )
3141        .ok();
3142    match result {
3143        Some(bytes) if !bytes.is_empty() => {
3144            let floats: Vec<f32> = bytes
3145                .chunks_exact(4)
3146                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
3147                .collect();
3148            Ok(Some(floats))
3149        }
3150        _ => Ok(None),
3151    }
3152}
3153
3154/// Get all memory IDs that are missing embeddings.
3155pub fn get_unembedded_ids(conn: &Connection) -> Result<Vec<(String, String, String)>> {
3156    let mut stmt =
3157        conn.prepare("SELECT id, title, content FROM memories WHERE embedding IS NULL")?;
3158    let rows = stmt.query_map([], |row| {
3159        Ok((
3160            row.get::<_, String>(0)?,
3161            row.get::<_, String>(1)?,
3162            row.get::<_, String>(2)?,
3163        ))
3164    })?;
3165    rows.collect::<rusqlite::Result<Vec<_>>>()
3166        .map_err(Into::into)
3167}
3168
3169/// Get all stored embeddings as (id, embedding) pairs for building the HNSW index.
3170pub fn get_all_embeddings(conn: &Connection) -> Result<Vec<(String, Vec<f32>)>> {
3171    let mut stmt =
3172        conn.prepare("SELECT id, embedding FROM memories WHERE embedding IS NOT NULL")?;
3173    let rows = stmt.query_map([], |row| {
3174        let id: String = row.get(0)?;
3175        let bytes: Vec<u8> = row.get(1)?;
3176        Ok((id, bytes))
3177    })?;
3178    let mut entries = Vec::new();
3179    for row in rows {
3180        let (id, bytes) = row?;
3181        if bytes.is_empty() {
3182            continue;
3183        }
3184        let floats: Vec<f32> = bytes
3185            .chunks_exact(4)
3186            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
3187            .collect();
3188        entries.push((id, floats));
3189    }
3190    Ok(entries)
3191}
3192
3193/// Hybrid recall — FTS5 keyword search + semantic cosine similarity.
3194/// Returns memories ranked by a blended score of keyword and semantic relevance.
3195/// When an HNSW `vector_index` is provided, uses approximate nearest-neighbor
3196/// search instead of scanning all embeddings linearly.
3197#[allow(clippy::too_many_arguments)]
3198#[allow(clippy::too_many_lines)]
3199pub fn recall_hybrid(
3200    conn: &Connection,
3201    context: &str,
3202    query_embedding: &[f32],
3203    namespace: Option<&str>,
3204    limit: usize,
3205    tags_filter: Option<&str>,
3206    since: Option<&str>,
3207    until: Option<&str>,
3208    vector_index: Option<&crate::hnsw::VectorIndex>,
3209    short_extend: i64,
3210    mid_extend: i64,
3211    as_agent: Option<&str>,
3212    budget_tokens: Option<usize>,
3213    scoring: &crate::config::ResolvedScoring,
3214) -> Result<(Vec<(Memory, f64)>, usize)> {
3215    let now = Utc::now().to_rfc3339();
3216    let fts_query = sanitize_fts_query(context, true);
3217    let prefixes = compute_visibility_prefixes(as_agent);
3218    let (vis_p, vis_t, vis_u, vis_o) = prefixes.clone();
3219
3220    // Task 1.12: hierarchy expansion (same logic as `recall`). Hierarchical
3221    // `namespace` broadens filter to ancestor chain; flat namespaces stay
3222    // exact-match.
3223    let (fts_hierarchy_in, hierarchy_active) = hierarchy_in_clause(namespace);
3224    let fts_hierarchy_fragment = fts_hierarchy_in.unwrap_or_default();
3225    // Semantic stmt has no `m.` alias and binds at slot 1 — compute separately.
3226    let sem_hierarchy_fragment = if hierarchy_active {
3227        if let Some(ns) = namespace {
3228            let ancestors = crate::models::namespace_ancestors(ns);
3229            let quoted: Vec<String> = ancestors
3230                .iter()
3231                .map(|a| format!("'{}'", a.replace('\'', "''")))
3232                .collect();
3233            format!("AND memories.namespace IN ({})", quoted.join(","))
3234        } else {
3235            String::new()
3236        }
3237    } else {
3238        String::new()
3239    };
3240    let effective_namespace = if hierarchy_active { None } else { namespace };
3241
3242    // Step 1: Get FTS candidates (up to 3x limit to have a good pool)
3243    let fts_limit = (limit * 3).max(30);
3244    let fts_sql = format!(
3245        "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
3246                m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
3247                m.last_accessed_at, m.expires_at, m.metadata, m.embedding,
3248                (fts.rank * -1) + (m.priority * 0.5) + (MIN(m.access_count, 50) * 0.1)
3249                + (m.confidence * 2.0)
3250                + (CASE m.tier WHEN 'long' THEN 3.0 WHEN 'mid' THEN 1.0 ELSE 0.0 END)
3251                + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
3252                AS fts_score
3253         FROM memories_fts fts
3254         JOIN memories m ON m.rowid = fts.rowid
3255         WHERE memories_fts MATCH ?1
3256           AND (?2 IS NULL OR m.namespace = ?2)
3257           {fts_hierarchy_fragment}
3258           AND (m.expires_at IS NULL OR m.expires_at > ?3)
3259           AND (?4 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?4))
3260           AND (?5 IS NULL OR m.created_at >= ?5)
3261           AND (?6 IS NULL OR m.created_at <= ?6)
3262           {vis}
3263         ORDER BY fts_score DESC
3264         LIMIT ?7",
3265        vis = visibility_clause(8, "m"),
3266    );
3267    let mut fts_stmt = conn.prepare(&fts_sql)?;
3268
3269    // Step 2: Get semantic candidates — all memories with embeddings
3270    let sem_sql = format!(
3271        "SELECT id, tier, namespace, title, content, tags, priority,
3272                confidence, source, access_count, created_at, updated_at,
3273                last_accessed_at, expires_at, metadata, embedding
3274         FROM memories
3275         WHERE embedding IS NOT NULL
3276           AND (?1 IS NULL OR namespace = ?1)
3277           {sem_hierarchy_fragment}
3278           AND (expires_at IS NULL OR expires_at > ?2)
3279           AND (?3 IS NULL OR EXISTS (SELECT 1 FROM json_each(memories.tags) WHERE json_each.value = ?3))
3280           AND (?4 IS NULL OR created_at >= ?4)
3281           AND (?5 IS NULL OR created_at <= ?5)
3282           {vis}",
3283        vis = visibility_clause(6, "memories"),
3284    );
3285    let mut sem_stmt = conn.prepare(&sem_sql)?;
3286
3287    // Collect FTS results with scores
3288    let mut scored: HashMap<String, (Memory, f64, f64)> = HashMap::new(); // id -> (memory, fts_score, cosine_score)
3289
3290    let fts_rows = fts_stmt.query_map(
3291        params![
3292            fts_query,
3293            effective_namespace,
3294            now,
3295            tags_filter,
3296            since,
3297            until,
3298            fts_limit,
3299            vis_p,
3300            vis_t,
3301            vis_u,
3302            vis_o,
3303        ],
3304        |row| {
3305            let mem = row_to_memory(row)?;
3306            let fts_score: f64 = row.get(16)?;
3307            Ok((mem, fts_score))
3308        },
3309    )?;
3310
3311    let mut max_fts_score: f64 = 1.0;
3312    for row in fts_rows {
3313        let (mem, fts_score) = row?;
3314        if fts_score > max_fts_score {
3315            max_fts_score = fts_score;
3316        }
3317        // Compute cosine similarity if embedding exists
3318        let cosine = get_embedding(conn, &mem.id)?.map_or(0.0, |emb| {
3319            f64::from(crate::embeddings::Embedder::cosine_similarity(
3320                query_embedding,
3321                &emb,
3322            ))
3323        });
3324        scored.insert(mem.id.clone(), (mem, fts_score, cosine));
3325    }
3326
3327    // Semantic-only candidates — use HNSW index for fast ANN if available,
3328    // otherwise fall back to linear scan over all embeddings.
3329    if let Some(idx) = vector_index {
3330        // HNSW approximate nearest-neighbor search
3331        let ann_limit = (limit * 5).max(50);
3332        let hits = idx.search(query_embedding, ann_limit);
3333        for hit in hits {
3334            if scored.contains_key(&hit.id) {
3335                continue;
3336            }
3337            let cosine = f64::from(1.0 - hit.distance);
3338            // v0.6.2 (S18 iteration): cosine gate relaxed 0.3 → 0.2.
3339            // Scenario-18 caught a real-world miss at the old ceiling:
3340            // semantically-related pairs with varied phrasing ("morning
3341            // outdoor exercise routine" vs. "brisk uphill strides along
3342            // the ridge line trails") landed at 0.25-0.29 cosine and
3343            // silently fell below 0.3, returning zero semantic hits.
3344            // 0.2 keeps clearly-unrelated content out (random noise
3345            // hovers near 0) while admitting legitimate semantic
3346            // associations; the blended score + FTS component still
3347            // rank relevance on the way out.
3348            if cosine > 0.2
3349                && let Some(mem) = get(conn, &hit.id)?
3350            {
3351                // Apply namespace/expiry/tag filters. Task 1.12: when
3352                // hierarchy expansion is active, allow any ancestor match
3353                // (namespace_ancestors gives us the set); otherwise exact.
3354                if let Some(ns) = namespace {
3355                    if hierarchy_active {
3356                        let ancestors = crate::models::namespace_ancestors(ns);
3357                        if !ancestors.iter().any(|a| a == &mem.namespace) {
3358                            continue;
3359                        }
3360                    } else if mem.namespace != ns {
3361                        continue;
3362                    }
3363                }
3364                if let Some(exp) = &mem.expires_at
3365                    && exp.as_str() <= now.as_str()
3366                {
3367                    continue;
3368                }
3369                if let Some(tf) = tags_filter
3370                    && !mem.tags.iter().any(|t| t == tf)
3371                {
3372                    continue;
3373                }
3374                if let Some(s) = since
3375                    && mem.created_at.as_str() < s
3376                {
3377                    continue;
3378                }
3379                if let Some(u) = until
3380                    && mem.created_at.as_str() > u
3381                {
3382                    continue;
3383                }
3384                // #151 visibility filter (HNSW branch)
3385                if !is_visible(&mem, &prefixes) {
3386                    continue;
3387                }
3388                scored.insert(mem.id.clone(), (mem, 0.0, cosine));
3389            }
3390        }
3391    } else {
3392        // Fallback: linear scan over all embeddings
3393        let sem_rows = sem_stmt.query_map(
3394            params![
3395                effective_namespace,
3396                now,
3397                tags_filter,
3398                since,
3399                until,
3400                vis_p,
3401                vis_t,
3402                vis_u,
3403                vis_o
3404            ],
3405            |row| {
3406                let mem = row_to_memory(row)?;
3407                let emb_bytes: Option<Vec<u8>> = row.get(15)?;
3408                Ok((mem, emb_bytes))
3409            },
3410        )?;
3411
3412        for row in sem_rows {
3413            let (mem, emb_bytes) = row?;
3414            if scored.contains_key(&mem.id) {
3415                continue;
3416            }
3417            if let Some(bytes) = emb_bytes
3418                && !bytes.is_empty()
3419            {
3420                let emb: Vec<f32> = bytes
3421                    .chunks_exact(4)
3422                    .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
3423                    .collect();
3424                let cosine = f64::from(crate::embeddings::Embedder::cosine_similarity(
3425                    query_embedding,
3426                    &emb,
3427                ));
3428                // v0.6.2 (S18): see matching note above at the HNSW gate.
3429                if cosine > 0.2 {
3430                    scored.insert(mem.id.clone(), (mem, 0.0, cosine));
3431                }
3432            }
3433        }
3434    }
3435
3436    // Normalize FTS scores and compute blended score.
3437    // Adaptive blend: semantic weight decreases for longer content (embeddings
3438    // lose information on long text; FTS stays precise).  Short memories
3439    // (< 500 chars) get 50/50, long memories (> 5 000 chars) get 15/85.
3440    // v0.6.0.0: multiply the blend by a per-tier exponential time-decay with
3441    // half-life defaults 7 d (short) / 30 d (mid) / 365 d (long). The
3442    // `legacy_scoring` config knob short-circuits the decay back to 1.0 for
3443    // A/B comparison and emergency regression rollback.
3444    let now_utc = Utc::now();
3445    let mut results: Vec<(Memory, f64)> = scored
3446        .into_values()
3447        .map(|(mem, fts_score, cosine)| {
3448            let norm_fts = if max_fts_score > 0.0 {
3449                fts_score / max_fts_score
3450            } else {
3451                0.0
3452            };
3453            let content_len = f64::from(i32::try_from(mem.content.len()).expect("usize as i64"));
3454            // Lerp semantic_weight from 0.50 (≤500 chars) to 0.15 (≥5000 chars)
3455            let semantic_weight = if content_len <= 500.0 {
3456                0.50
3457            } else if content_len >= 5000.0 {
3458                0.15
3459            } else {
3460                0.50 - 0.35 * ((content_len - 500.0) / 4500.0)
3461            };
3462            let blended = semantic_weight * cosine + (1.0 - semantic_weight) * norm_fts;
3463            let age_days = chrono::DateTime::parse_from_rfc3339(&mem.created_at)
3464                .ok()
3465                .map_or(0.0, |ts| {
3466                    let secs = (now_utc - ts.with_timezone(&Utc)).num_seconds();
3467                    // Saturate at ~68 y (i32::MAX seconds). Practical: any memory
3468                    // older than that decays all the way down and the exact age
3469                    // doesn't matter. Precision loss here is negligible — we
3470                    // only need ~hour granularity on a 1 e-9..1.0 multiplier.
3471                    #[allow(clippy::cast_precision_loss)]
3472                    {
3473                        secs as f64 / 86_400.0
3474                    }
3475                });
3476            let decay = scoring.decay_multiplier(&mem.tier, age_days);
3477            (mem, blended * decay)
3478        })
3479        .collect();
3480
3481    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3482    results.truncate(limit);
3483
3484    // Task 1.12: proximity boost (if hierarchy expansion is active).
3485    let boosted = if let (true, Some(anchor)) = (hierarchy_active, namespace) {
3486        apply_proximity_boost(results, anchor)
3487    } else {
3488        results
3489    };
3490
3491    // Task 1.11: apply token budget in rank order (AFTER proximity).
3492    let (budgeted, tokens_used) = apply_token_budget(boosted, budget_tokens);
3493
3494    // Touch surviving memories only.
3495    for (mem, _) in &budgeted {
3496        if let Err(e) = touch(conn, &mem.id, short_extend, mid_extend) {
3497            tracing::warn!("touch failed for memory {}: {}", &mem.id, e);
3498        }
3499    }
3500
3501    Ok((budgeted, tokens_used))
3502}
3503
3504/// Checkpoint WAL for clean shutdown.
3505pub fn checkpoint(conn: &Connection) -> Result<()> {
3506    conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
3507    Ok(())
3508}
3509
3510// ---------------------------------------------------------------------------
3511// Phase 3 foundation (issue #224) — sync_state helpers.
3512//
3513// These are additive: they do not change how the existing `ai-memory sync`
3514// command behaves in v0.6.0 GA. They exist so HTTP sync endpoints and the
3515// CRDT-lite merge follow-up can durably track "last updated_at seen from
3516// peer X" per local agent.
3517// ---------------------------------------------------------------------------
3518
3519/// Record the latest `updated_at` this local agent has observed from `peer_id`.
3520/// Monotonic by timestamp — older writes do not overwrite newer ones.
3521/// Lazily creates the row on first observation.
3522pub fn sync_state_observe(
3523    conn: &Connection,
3524    agent_id: &str,
3525    peer_id: &str,
3526    seen_at: &str,
3527) -> Result<()> {
3528    let now = Utc::now().to_rfc3339();
3529    conn.execute(
3530        "INSERT INTO sync_state (agent_id, peer_id, last_seen_at, last_pulled_at) \
3531         VALUES (?1, ?2, ?3, ?4) \
3532         ON CONFLICT(agent_id, peer_id) DO UPDATE SET \
3533            last_seen_at = CASE WHEN excluded.last_seen_at > last_seen_at \
3534                                THEN excluded.last_seen_at \
3535                                ELSE last_seen_at END, \
3536            last_pulled_at = excluded.last_pulled_at",
3537        params![agent_id, peer_id, seen_at, now],
3538    )?;
3539    Ok(())
3540}
3541
3542/// Load the full vector clock for `agent_id` — the set of
3543/// (`peer_id` -> `last_seen_at`) this local agent tracks.
3544pub fn sync_state_load(conn: &Connection, agent_id: &str) -> Result<crate::models::VectorClock> {
3545    let mut stmt =
3546        conn.prepare("SELECT peer_id, last_seen_at FROM sync_state WHERE agent_id = ?1")?;
3547    let rows = stmt.query_map(params![agent_id], |row| {
3548        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
3549    })?;
3550    let mut clock = crate::models::VectorClock::default();
3551    for row in rows {
3552        let (peer, at) = row?;
3553        clock.entries.insert(peer, at);
3554    }
3555    Ok(clock)
3556}
3557
3558/// Look up this peer's last-push watermark for `peer_id`. Returns `None`
3559/// if we've never successfully pushed to them (foundation-era rows also
3560/// return `None` because the column was added in schema v12).
3561#[must_use]
3562#[allow(dead_code)] // called via lib crate (daemon_runtime); bin sees it as unused
3563pub fn sync_state_last_pushed(conn: &Connection, agent_id: &str, peer_id: &str) -> Option<String> {
3564    conn.query_row(
3565        "SELECT last_pushed_at FROM sync_state WHERE agent_id = ?1 AND peer_id = ?2",
3566        params![agent_id, peer_id],
3567        |r| r.get::<_, Option<String>>(0),
3568    )
3569    .ok()
3570    .flatten()
3571}
3572
3573/// Record that local memories up to `updated_at = pushed_at` have been
3574/// accepted by `peer_id`. Creates the row if it doesn't exist; monotonic.
3575#[allow(dead_code)] // called via lib crate (daemon_runtime); bin sees it as unused
3576pub fn sync_state_record_push(
3577    conn: &Connection,
3578    agent_id: &str,
3579    peer_id: &str,
3580    pushed_at: &str,
3581) -> Result<()> {
3582    let now = Utc::now().to_rfc3339();
3583    conn.execute(
3584        "INSERT INTO sync_state (agent_id, peer_id, last_seen_at, last_pulled_at, last_pushed_at) \
3585         VALUES (?1, ?2, ?3, ?3, ?4) \
3586         ON CONFLICT(agent_id, peer_id) DO UPDATE SET \
3587            last_pushed_at = CASE \
3588                WHEN excluded.last_pushed_at IS NULL THEN last_pushed_at \
3589                WHEN last_pushed_at IS NULL THEN excluded.last_pushed_at \
3590                WHEN excluded.last_pushed_at > last_pushed_at THEN excluded.last_pushed_at \
3591                ELSE last_pushed_at END",
3592        params![agent_id, peer_id, now, pushed_at],
3593    )?;
3594    Ok(())
3595}
3596
3597/// Return memories whose `updated_at > since`, ordered by `updated_at`
3598/// ascending. Used by `GET /api/v1/sync/since` to stream incremental
3599/// updates to a peer. Caps at `limit` rows (caller-chosen pagination).
3600pub fn memories_updated_since(
3601    conn: &Connection,
3602    since: Option<&str>,
3603    limit: usize,
3604) -> Result<Vec<Memory>> {
3605    let mut stmt = conn.prepare(
3606        "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
3607                source, access_count, created_at, updated_at, last_accessed_at, \
3608                expires_at, metadata \
3609         FROM memories \
3610         WHERE (?1 IS NULL OR updated_at > ?1) \
3611         ORDER BY updated_at ASC \
3612         LIMIT ?2",
3613    )?;
3614    let rows = stmt.query_map(params![since, limit], row_to_memory)?;
3615    rows.collect::<rusqlite::Result<Vec<_>>>()
3616        .map_err(Into::into)
3617}
3618
3619/// Deep health check — verifies DB is accessible and FTS is functional.
3620pub fn health_check(conn: &Connection) -> Result<bool> {
3621    let _: i64 = conn.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))?;
3622    conn.execute(
3623        "INSERT INTO memories_fts(memories_fts) VALUES('integrity-check')",
3624        [],
3625    )?;
3626    Ok(true)
3627}
3628
3629// ---------------------------------------------------------------------------
3630// Namespace standards
3631// ---------------------------------------------------------------------------
3632
3633/// Set the standard memory for a namespace, with optional parent for rule layering.
3634pub fn set_namespace_standard(
3635    conn: &Connection,
3636    namespace: &str,
3637    standard_id: &str,
3638    parent: Option<&str>,
3639) -> Result<()> {
3640    // Verify the memory exists (but allow cross-namespace — shared policy)
3641    let _mem = get(conn, standard_id)?
3642        .ok_or_else(|| anyhow::anyhow!("memory not found: {standard_id}"))?;
3643    // Resolve parent: explicit > auto-detect by `-` prefix > none
3644    let resolved_parent = match parent {
3645        Some(p) => {
3646            if p == namespace {
3647                anyhow::bail!("namespace cannot be its own parent");
3648            }
3649            Some(p.to_string())
3650        }
3651        None => auto_detect_parent(conn, namespace),
3652    };
3653    let now = chrono::Utc::now().to_rfc3339();
3654    conn.execute(
3655        "INSERT INTO namespace_meta (namespace, standard_id, updated_at, parent_namespace)
3656         VALUES (?1, ?2, ?3, ?4)
3657         ON CONFLICT(namespace) DO UPDATE SET standard_id = ?2, updated_at = ?3, parent_namespace = ?4",
3658        params![namespace, standard_id, now, resolved_parent],
3659    )?;
3660    Ok(())
3661}
3662
3663/// Auto-detect parent namespace by `-` prefix.
3664/// "ai-memory-tests" → checks "ai-memory" → checks "ai" → first match wins.
3665fn auto_detect_parent(conn: &Connection, namespace: &str) -> Option<String> {
3666    let mut candidate = namespace.to_string();
3667    while let Some(pos) = candidate.rfind('-') {
3668        candidate.truncate(pos);
3669        if candidate.is_empty() {
3670            break;
3671        }
3672        // Check if this candidate has a standard set
3673        if get_namespace_standard(conn, &candidate)
3674            .ok()
3675            .flatten()
3676            .is_some()
3677        {
3678            return Some(candidate);
3679        }
3680    }
3681    None
3682}
3683
3684/// Get the standard memory ID for a namespace.
3685#[allow(clippy::unnecessary_wraps)]
3686pub fn get_namespace_standard(conn: &Connection, namespace: &str) -> Result<Option<String>> {
3687    let result = conn
3688        .query_row(
3689            "SELECT standard_id FROM namespace_meta WHERE namespace = ?1",
3690            params![namespace],
3691            |r| r.get(0),
3692        )
3693        .ok();
3694    Ok(result)
3695}
3696
3697/// Get the parent namespace for a given namespace.
3698pub fn get_namespace_parent(conn: &Connection, namespace: &str) -> Option<String> {
3699    conn.query_row(
3700        "SELECT parent_namespace FROM namespace_meta WHERE namespace = ?1 AND parent_namespace IS NOT NULL",
3701        params![namespace],
3702        |r| r.get(0),
3703    )
3704    .ok()
3705}
3706
3707/// v0.6.2 (S35): read the full `namespace_meta` row for a namespace so the
3708/// caller can fan it out to peers. Returns `None` when no standard is set.
3709/// Mirrors the (`namespace`, `standard_id`, `parent_namespace`, `updated_at`)
3710/// tuple used by `set_namespace_standard`.
3711#[allow(clippy::unnecessary_wraps)]
3712pub fn get_namespace_meta_entry(
3713    conn: &Connection,
3714    namespace: &str,
3715) -> Result<Option<crate::models::NamespaceMetaEntry>> {
3716    let row = conn
3717        .query_row(
3718            "SELECT namespace, standard_id, parent_namespace, updated_at
3719             FROM namespace_meta WHERE namespace = ?1",
3720            params![namespace],
3721            |r| {
3722                Ok(crate::models::NamespaceMetaEntry {
3723                    namespace: r.get(0)?,
3724                    standard_id: r.get(1)?,
3725                    parent_namespace: r.get(2)?,
3726                    updated_at: r.get::<_, Option<String>>(3)?.unwrap_or_default(),
3727                })
3728            },
3729        )
3730        .ok();
3731    Ok(row)
3732}
3733
3734/// Clear the standard for a namespace.
3735pub fn clear_namespace_standard(conn: &Connection, namespace: &str) -> Result<bool> {
3736    let changed = conn.execute(
3737        "DELETE FROM namespace_meta WHERE namespace = ?1",
3738        params![namespace],
3739    )?;
3740    Ok(changed > 0)
3741}
3742
3743// ---------------------------------------------------------------------------
3744// Task 1.9 — governance enforcement + pending_actions CRUD
3745// ---------------------------------------------------------------------------
3746
3747/// Resolve the explicit governance policy for a namespace from its standard
3748/// memory's `metadata.governance`. Returns `None` when no policy is set —
3749/// enforcement is **opt-in**, so namespaces without explicit policy skip
3750/// every governance check (historical behavior preserved). The "default
3751/// policy" (`{ write: Any, promote: Any, delete: Owner, approver: Human }`)
3752/// is surfaced by `get_standard` for display purposes only; it does not
3753/// gate operations.
3754pub fn resolve_governance_policy(conn: &Connection, namespace: &str) -> Option<GovernancePolicy> {
3755    let standard_id = get_namespace_standard(conn, namespace).ok()??;
3756    let mem = get(conn, &standard_id).ok()??;
3757    match GovernancePolicy::from_metadata(&mem.metadata) {
3758        Some(Ok(p)) => Some(p),
3759        _ => None,
3760    }
3761}
3762
3763/// Return true if `agent_id` matches a registered agent in `_agents`.
3764fn is_registered_agent(conn: &Connection, agent_id: &str) -> bool {
3765    let title = format!("agent:{agent_id}");
3766    conn.query_row(
3767        "SELECT 1 FROM memories WHERE namespace = ?1 AND title = ?2",
3768        params![AGENTS_NAMESPACE, &title],
3769        |r| r.get::<_, i64>(0),
3770    )
3771    .is_ok()
3772}
3773
3774/// Evaluate a governance level against caller context.
3775/// - `memory_owner`: the existing memory's `metadata.agent_id` (delete/promote paths).
3776///   Pass `None` for store operations.
3777/// - `namespace_owner`: the `metadata.agent_id` of the namespace's standard memory,
3778///   used as the "owner" for store operations. Resolved once by the caller.
3779fn evaluate_level(
3780    conn: &Connection,
3781    level: &GovernanceLevel,
3782    agent_id: &str,
3783    memory_owner: Option<&str>,
3784    namespace_owner: Option<&str>,
3785) -> GovernanceDecision {
3786    match level {
3787        GovernanceLevel::Any => GovernanceDecision::Allow,
3788        GovernanceLevel::Registered => {
3789            if is_registered_agent(conn, agent_id) {
3790                GovernanceDecision::Allow
3791            } else {
3792                GovernanceDecision::Deny(format!(
3793                    "governance: caller '{agent_id}' is not a registered agent"
3794                ))
3795            }
3796        }
3797        GovernanceLevel::Owner => {
3798            let owner = memory_owner.or(namespace_owner);
3799            match owner {
3800                Some(o) if o == agent_id => GovernanceDecision::Allow,
3801                Some(o) => GovernanceDecision::Deny(format!(
3802                    "governance: caller '{agent_id}' is not the owner ('{o}')"
3803                )),
3804                None => GovernanceDecision::Deny(
3805                    "governance: owner-level action has no resolvable owner".into(),
3806                ),
3807            }
3808        }
3809        GovernanceLevel::Approve => {
3810            // Caller translates this into a queued pending_action — the enforcement
3811            // helpers below own the queueing so the db layer is the single source
3812            // of truth for pending ids.
3813            GovernanceDecision::Pending(String::new())
3814        }
3815    }
3816}
3817
3818/// Resolve the namespace-owner (`metadata.agent_id` of the namespace's
3819/// standard memory) used for `Owner`-level store checks.
3820fn namespace_owner(conn: &Connection, namespace: &str) -> Option<String> {
3821    let standard_id = get_namespace_standard(conn, namespace).ok().flatten()?;
3822    let mem = get(conn, &standard_id).ok().flatten()?;
3823    mem.metadata
3824        .get("agent_id")
3825        .and_then(|v| v.as_str())
3826        .map(str::to_string)
3827}
3828
3829/// Enforce governance for a `GovernedAction`. On [`GovernanceDecision::Pending`],
3830/// a row is inserted into `pending_actions` and the returned `pending_id` is
3831/// embedded in the decision.
3832pub fn enforce_governance(
3833    conn: &Connection,
3834    action: GovernedAction,
3835    namespace: &str,
3836    agent_id: &str,
3837    memory_id: Option<&str>,
3838    memory_owner: Option<&str>,
3839    payload: &serde_json::Value,
3840) -> Result<GovernanceDecision> {
3841    // Opt-in enforcement: namespaces without an explicit policy are unaffected.
3842    let Some(policy) = resolve_governance_policy(conn, namespace) else {
3843        return Ok(GovernanceDecision::Allow);
3844    };
3845    let level = match action {
3846        GovernedAction::Store => &policy.write,
3847        GovernedAction::Delete => &policy.delete,
3848        GovernedAction::Promote => &policy.promote,
3849    };
3850    let ns_owner = if matches!(action, GovernedAction::Store) {
3851        namespace_owner(conn, namespace)
3852    } else {
3853        None
3854    };
3855
3856    let decision = evaluate_level(conn, level, agent_id, memory_owner, ns_owner.as_deref());
3857    if let GovernanceDecision::Pending(_) = decision {
3858        let pending_id =
3859            queue_pending_action(conn, action, namespace, memory_id, agent_id, payload)?;
3860        return Ok(GovernanceDecision::Pending(pending_id));
3861    }
3862    Ok(decision)
3863}
3864
3865/// Insert a `pending_actions` row and return its id.
3866pub fn queue_pending_action(
3867    conn: &Connection,
3868    action: GovernedAction,
3869    namespace: &str,
3870    memory_id: Option<&str>,
3871    requested_by: &str,
3872    payload: &serde_json::Value,
3873) -> Result<String> {
3874    let id = uuid::Uuid::new_v4().to_string();
3875    let now = Utc::now().to_rfc3339();
3876    let payload_json = serde_json::to_string(payload)?;
3877    conn.execute(
3878        "INSERT INTO pending_actions (id, action_type, memory_id, namespace, payload, requested_by, requested_at, status)
3879         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 'pending')",
3880        params![
3881            id,
3882            action.as_str(),
3883            memory_id,
3884            namespace,
3885            payload_json,
3886            requested_by,
3887            now,
3888        ],
3889    )?;
3890    Ok(id)
3891}
3892
3893/// v0.6.2 (S34): upsert a `pending_actions` row from a canonical `PendingAction`
3894/// struct — used by `sync_push` to apply a peer-originated pending row so
3895/// governance state is cluster-consistent. Preserves `approvals` and
3896/// decision fields verbatim so re-plays converge. Uses `INSERT ... ON
3897/// CONFLICT(id) DO UPDATE` because the originator's id is stable across
3898/// peers (unlike `queue_pending_action` which mints a fresh UUID per
3899/// queue call).
3900pub fn upsert_pending_action(conn: &Connection, pa: &PendingAction) -> Result<()> {
3901    let payload_json = serde_json::to_string(&pa.payload)?;
3902    let approvals_json = serde_json::to_string(&pa.approvals)?;
3903    conn.execute(
3904        "INSERT INTO pending_actions
3905         (id, action_type, memory_id, namespace, payload, requested_by,
3906          requested_at, status, decided_by, decided_at, approvals)
3907         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3908         ON CONFLICT(id) DO UPDATE SET
3909            action_type  = excluded.action_type,
3910            memory_id    = excluded.memory_id,
3911            namespace    = excluded.namespace,
3912            payload      = excluded.payload,
3913            requested_by = excluded.requested_by,
3914            requested_at = excluded.requested_at,
3915            status       = excluded.status,
3916            decided_by   = excluded.decided_by,
3917            decided_at   = excluded.decided_at,
3918            approvals    = excluded.approvals",
3919        params![
3920            pa.id,
3921            pa.action_type,
3922            pa.memory_id,
3923            pa.namespace,
3924            payload_json,
3925            pa.requested_by,
3926            pa.requested_at,
3927            pa.status,
3928            pa.decided_by,
3929            pa.decided_at,
3930            approvals_json,
3931        ],
3932    )?;
3933    Ok(())
3934}
3935
3936pub fn list_pending_actions(
3937    conn: &Connection,
3938    status: Option<&str>,
3939    limit: usize,
3940) -> Result<Vec<PendingAction>> {
3941    let mut stmt = conn.prepare(
3942        "SELECT id, action_type, memory_id, namespace, payload, requested_by,
3943                requested_at, status, decided_by, decided_at, approvals
3944         FROM pending_actions
3945         WHERE (?1 IS NULL OR status = ?1)
3946         ORDER BY requested_at DESC
3947         LIMIT ?2",
3948    )?;
3949    let rows = stmt.query_map(params![status, limit], |row| {
3950        let payload_str: String = row.get(4)?;
3951        let payload: serde_json::Value =
3952            serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null);
3953        let approvals_str: String = row.get(10)?;
3954        let approvals: Vec<Approval> = serde_json::from_str(&approvals_str).unwrap_or_default();
3955        Ok(PendingAction {
3956            id: row.get(0)?,
3957            action_type: row.get(1)?,
3958            memory_id: row.get(2)?,
3959            namespace: row.get(3)?,
3960            payload,
3961            requested_by: row.get(5)?,
3962            requested_at: row.get(6)?,
3963            status: row.get(7)?,
3964            decided_by: row.get(8)?,
3965            decided_at: row.get(9)?,
3966            approvals,
3967        })
3968    })?;
3969    rows.collect::<rusqlite::Result<Vec<_>>>()
3970        .map_err(Into::into)
3971}
3972
3973pub fn get_pending_action(conn: &Connection, id: &str) -> Result<Option<PendingAction>> {
3974    let row = conn.query_row(
3975        "SELECT id, action_type, memory_id, namespace, payload, requested_by,
3976                requested_at, status, decided_by, decided_at, approvals
3977         FROM pending_actions WHERE id = ?1",
3978        params![id],
3979        |row| {
3980            let payload_str: String = row.get(4)?;
3981            let payload: serde_json::Value =
3982                serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null);
3983            let approvals_str: String = row.get(10)?;
3984            let approvals: Vec<Approval> = serde_json::from_str(&approvals_str).unwrap_or_default();
3985            Ok(PendingAction {
3986                id: row.get(0)?,
3987                action_type: row.get(1)?,
3988                memory_id: row.get(2)?,
3989                namespace: row.get(3)?,
3990                payload,
3991                requested_by: row.get(5)?,
3992                requested_at: row.get(6)?,
3993                status: row.get(7)?,
3994                decided_by: row.get(8)?,
3995                decided_at: row.get(9)?,
3996                approvals,
3997            })
3998        },
3999    );
4000    match row {
4001        Ok(p) => Ok(Some(p)),
4002        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
4003        Err(e) => Err(e.into()),
4004    }
4005}
4006
4007/// Mark a pending action as approved or rejected. Returns true on status
4008/// transition. Does NOT execute the action itself — the caller replays
4009/// the payload on approval (the db layer doesn't know how to execute
4010/// cross-interface write semantics).
4011pub fn decide_pending_action(
4012    conn: &Connection,
4013    id: &str,
4014    approve: bool,
4015    decided_by: &str,
4016) -> Result<bool> {
4017    let new_status = if approve { "approved" } else { "rejected" };
4018    let now = Utc::now().to_rfc3339();
4019    let updated = conn.execute(
4020        "UPDATE pending_actions SET status = ?1, decided_by = ?2, decided_at = ?3
4021         WHERE id = ?4 AND status = 'pending'",
4022        params![new_status, decided_by, now, id],
4023    )?;
4024    Ok(updated > 0)
4025}
4026
4027/// Task 1.10 — outcome of an approver-aware approve call.
4028#[derive(Debug, Clone, PartialEq, Eq)]
4029pub enum ApproveOutcome {
4030    /// Approver check failed; policy identifies the reason.
4031    Rejected(String),
4032    /// Consensus quorum not yet met; vote recorded.
4033    Pending { votes: usize, quorum: u32 },
4034    /// Fully approved (Human single-step, matching Agent, or consensus
4035    /// threshold met). Caller may now replay the payload via
4036    /// `execute_pending_action`.
4037    Approved,
4038}
4039
4040/// Task 1.10 — approver-type aware approve. Enforces the
4041/// `metadata.governance.approver` of the pending action's namespace.
4042pub fn approve_with_approver_type(
4043    conn: &Connection,
4044    pending_id: &str,
4045    approver_agent_id: &str,
4046) -> Result<ApproveOutcome> {
4047    let Some(pa) = get_pending_action(conn, pending_id)? else {
4048        return Ok(ApproveOutcome::Rejected(format!(
4049            "pending action not found: {pending_id}"
4050        )));
4051    };
4052    if pa.status != "pending" {
4053        return Ok(ApproveOutcome::Rejected(format!(
4054            "already decided: status={}",
4055            pa.status
4056        )));
4057    }
4058    // Resolve the namespace's approver type. If no policy, default to Human —
4059    // which accepts any approval (back-compat with 1.9 callers).
4060    let approver =
4061        resolve_governance_policy(conn, &pa.namespace).map_or(ApproverType::Human, |p| p.approver);
4062
4063    match approver {
4064        ApproverType::Human => {
4065            let ok = decide_pending_action(conn, pending_id, true, approver_agent_id)?;
4066            if ok {
4067                Ok(ApproveOutcome::Approved)
4068            } else {
4069                Ok(ApproveOutcome::Rejected("decision write failed".into()))
4070            }
4071        }
4072        ApproverType::Agent(required) => {
4073            if approver_agent_id != required {
4074                return Ok(ApproveOutcome::Rejected(format!(
4075                    "designated approver is '{required}'; got '{approver_agent_id}'"
4076                )));
4077            }
4078            let ok = decide_pending_action(conn, pending_id, true, approver_agent_id)?;
4079            if ok {
4080                Ok(ApproveOutcome::Approved)
4081            } else {
4082                Ok(ApproveOutcome::Rejected("decision write failed".into()))
4083            }
4084        }
4085        ApproverType::Consensus(quorum) => {
4086            // Issue #216: a single caller could previously satisfy any
4087            // Consensus(n) quorum by varying the unauthenticated `agent_id`
4088            // (`alice`, `bob`, `Alice`/`alice` were three distinct votes).
4089            // Two changes harden the path:
4090            //   1. Require each voter to be a registered agent — raises the
4091            //      bar from "claim any string" to "operator pre-registered
4092            //      this id". Combined with auth on the approve endpoint
4093            //      (operator-deployed) this gives a real multi-party gate.
4094            //   2. Canonicalize the agent_id to lowercase for both the
4095            //      duplicate-vote check and storage so case-variants of the
4096            //      same id collapse to a single vote.
4097            if !is_registered_agent(conn, approver_agent_id) {
4098                return Ok(ApproveOutcome::Rejected(format!(
4099                    "consensus voter '{approver_agent_id}' is not a registered agent"
4100                )));
4101            }
4102            let canonical_id = approver_agent_id.to_ascii_lowercase();
4103            let mut approvals = pa.approvals.clone();
4104            if approvals
4105                .iter()
4106                .any(|a| a.agent_id.eq_ignore_ascii_case(&canonical_id))
4107            {
4108                return Ok(ApproveOutcome::Pending {
4109                    votes: approvals.len(),
4110                    quorum,
4111                });
4112            }
4113            approvals.push(Approval {
4114                agent_id: canonical_id.clone(),
4115                approved_at: Utc::now().to_rfc3339(),
4116            });
4117            let approvals_json = serde_json::to_string(&approvals)?;
4118            conn.execute(
4119                "UPDATE pending_actions SET approvals = ?1 WHERE id = ?2 AND status = 'pending'",
4120                params![approvals_json, pending_id],
4121            )?;
4122            let votes = approvals.len();
4123            if u32::try_from(votes).unwrap_or(u32::MAX) >= quorum {
4124                // Threshold met — transition status so the caller can replay.
4125                let ok = decide_pending_action(conn, pending_id, true, &canonical_id)?;
4126                if ok {
4127                    return Ok(ApproveOutcome::Approved);
4128                }
4129                return Ok(ApproveOutcome::Rejected(
4130                    "decision write failed at consensus threshold".into(),
4131                ));
4132            }
4133            Ok(ApproveOutcome::Pending { votes, quorum })
4134        }
4135    }
4136}
4137
4138/// Task 1.10 — Execute an approved pending action's payload. Callers invoke
4139/// this after `approve_with_approver_type` returns `Approved`. Returns the
4140/// affected memory id (new id for store, existing id for delete/promote).
4141pub fn execute_pending_action(conn: &Connection, pending_id: &str) -> Result<Option<String>> {
4142    let Some(pa) = get_pending_action(conn, pending_id)? else {
4143        anyhow::bail!("pending action not found: {pending_id}");
4144    };
4145    if pa.status != "approved" {
4146        anyhow::bail!("cannot execute non-approved action (status={})", pa.status);
4147    }
4148    match pa.action_type.as_str() {
4149        "store" => {
4150            let mut mem: Memory = serde_json::from_value(pa.payload.clone())
4151                .map_err(|e| anyhow::anyhow!("invalid store payload: {e}"))?;
4152            // Stamp fresh id + timestamps so the execution is idempotent on replay.
4153            mem.id = uuid::Uuid::new_v4().to_string();
4154            let now = Utc::now().to_rfc3339();
4155            mem.created_at.clone_from(&now);
4156            mem.updated_at = now;
4157            mem.access_count = 0;
4158            let actual_id = insert(conn, &mem)?;
4159            Ok(Some(actual_id))
4160        }
4161        "delete" => {
4162            if let Some(mid) = pa.memory_id.clone() {
4163                delete(conn, &mid)?;
4164                Ok(Some(mid))
4165            } else {
4166                Ok(None)
4167            }
4168        }
4169        "promote" => {
4170            if let Some(mid) = pa.memory_id.clone() {
4171                if let Some(to_ns) = pa.payload.get("to_namespace").and_then(|v| v.as_str()) {
4172                    // Vertical promotion to ancestor.
4173                    let clone_id = promote_to_namespace(conn, &mid, to_ns)?;
4174                    return Ok(Some(clone_id));
4175                }
4176                // Tier bump to long + clear expiry.
4177                let (_found, _changed) = update(
4178                    conn,
4179                    &mid,
4180                    None,
4181                    None,
4182                    Some(&Tier::Long),
4183                    None,
4184                    None,
4185                    None,
4186                    None,
4187                    Some(""),
4188                    None,
4189                )?;
4190                Ok(Some(mid))
4191            } else {
4192                Ok(None)
4193            }
4194        }
4195        other => anyhow::bail!("unknown action_type: {other}"),
4196    }
4197}
4198
4199/// Check if a memory ID is a namespace standard (used by consolidate to warn).
4200pub fn is_namespace_standard(conn: &Connection, id: &str) -> bool {
4201    conn.query_row(
4202        "SELECT COUNT(*) FROM namespace_meta WHERE standard_id = ?1",
4203        params![id],
4204        |r| r.get::<_, i64>(0),
4205    )
4206    .unwrap_or(0)
4207        > 0
4208}
4209
4210/// v0.6.3 (capabilities schema v2): count namespace standards whose
4211/// `metadata.governance` is non-null. A "rule" here means a namespace
4212/// has an explicit governance policy attached to its standard memory.
4213/// The count is a transparent passthrough — the full permission system
4214/// arrives in v0.7 (arch-enhancement-spec §3).
4215pub fn count_active_governance_rules(conn: &Connection) -> Result<usize> {
4216    let count: i64 = conn
4217        .query_row(
4218            "SELECT COUNT(*) FROM memories m
4219             INNER JOIN namespace_meta nm ON nm.standard_id = m.id
4220             WHERE json_extract(m.metadata, '$.governance') IS NOT NULL",
4221            [],
4222            |r| r.get(0),
4223        )
4224        .unwrap_or(0);
4225    Ok(usize::try_from(count.max(0)).unwrap_or(0))
4226}
4227
4228/// v0.6.3 (capabilities schema v2): count rows in the `subscriptions`
4229/// table. Used by `handle_capabilities` as a proxy for "registered
4230/// hooks" — the hook pipeline itself is v0.7 Bucket 0 work.
4231pub fn count_subscriptions(conn: &Connection) -> Result<usize> {
4232    let count: i64 = conn
4233        .query_row("SELECT COUNT(*) FROM subscriptions", [], |r| r.get(0))
4234        .unwrap_or(0);
4235    Ok(usize::try_from(count.max(0)).unwrap_or(0))
4236}
4237
4238/// v0.6.3 (capabilities schema v2): count `pending_actions` rows whose
4239/// `status` matches the predicate. Used by `handle_capabilities` to
4240/// surface live approval queue depth.
4241pub fn count_pending_actions_by_status(conn: &Connection, status: &str) -> Result<usize> {
4242    let count: i64 = conn
4243        .query_row(
4244            "SELECT COUNT(*) FROM pending_actions WHERE status = ?1",
4245            params![status],
4246            |r| r.get(0),
4247        )
4248        .unwrap_or(0);
4249    Ok(usize::try_from(count.max(0)).unwrap_or(0))
4250}
4251
4252#[cfg(test)]
4253mod tests {
4254    use super::*;
4255    use crate::models::{MID_TTL_EXTEND_SECS, Memory, SHORT_TTL_EXTEND_SECS, Tier};
4256
4257    fn test_db() -> Connection {
4258        open(std::path::Path::new(":memory:")).unwrap()
4259    }
4260
4261    fn make_memory(title: &str, ns: &str, tier: Tier, priority: i32) -> Memory {
4262        let now = chrono::Utc::now().to_rfc3339();
4263        Memory {
4264            id: uuid::Uuid::new_v4().to_string(),
4265            tier: tier.clone(),
4266            namespace: ns.to_string(),
4267            title: title.to_string(),
4268            content: format!("Content for {title}"),
4269            tags: vec![],
4270            priority,
4271            confidence: 1.0,
4272            source: "test".to_string(),
4273            access_count: 0,
4274            created_at: now.clone(),
4275            updated_at: now,
4276            last_accessed_at: None,
4277            expires_at: tier
4278                .default_ttl_secs()
4279                .map(|s| (chrono::Utc::now() + chrono::Duration::seconds(s)).to_rfc3339()),
4280            metadata: serde_json::json!({}),
4281        }
4282    }
4283
4284    #[test]
4285    fn open_creates_schema() {
4286        let conn = test_db();
4287        let count: i64 = conn
4288            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
4289            .unwrap();
4290        assert_eq!(count, 0);
4291    }
4292
4293    #[test]
4294    fn insert_and_get() {
4295        let conn = test_db();
4296        let mem = make_memory("Test insert", "test", Tier::Long, 5);
4297        let id = insert(&conn, &mem).unwrap();
4298        let got = get(&conn, &id).unwrap().unwrap();
4299        assert_eq!(got.title, "Test insert");
4300        assert_eq!(got.namespace, "test");
4301        assert_eq!(got.priority, 5);
4302    }
4303
4304    #[test]
4305    fn get_nonexistent() {
4306        let conn = test_db();
4307        let got = get(&conn, "nonexistent-id").unwrap();
4308        assert!(got.is_none());
4309    }
4310
4311    #[test]
4312    fn update_partial_fields() {
4313        let conn = test_db();
4314        let mem = make_memory("Original", "test", Tier::Mid, 5);
4315        let id = insert(&conn, &mem).unwrap();
4316
4317        let (found, content_changed) = update(
4318            &conn,
4319            &id,
4320            Some("Updated Title"),
4321            None,
4322            None,
4323            None,
4324            None,
4325            Some(9),
4326            None,
4327            None,
4328            None,
4329        )
4330        .unwrap();
4331        assert!(found);
4332        assert!(content_changed); // title changed
4333
4334        let got = get(&conn, &id).unwrap().unwrap();
4335        assert_eq!(got.title, "Updated Title");
4336        assert_eq!(got.priority, 9);
4337        assert_eq!(got.content, mem.content); // unchanged
4338    }
4339
4340    #[test]
4341    fn update_content_changed_flag() {
4342        let conn = test_db();
4343        let mem = make_memory("Stable", "test", Tier::Mid, 5);
4344        let id = insert(&conn, &mem).unwrap();
4345
4346        // Updating only priority — content_changed should be false
4347        let (found, content_changed) = update(
4348            &conn,
4349            &id,
4350            None,
4351            None,
4352            None,
4353            None,
4354            None,
4355            Some(8),
4356            None,
4357            None,
4358            None,
4359        )
4360        .unwrap();
4361        assert!(found);
4362        assert!(!content_changed);
4363
4364        // Updating content — content_changed should be true
4365        let (found, content_changed) = update(
4366            &conn,
4367            &id,
4368            None,
4369            Some("New content"),
4370            None,
4371            None,
4372            None,
4373            None,
4374            None,
4375            None,
4376            None,
4377        )
4378        .unwrap();
4379        assert!(found);
4380        assert!(content_changed);
4381    }
4382
4383    #[test]
4384    fn update_nonexistent_returns_false() {
4385        let conn = test_db();
4386        let (found, _) = update(
4387            &conn,
4388            "bad-id",
4389            Some("New"),
4390            None,
4391            None,
4392            None,
4393            None,
4394            None,
4395            None,
4396            None,
4397            None,
4398        )
4399        .unwrap();
4400        assert!(!found);
4401    }
4402
#[test]
fn update_tier_downgrade_protection() {
    let conn = test_db();

    // Scenario 1: a long-tier memory asked to drop to short keeps its tier.
    let permanent = make_memory("Permanent", "test", Tier::Long, 9);
    let long_id = insert(&conn, &permanent).unwrap();
    let (hit, _) = update(
        &conn,
        &long_id,
        None,
        None,
        Some(&Tier::Short),
        None,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(hit);
    let reloaded = get(&conn, &long_id).unwrap().unwrap();
    assert_eq!(reloaded.tier, Tier::Long);

    // Scenario 2: a mid-tier memory asked to drop to short keeps its tier.
    let working = make_memory("Working", "test", Tier::Mid, 5);
    let mid_id = insert(&conn, &working).unwrap();
    let (hit, _) = update(
        &conn,
        &mid_id,
        None,
        None,
        Some(&Tier::Short),
        None,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(hit);
    let reloaded = get(&conn, &mid_id).unwrap().unwrap();
    assert_eq!(reloaded.tier, Tier::Mid);

    // Scenario 3: promotion is permitted, so mid -> long takes effect.
    let (hit, _) = update(
        &conn,
        &mid_id,
        None,
        None,
        Some(&Tier::Long),
        None,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(hit);
    let reloaded = get(&conn, &mid_id).unwrap().unwrap();
    assert_eq!(reloaded.tier, Tier::Long);
}
4469
#[test]
fn update_title_collision_returns_error() {
    let conn = test_db();
    let alpha_id = insert(&conn, &make_memory("Alpha", "test", Tier::Mid, 5)).unwrap();
    insert(&conn, &make_memory("Beta", "test", Tier::Mid, 5)).unwrap();

    // Renaming "Alpha" to "Beta" would clash with the existing row in the
    // same namespace, so the update has to be refused.
    let outcome = update(
        &conn,
        &alpha_id,
        Some("Beta"),
        None,
        None,
        None,
        None,
        None,
        None,
        None,
        None,
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("already exists in namespace"));
}
4496
#[test]
fn delete_existing() {
    let conn = test_db();
    let doomed = insert(&conn, &make_memory("To delete", "test", Tier::Short, 3)).unwrap();
    // Deleting a live row reports success and the row is gone afterwards.
    let removed = delete(&conn, &doomed).unwrap();
    assert!(removed);
    let lookup = get(&conn, &doomed).unwrap();
    assert!(lookup.is_none());
}
4505
#[test]
fn delete_nonexistent() {
    // An unknown id is not an error; delete simply reports that nothing matched.
    let conn = test_db();
    let removed = delete(&conn, "bad-id").unwrap();
    assert!(!removed);
}
4511
#[test]
fn list_with_namespace_filter() {
    // Filtering by namespace must return exactly the rows in that namespace:
    // the right number of rows AND no rows leaking in from other namespaces.
    let conn = test_db();
    for (title, ns) in [("A", "ns1"), ("B", "ns2"), ("C", "ns1")] {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    let results = list(
        &conn,
        Some("ns1"),
        None,
        100,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert_eq!(results.len(), 2);
    // A count-only check would still pass if the filter returned the wrong
    // two rows, so also pin the namespace of every returned memory.
    assert!(results.iter().all(|m| m.namespace == "ns1"));
}
4534
#[test]
fn list_with_tier_filter() {
    let conn = test_db();
    for (title, tier) in [("Long", Tier::Long), ("Mid", Tier::Mid)] {
        insert(&conn, &make_memory(title, "test", tier, 5)).unwrap();
    }

    // Only the long-tier row survives a tier == Long filter.
    let long_only = list(
        &conn,
        None,
        Some(&Tier::Long),
        100,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert_eq!(long_only.len(), 1);
    assert_eq!(long_only[0].title, "Long");
}
4557
#[test]
fn list_with_limit() {
    let conn = test_db();
    for n in 0..5 {
        let title = format!("Mem {n}");
        insert(&conn, &make_memory(&title, "test", Tier::Long, 5)).unwrap();
    }
    // Five rows exist, but the page size caps the result at three.
    let page = list(&conn, None, None, 3, 0, None, None, None, None, None).unwrap();
    assert_eq!(page.len(), 3);
}
4571
#[test]
fn search_keyword_match() {
    let conn = test_db();
    for title in ["PostgreSQL config", "Redis cache"] {
        insert(&conn, &make_memory(title, "test", Tier::Long, 5)).unwrap();
    }

    // The keyword appears in exactly one of the two titles.
    let hits = search(
        &conn,
        "PostgreSQL",
        None,
        None,
        10,
        None,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert_eq!(hits.len(), 1);
    let only = &hits[0];
    assert!(only.title.contains("PostgreSQL"));
}
4599
#[test]
fn search_no_match() {
    let conn = test_db();
    insert(&conn, &make_memory("PostgreSQL", "test", Tier::Long, 5)).unwrap();

    // A term that occurs nowhere in the corpus must yield no hits.
    let hits = search(
        &conn,
        "nonexistent_term_xyz",
        None,
        None,
        10,
        None,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert_eq!(hits.len(), 0);
}
4620
#[test]
fn recall_returns_scored() {
    let conn = test_db();
    let fixtures = [("Rust programming language", 8), ("Python scripting", 5)];
    for (title, priority) in fixtures {
        insert(&conn, &make_memory(title, "test", Tier::Long, priority)).unwrap();
    }

    let (ranked, _token_estimate) = recall(
        &conn,
        "Rust programming",
        None,
        10,
        None,
        None,
        None,
        SHORT_TTL_EXTEND_SECS,
        MID_TTL_EXTEND_SECS,
        None,
        None,
    )
    .unwrap();
    assert!(!ranked.is_empty());

    // The top-ranked hit is the Rust memory and carries a positive score.
    let (best, best_score) = &ranked[0];
    assert!(best.title.contains("Rust"));
    assert!(*best_score > 0.0);
}
4655
#[test]
fn recall_empty_context() {
    // Contract pinned here: recalling with an empty context string must not
    // panic. Both an `Ok` (possibly empty) and an `Err` are acceptable
    // outcomes, so the result is intentionally discarded. An
    // `is_ok() || is_err()` assertion would be a tautology that adds nothing
    // beyond "returned without panicking", which reaching the end of this
    // function already proves.
    let conn = test_db();
    insert(&conn, &make_memory("Test", "test", Tier::Long, 5)).unwrap();
    let _outcome = recall(
        &conn,
        "",
        None,
        10,
        None,
        None,
        None,
        SHORT_TTL_EXTEND_SECS,
        MID_TTL_EXTEND_SECS,
        None,
        None,
    );
}
4677
#[test]
fn touch_increments_access_count() {
    let conn = test_db();
    let touched = insert(&conn, &make_memory("Touchable", "test", Tier::Mid, 5)).unwrap();
    assert_eq!(get(&conn, &touched).unwrap().unwrap().access_count, 0);

    // Each touch must bump the counter by exactly one.
    for expected in 1..=2 {
        touch(&conn, &touched, SHORT_TTL_EXTEND_SECS, MID_TTL_EXTEND_SECS).unwrap();
        assert_eq!(get(&conn, &touched).unwrap().unwrap().access_count, expected);
    }
}
4691
#[test]
fn find_contradictions_similar_titles() {
    let conn = test_db();
    let seeds = [("Database is PostgreSQL", 8), ("Database is MySQL", 5)];
    for (title, priority) in seeds {
        insert(&conn, &make_memory(title, "infra", Tier::Long, priority)).unwrap();
    }

    // Near-identical titles in the same namespace should surface as conflicts.
    let conflicts = find_contradictions(&conn, "Database is PostgreSQL", "infra").unwrap();
    assert!(!conflicts.is_empty());
}
4709
#[test]
fn create_and_get_links() {
    let conn = test_db();
    let source = insert(&conn, &make_memory("Memory A", "test", Tier::Long, 5)).unwrap();
    let target = insert(&conn, &make_memory("Memory B", "test", Tier::Long, 5)).unwrap();

    create_link(&conn, &source, &target, "related_to").unwrap();

    // The link is visible from the source side with its relation intact.
    let outgoing = get_links(&conn, &source).unwrap();
    assert_eq!(outgoing.len(), 1);
    let link = &outgoing[0];
    assert_eq!(link.relation, "related_to");
}
4721
#[test]
fn consolidate_merges_memories() {
    let conn = test_db();
    let parts = [
        insert(&conn, &make_memory("Part 1", "test", Tier::Mid, 5)).unwrap(),
        insert(&conn, &make_memory("Part 2", "test", Tier::Mid, 5)).unwrap(),
    ];

    let merged_id = consolidate(
        &conn,
        &parts,
        "Combined",
        "Part 1 + Part 2",
        "test",
        &Tier::Long,
        "test",
        "test-consolidator",
    )
    .unwrap();

    // Every source memory is consumed by the merge ...
    for part in &parts {
        assert!(get(&conn, part).unwrap().is_none());
    }
    // ... and replaced by a single long-tier memory carrying the new title.
    let merged = get(&conn, &merged_id).unwrap().unwrap();
    assert_eq!(merged.title, "Combined");
    assert_eq!(merged.tier, Tier::Long);
}
4747
#[test]
fn stats_counts() {
    let conn = test_db();
    let fixtures = [
        ("A", "ns1", Tier::Long),
        ("B", "ns1", Tier::Mid),
        ("C", "ns2", Tier::Short),
    ];
    for (title, ns, tier) in fixtures {
        insert(&conn, &make_memory(title, ns, tier, 5)).unwrap();
    }

    // Rows across every tier and namespace contribute to the grand total.
    let db_path = std::path::Path::new(":memory:");
    let summary = stats(&conn, db_path).unwrap();
    assert_eq!(summary.total, 3);
}
4759
#[test]
fn gc_removes_expired() {
    let conn = test_db();
    let mut stale = make_memory("Expired", "test", Tier::Short, 5);
    // An expiry in the past makes the row eligible for collection.
    stale.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
    insert(&conn, &stale).unwrap();

    let collected = gc(&conn, false).unwrap();
    assert_eq!(collected, 1);
}
4770
#[test]
fn gc_preserves_long_term() {
    // A freshly inserted long-tier memory must survive a GC pass.
    let conn = test_db();
    insert(&conn, &make_memory("Permanent", "test", Tier::Long, 5)).unwrap();
    let collected = gc(&conn, false).unwrap();
    assert_eq!(collected, 0);
}
4778
#[test]
fn gc_archives_before_delete() {
    let conn = test_db();
    let mut stale = make_memory("Archivable", "test", Tier::Short, 5);
    stale.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
    insert(&conn, &stale).unwrap();

    // With archiving enabled, gc still reports the row as removed ...
    let collected = gc(&conn, true).unwrap();
    assert_eq!(collected, 1);

    // ... but a copy lands in the archive, tagged with the TTL reason.
    let archive = list_archived(&conn, None, 10, 0).unwrap();
    assert_eq!(archive.len(), 1);
    let entry = &archive[0];
    assert_eq!(entry["title"], "Archivable");
    assert_eq!(entry["archive_reason"], "ttl_expired");
}
4795
#[test]
fn restore_archived_memory() {
    let conn = test_db();
    let mut expired = make_memory("Restorable", "test", Tier::Short, 5);
    expired.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
    let archived_id = insert(&conn, &expired).unwrap();

    // An archiving GC pass removes it from the live table.
    gc(&conn, true).unwrap();
    assert!(get(&conn, &archived_id).unwrap().is_none());

    // Restoring brings it back, with the expiry cleared.
    assert!(restore_archived(&conn, &archived_id).unwrap());
    let revived = get(&conn, &archived_id).unwrap().unwrap();
    assert_eq!(revived.title, "Restorable");
    assert!(revived.expires_at.is_none());
}
4813
#[test]
fn purge_archive_removes_all() {
    let conn = test_db();
    let mut expired = make_memory("Purgeable", "test", Tier::Short, 5);
    expired.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
    insert(&conn, &expired).unwrap();
    gc(&conn, true).unwrap();

    // With no age cutoff, purging empties the archive completely.
    let dropped = purge_archive(&conn, None).unwrap();
    assert_eq!(dropped, 1);
    let leftover = list_archived(&conn, None, 10, 0).unwrap();
    assert_eq!(leftover.len(), 0);
}
4826
#[test]
fn purge_archive_rejects_negative_days() {
    // A negative age cutoff is invalid input and must be reported as an error.
    let conn = test_db();
    let outcome = purge_archive(&conn, Some(-1));
    assert!(outcome.is_err());
    let reason = outcome.unwrap_err().to_string();
    assert!(reason.contains("non-negative"));
}
4834
#[test]
fn restore_rejects_active_id_collision() {
    let conn = test_db();
    let mut doomed = make_memory("Collision Test", "test", Tier::Short, 5);
    doomed.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
    let shared_id = insert(&conn, &doomed).unwrap();

    // Move the row into the archive via an archiving GC pass.
    gc(&conn, true).unwrap();
    assert!(get(&conn, &shared_id).unwrap().is_none());

    // Occupy the same id in the live table with an unrelated row.
    let blocker_sql = "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at)
             VALUES (?1, 'long', 'test', 'Blocker Title', 'blocks restore', '[]', 5, 1.0, 'test', 0, datetime('now'), datetime('now'))";
    conn.execute(blocker_sql, rusqlite::params![shared_id])
        .unwrap();

    // Restoring must now fail instead of clobbering the live row.
    let outcome = restore_archived(&conn, &shared_id);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("already exists in active table"));
}
4863
#[test]
fn archive_stats_counts() {
    let conn = test_db();
    for title in ["Stats A", "Stats B"] {
        let mut expired = make_memory(title, "ns1", Tier::Short, 5);
        expired.expires_at = Some(String::from("2020-01-01T00:00:00+00:00"));
        insert(&conn, &expired).unwrap();
    }
    gc(&conn, true).unwrap();

    // Both GC-archived rows are reflected in the archive total.
    let summary = archive_stats(&conn).unwrap();
    assert_eq!(summary["archived_total"], 2);
}
4878
#[test]
fn archive_memory_moves_live_row_to_archive() {
    // S29 — the explicit archive endpoint moves a row out of `memories` and
    // into `archived_memories`, recording the caller-supplied reason. Unlike
    // gc(archive=true) it ignores `expires_at`: the caller wants it now.
    let conn = test_db();
    let target = insert(&conn, &make_memory("Archive me", "s29", Tier::Long, 5)).unwrap();

    let first_call = archive_memory(&conn, &target, Some("explicit")).unwrap();
    assert!(first_call, "live row must be archived on first call");
    let still_live = get(&conn, &target).unwrap();
    assert!(
        still_live.is_none(),
        "row must be removed from active table"
    );

    let archive = list_archived(&conn, None, 10, 0).unwrap();
    assert_eq!(archive.len(), 1);
    assert_eq!(archive[0]["id"], target);
    assert_eq!(archive[0]["archive_reason"], "explicit");

    // Repeating the call finds no live row, so it must do nothing.
    let second_call = archive_memory(&conn, &target, Some("explicit")).unwrap();
    assert!(
        !second_call,
        "second archive call must report no-op (no live row)"
    );
}
4908
#[test]
fn archive_memory_missing_id_returns_false() {
    // During sync_push archive fan-out, a peer that never received the
    // memory must treat the request as a no-op rather than an error.
    let conn = test_db();
    let archived = archive_memory(&conn, "nonexistent-id", None).unwrap();
    assert!(!archived);
}
4917
#[test]
fn archive_memory_default_reason_is_archive() {
    let conn = test_db();
    let target = insert(&conn, &make_memory("Default reason", "s29", Tier::Long, 5)).unwrap();
    // With no reason supplied, the archived row falls back to "archive".
    assert!(archive_memory(&conn, &target, None).unwrap());
    let archive = list_archived(&conn, None, 10, 0).unwrap();
    assert_eq!(archive[0]["archive_reason"], "archive");
}
4927
#[test]
fn export_all_and_links() {
    let conn = test_db();
    let from = insert(&conn, &make_memory("Export A", "test", Tier::Long, 5)).unwrap();
    let to = insert(&conn, &make_memory("Export B", "test", Tier::Long, 5)).unwrap();
    create_link(&conn, &from, &to, "supersedes").unwrap();

    // Both memories and the single link between them are exported.
    assert_eq!(export_all(&conn).unwrap().len(), 2);
    assert_eq!(export_links(&conn).unwrap().len(), 1);
}
4940
#[test]
fn list_namespaces_counts() {
    let conn = test_db();
    for (title, ns) in [("A", "alpha"), ("B", "alpha"), ("C", "beta")] {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }
    // Two distinct namespaces, however many rows each of them holds.
    let namespaces = list_namespaces(&conn).unwrap();
    assert_eq!(namespaces.len(), 2);
}
4951
#[test]
fn taxonomy_flat_namespaces_only() {
    // Without any `/` separators, every namespace hangs directly off the root.
    let conn = test_db();
    for (title, ns) in [("A", "alpha"), ("B", "alpha"), ("C", "beta")] {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
    assert_eq!(tax.total_count, 3);
    assert!(!tax.truncated);

    let root = &tax.tree;
    assert_eq!(root.namespace, "");
    assert_eq!(root.subtree_count, 3);
    assert_eq!(root.count, 0); // the synthetic root holds no memories itself
    assert_eq!(root.children.len(), 2);

    let alpha = root.children.iter().find(|c| c.name == "alpha").unwrap();
    assert_eq!(alpha.count, 2);
    assert_eq!(alpha.subtree_count, 2);
    assert!(alpha.children.is_empty());

    let beta = root.children.iter().find(|c| c.name == "beta").unwrap();
    assert_eq!(beta.count, 1);
}
4979
#[test]
fn taxonomy_hierarchical_tree() {
    // Memories at several depths: counts must roll up along the spine.
    let conn = test_db();
    let layout = [
        ("a", "alphaone"),
        ("b", "alphaone/eng"),
        ("c", "alphaone/eng/platform"),
        ("d", "alphaone/eng/platform"),
        ("e", "alphaone/sales"),
    ];
    for (title, ns) in layout {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
    assert_eq!(tax.total_count, 5);
    assert_eq!(tax.tree.subtree_count, 5);
    assert_eq!(tax.tree.children.len(), 1);

    let org = &tax.tree.children[0];
    assert_eq!(org.name, "alphaone");
    assert_eq!(org.namespace, "alphaone");
    assert_eq!(org.count, 1); // only memory "a" sits at exactly "alphaone"
    assert_eq!(org.subtree_count, 5);
    assert_eq!(org.children.len(), 2);

    let eng = org.children.iter().find(|c| c.name == "eng").unwrap();
    assert_eq!(eng.namespace, "alphaone/eng");
    assert_eq!(eng.count, 1);
    assert_eq!(eng.subtree_count, 3);

    let platform = &eng.children[0];
    assert_eq!(platform.name, "platform");
    assert_eq!(platform.namespace, "alphaone/eng/platform");
    assert_eq!(platform.count, 2);
    assert_eq!(platform.subtree_count, 2);
    assert!(platform.children.is_empty());
}
5021
#[test]
fn taxonomy_prefix_scopes_subtree() {
    let conn = test_db();
    let layout = [
        ("a", "alphaone/eng"),
        ("b", "alphaone/eng/platform"),
        ("c", "alphaone/sales"),
        // Shares a string prefix with "alphaone" but is a sibling, not a
        // descendant; it must stay out of the scoped subtree.
        ("d", "alphaone-sibling"),
        ("e", "other"),
    ];
    for (title, ns) in layout {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    let tax = get_taxonomy(&conn, Some("alphaone/eng"), 8, 1000).unwrap();
    assert_eq!(tax.total_count, 2);

    let scoped = &tax.tree;
    assert_eq!(scoped.namespace, "alphaone/eng");
    assert_eq!(scoped.name, "eng");
    assert_eq!(scoped.count, 1);
    assert_eq!(scoped.subtree_count, 2);
    assert_eq!(scoped.children.len(), 1);

    let leaf = &scoped.children[0];
    assert_eq!(leaf.name, "platform");
    assert_eq!(leaf.count, 1);
}
5046
#[test]
fn taxonomy_depth_clamps_but_preserves_subtree_counts() {
    let conn = test_db();
    let layout = [
        ("a", "alphaone/eng/platform/db"),
        ("b", "alphaone/eng/platform/api"),
    ];
    for (title, ns) in layout {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    let tax = get_taxonomy(&conn, None, 2, 1000).unwrap();
    assert_eq!(tax.total_count, 2);

    // depth = 2 under the empty prefix renders exactly two levels
    // (alphaone -> eng). Anything deeper is folded into eng's
    // subtree_count rather than producing child nodes.
    let eng = &tax.tree.children[0].children[0];
    assert!(eng.children.is_empty());
    assert_eq!(eng.subtree_count, 2);
    assert_eq!(eng.count, 0); // no memory lives at exactly "alphaone/eng"
}
5072
#[test]
fn taxonomy_excludes_expired_memories() {
    // Same rule as `list_namespaces`: expired rows contribute to neither the
    // tree nor `total_count`.
    let conn = test_db();
    let mut alive = make_memory("alive", "alpha", Tier::Long, 5);
    alive.expires_at = None;
    let mut dead = make_memory("dead", "alpha", Tier::Short, 5);
    // Push the short-tier memory's expiry far into the past.
    dead.expires_at = Some(String::from("2000-01-01T00:00:00Z"));
    for mem in [&alive, &dead] {
        insert(&conn, mem).unwrap();
    }

    let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
    assert_eq!(tax.total_count, 1);
    let kids = &tax.tree.children;
    assert_eq!(kids.len(), 1);
    assert_eq!(kids[0].count, 1);
}
5091
#[test]
fn taxonomy_truncates_at_limit_but_total_stays_honest() {
    let conn = test_db();
    ["aa", "bb", "cc", "dd", "ee"].into_iter().for_each(|ns| {
        insert(&conn, &make_memory("m", ns, Tier::Long, 5)).unwrap();
    });

    // A limit of 2 namespaces drops three of them from the walk, yet
    // total_count still reflects all five memories so renderers can warn
    // the user that the tree is incomplete.
    let tax = get_taxonomy(&conn, None, 8, 2).unwrap();
    assert_eq!(tax.total_count, 5);
    assert!(tax.truncated);
    assert_eq!(tax.tree.children.len(), 2);
}
5105
#[test]
fn forget_by_namespace() {
    let conn = test_db();
    for (title, ns) in [("A", "delete-me"), ("B", "delete-me"), ("C", "keep")] {
        insert(&conn, &make_memory(title, ns, Tier::Long, 5)).unwrap();
    }

    // Forgetting a namespace removes its rows and leaves the others alone.
    let purged = forget(&conn, Some("delete-me"), None, None, false).unwrap();
    assert_eq!(purged, 2);
    let survivors = list(&conn, None, None, 100, 0, None, None, None, None, None).unwrap();
    assert_eq!(survivors.len(), 1);
}
5118
#[test]
fn set_and_get_embedding() {
    let conn = test_db();
    let id = insert(&conn, &make_memory("Embed test", "test", Tier::Long, 5)).unwrap();

    let vector = [0.1f32, 0.2, 0.3, 0.4];
    set_embedding(&conn, &id, &vector).unwrap();

    // The stored vector round-trips with its length and values intact.
    let roundtrip = get_embedding(&conn, &id).unwrap().unwrap();
    assert_eq!(roundtrip.len(), 4);
    assert!((roundtrip[0] - 0.1).abs() < 1e-6);
}
5132
5133    // -- Pillar 2 / Stream D — memory_check_duplicate -------------------
5134
5135    fn insert_with_embedding(
5136        conn: &Connection,
5137        title: &str,
5138        ns: &str,
5139        embedding: &[f32],
5140    ) -> String {
5141        let mem = make_memory(title, ns, Tier::Long, 5);
5142        let id = insert(conn, &mem).unwrap();
5143        set_embedding(conn, &id, embedding).unwrap();
5144        id
5145    }
5146
#[test]
fn check_duplicate_empty_db_returns_no_match() {
    // With nothing stored, there is no candidate to compare against.
    let conn = test_db();
    let query = vec![1.0_f32, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    assert!(!report.is_duplicate);
    assert!(report.nearest.is_none());
    assert_eq!(report.candidates_scanned, 0);
}
5156
#[test]
fn check_duplicate_finds_highest_cosine_match() {
    // Three candidates with clearly separated cosine similarity to the
    // query q = [1, 0, 0]:
    //   alpha = [1.0, 0.0, 0.0] -> cos = 1.0    (exact match)
    //   beta  = [0.7, 0.7, 0.0] -> cos ~= 0.707
    //   gamma = [0.0, 1.0, 0.0] -> cos = 0.0    (orthogonal)
    // The nearest match must be `alpha`. Only `alpha` clears the 0.85
    // threshold, so the result is flagged as a duplicate.
    let conn = test_db();
    let id_a = insert_with_embedding(&conn, "alpha", "ns", &[1.0, 0.0, 0.0]);
    let _id_b = insert_with_embedding(&conn, "beta", "ns", &[0.7, 0.7, 0.0]);
    let _id_c = insert_with_embedding(&conn, "gamma", "ns", &[0.0, 1.0, 0.0]);

    let q = vec![1.0_f32, 0.0, 0.0];
    let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
    let nearest = r.nearest.expect("expected a nearest match");
    assert_eq!(nearest.id, id_a);
    assert!(nearest.similarity > 0.99);
    assert_eq!(r.candidates_scanned, 3);
    assert!(r.is_duplicate);
    assert!((r.threshold - 0.85).abs() < 1e-6);
}
5177
#[test]
fn check_duplicate_below_threshold_not_flagged_but_returns_nearest() {
    let conn = test_db();
    let beta_id = insert_with_embedding(&conn, "beta", "ns", &[0.7, 0.7, 0.0]);

    // cos([1,0,0], [0.7,0.7,0]) ~= 0.707, under the 0.85 threshold: the
    // candidate is still reported as nearest but not flagged as a duplicate.
    let query = vec![1.0_f32, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    let nearest = report
        .nearest
        .expect("nearest must surface even when below threshold");
    assert_eq!(nearest.id, beta_id);
    assert!(!report.is_duplicate);
}
5192
#[test]
fn check_duplicate_threshold_clamped_to_floor() {
    // A caller-supplied threshold of 0.0 is raised to
    // DUPLICATE_THRESHOLD_MIN, so unrelated content can never be presented
    // as a merge candidate.
    let conn = test_db();
    insert_with_embedding(&conn, "x", "ns", &[1.0, 0.0, 0.0]);

    let orthogonal = vec![0.0_f32, 1.0, 0.0]; // cosine 0.0 vs the stored vector
    let report = check_duplicate(&conn, &orthogonal, None, 0.0).unwrap();
    assert!((report.threshold - DUPLICATE_THRESHOLD_MIN).abs() < 1e-6);
    assert!(!report.is_duplicate);
}
5205
#[test]
fn check_duplicate_namespace_filter_isolates_scan() {
    let conn = test_db();
    // A perfect match exists, but in a different namespace ...
    insert_with_embedding(&conn, "x", "other", &[1.0, 0.0, 0.0]);
    // ... so the scoped scan may only ever see this weaker candidate.
    let in_scope = insert_with_embedding(&conn, "y", "ns", &[0.6, 0.8, 0.0]);

    let query = vec![1.0_f32, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, Some("ns"), 0.85).unwrap();
    assert_eq!(report.candidates_scanned, 1);
    let nearest = report.nearest.expect("namespace filter ignored");
    assert_eq!(nearest.id, in_scope);
}
5217
#[test]
fn check_duplicate_skips_expired_rows() {
    // A short-tier memory whose `expires_at` has already passed fails the
    // live-row gate and therefore must never become a candidate.
    let conn = test_db();
    let one_minute_ago = chrono::Utc::now() - chrono::Duration::seconds(60);
    let mut stale = make_memory("expired", "ns", Tier::Short, 5);
    stale.expires_at = Some(one_minute_ago.to_rfc3339());
    let stale_id = insert(&conn, &stale).unwrap();
    set_embedding(&conn, &stale_id, &[1.0, 0.0, 0.0]).unwrap();

    let query = vec![1.0_f32, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    assert_eq!(report.candidates_scanned, 0);
    assert!(report.nearest.is_none());
}
5233
#[test]
fn check_duplicate_skips_unembedded_rows() {
    // Of two memories, only the one that carries an embedding may be counted
    // in `candidates_scanned`.
    let conn = test_db();
    let with_vector = insert_with_embedding(&conn, "with-emb", "ns", &[1.0, 0.0, 0.0]);
    insert(&conn, &make_memory("no-emb", "ns", Tier::Long, 5)).unwrap();

    let query = vec![1.0_f32, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    assert_eq!(report.candidates_scanned, 1);
    assert_eq!(report.nearest.expect("embedded match").id, with_vector);
}
5248
#[test]
fn check_duplicate_skips_blob_with_non_multiple_of_4_length() {
    // Regression: a stored blob whose byte length is not a multiple of 4
    // used to lose its trailing partial chunk to chunks_exact. Cosine was
    // then computed against a shorter vector, producing a misleading score.
    // The length check now rejects such rows outright.
    let conn = test_db();
    let target = insert(&conn, &make_memory("malformed-blob", "ns", Tier::Long, 5)).unwrap();

    // 7 bytes = one short of two f32s. Written with raw SQL because
    // set_embedding only accepts whole f32 values.
    let malformed: &[u8] = &[0u8; 7];
    conn.execute(
        "UPDATE memories SET embedding = ?1 WHERE id = ?2",
        params![malformed, &target],
    )
    .unwrap();

    let query = vec![1.0_f32, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    assert_eq!(
        report.candidates_scanned, 0,
        "malformed blob must be skipped, not silently truncated"
    );
    assert!(report.nearest.is_none());
}
5275
#[test]
fn check_duplicate_skips_blob_with_dimension_mismatch() {
    // Regression: a well-formed blob (length divisible by 4) whose dimension
    // differs from the query's must not be scored at all. cosine_similarity
    // zips its inputs and would quietly truncate to the shorter vector,
    // yielding a bogus similarity.
    let conn = test_db();
    // The stored candidate is 3-dimensional ...
    insert_with_embedding(&conn, "different-dim", "ns", &[1.0, 0.0, 0.0]);

    // ... while the query is 4-dimensional.
    let query = vec![1.0_f32, 0.0, 0.0, 0.0];
    let report = check_duplicate(&conn, &query, None, 0.85).unwrap();
    assert_eq!(
        report.candidates_scanned, 0,
        "dimension-mismatched candidate must be skipped"
    );
    assert!(report.nearest.is_none());
}
5295
5296    #[test]
5297    fn get_unembedded_returns_memoryless() {
5298        let conn = test_db();
5299        let mem = make_memory("No embed", "test", Tier::Long, 5);
5300        insert(&conn, &mem).unwrap();
5301
5302        let unembedded = get_unembedded_ids(&conn).unwrap();
5303        assert_eq!(unembedded.len(), 1);
5304    }
5305
5306    #[test]
5307    fn health_check_passes() {
5308        let conn = test_db();
5309        assert!(health_check(&conn).unwrap());
5310    }
5311
5312    #[test]
5313    fn sanitize_fts_strips_operators_and_quotes() {
5314        // FTS5 special chars: " * ^ { } ( ) : - | are stripped
5315        let sanitized = sanitize_fts_query("test* \"injection\" (drop)", true);
5316        assert!(!sanitized.contains('*'));
5317        assert!(!sanitized.contains('('));
5318        assert!(!sanitized.contains(')'));
5319        // Standalone boolean operators are removed
5320        let sanitized2 = sanitize_fts_query("hello AND world OR NOT NEAR test", true);
5321        assert!(sanitized2.contains("hello"));
5322        assert!(sanitized2.contains("world"));
5323        assert!(sanitized2.contains("test"));
5324        // Empty input returns placeholder
5325        let sanitized3 = sanitize_fts_query("", true);
5326        assert_eq!(sanitized3, "\"_empty_\"");
5327        // `+` prefix operator is stripped (prevents exclusion injection);
5328        // `-` is now preserved inside phrase-quoted tokens so hyphenated
5329        // content ("well-known", "foo-bar") searches correctly against
5330        // the unicode61 tokenizer. Phrase-quoting keeps `-` from reaching
5331        // FTS5 as a prefix operator, closing the injection hole.
5332        let sanitized4 = sanitize_fts_query("-secret +required", true);
5333        assert!(!sanitized4.contains('+'));
5334        assert!(sanitized4.contains("secret"));
5335        assert!(sanitized4.contains("required"));
5336        // Hyphenated tokens pass through as phrase searches.
5337        let sanitized5 = sanitize_fts_query("well-known", true);
5338        assert!(sanitized5.contains("well-known"));
5339    }
5340
5341    #[test]
5342    fn get_by_prefix_8char() {
5343        let conn = test_db();
5344        let mem = make_memory("Prefix test", "test", Tier::Long, 5);
5345        let id = insert(&conn, &mem).unwrap();
5346        let prefix = &id[..8];
5347        let got = get_by_prefix(&conn, prefix).unwrap().unwrap();
5348        assert_eq!(got.id, id);
5349        assert_eq!(got.title, "Prefix test");
5350    }
5351
5352    #[test]
5353    fn get_by_prefix_full_uuid() {
5354        let conn = test_db();
5355        let mem = make_memory("Full UUID prefix", "test", Tier::Long, 5);
5356        let id = insert(&conn, &mem).unwrap();
5357        // Full UUID used as prefix still works (LIKE 'full-uuid%' matches exact)
5358        let got = get_by_prefix(&conn, &id).unwrap().unwrap();
5359        assert_eq!(got.id, id);
5360    }
5361
5362    #[test]
5363    fn get_by_prefix_nonexistent() {
5364        let conn = test_db();
5365        let got = get_by_prefix(&conn, "ffffffff").unwrap();
5366        assert!(got.is_none());
5367    }
5368
5369    #[test]
5370    fn get_by_prefix_ambiguous() {
5371        let conn = test_db();
5372        // Insert two memories with IDs sharing a common prefix
5373        let mut mem1 = make_memory("Ambig A", "test", Tier::Long, 5);
5374        mem1.id = "aaaa1111-0000-0000-0000-000000000001".to_string();
5375        insert(&conn, &mem1).unwrap();
5376        let mut mem2 = make_memory("Ambig B", "test2", Tier::Long, 5);
5377        mem2.id = "aaaa2222-0000-0000-0000-000000000002".to_string();
5378        insert(&conn, &mem2).unwrap();
5379        let result = get_by_prefix(&conn, "aaaa");
5380        assert!(result.is_err());
5381        let err_msg = result.unwrap_err().to_string();
5382        assert!(err_msg.contains("ambiguous"));
5383        assert!(err_msg.contains("2 matches"));
5384        // Error should list the matching full IDs so the user can pick one
5385        assert!(
5386            err_msg.contains("aaaa1111-0000-0000-0000-000000000001"),
5387            "error should list matching IDs, got: {err_msg}"
5388        );
5389        assert!(err_msg.contains("aaaa2222-0000-0000-0000-000000000002"));
5390    }
5391
5392    #[test]
5393    fn resolve_id_exact_then_prefix() {
5394        let conn = test_db();
5395        let mem = make_memory("Resolve test", "test", Tier::Long, 5);
5396        let id = insert(&conn, &mem).unwrap();
5397        // Exact match
5398        let got = resolve_id(&conn, &id).unwrap().unwrap();
5399        assert_eq!(got.id, id);
5400        // Prefix match
5401        let got2 = resolve_id(&conn, &id[..8]).unwrap().unwrap();
5402        assert_eq!(got2.id, id);
5403        // Nonexistent
5404        let got3 = resolve_id(&conn, "zzzzzzzz").unwrap();
5405        assert!(got3.is_none());
5406    }
5407
5408    #[test]
5409    fn insert_if_newer_updates() {
5410        let conn = test_db();
5411        let mut mem = make_memory("Sync test", "test", Tier::Long, 5);
5412        let id = insert(&conn, &mem).unwrap();
5413
5414        mem.id = id.clone();
5415        mem.content = "Updated via sync".to_string();
5416        mem.updated_at = (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
5417        let result_id = insert_if_newer(&conn, &mem).unwrap();
5418        assert_eq!(result_id, id);
5419
5420        let got = get(&conn, &id).unwrap().unwrap();
5421        assert_eq!(got.content, "Updated via sync");
5422    }
5423
5424    // --- Metadata tests (Task 1.1) ---
5425
5426    #[test]
5427    fn metadata_default_empty_object() {
5428        let conn = test_db();
5429        let mem = make_memory("Default metadata", "test", Tier::Long, 5);
5430        let id = insert(&conn, &mem).unwrap();
5431        let got = get(&conn, &id).unwrap().unwrap();
5432        assert_eq!(got.metadata, serde_json::json!({}));
5433    }
5434
5435    #[test]
5436    fn metadata_store_and_retrieve() {
5437        let conn = test_db();
5438        let mut mem = make_memory("With metadata", "test", Tier::Long, 5);
5439        mem.metadata = serde_json::json!({"agent_id": "claude-1", "session": 42});
5440        let id = insert(&conn, &mem).unwrap();
5441        let got = get(&conn, &id).unwrap().unwrap();
5442        assert_eq!(got.metadata["agent_id"], "claude-1");
5443        assert_eq!(got.metadata["session"], 42);
5444    }
5445
5446    #[test]
5447    fn metadata_roundtrip_nested_json() {
5448        let conn = test_db();
5449        let mut mem = make_memory("Nested metadata", "test", Tier::Long, 5);
5450        mem.metadata = serde_json::json!({
5451            "agent": {"type": "ai:claude", "version": "4.6"},
5452            "tags_extra": ["experimental"],
5453            "score": 0.95
5454        });
5455        let id = insert(&conn, &mem).unwrap();
5456        let got = get(&conn, &id).unwrap().unwrap();
5457        assert_eq!(got.metadata["agent"]["type"], "ai:claude");
5458        assert_eq!(got.metadata["tags_extra"][0], "experimental");
5459        assert!((got.metadata["score"].as_f64().unwrap() - 0.95).abs() < f64::EPSILON);
5460    }
5461
5462    #[test]
5463    fn metadata_preserved_on_update() {
5464        let conn = test_db();
5465        let mut mem = make_memory("Update metadata", "test", Tier::Long, 5);
5466        mem.metadata = serde_json::json!({"key": "original"});
5467        let id = insert(&conn, &mem).unwrap();
5468
5469        // Update without metadata — should preserve existing
5470        let (found, _) = update(
5471            &conn,
5472            &id,
5473            None,
5474            Some("new content"),
5475            None,
5476            None,
5477            None,
5478            None,
5479            None,
5480            None,
5481            None,
5482        )
5483        .unwrap();
5484        assert!(found);
5485        let got = get(&conn, &id).unwrap().unwrap();
5486        assert_eq!(got.metadata["key"], "original");
5487        assert_eq!(got.content, "new content");
5488
5489        // Update with new metadata — should replace
5490        let new_meta = serde_json::json!({"key": "updated", "extra": true});
5491        let (found, _) = update(
5492            &conn,
5493            &id,
5494            None,
5495            None,
5496            None,
5497            None,
5498            None,
5499            None,
5500            None,
5501            None,
5502            Some(&new_meta),
5503        )
5504        .unwrap();
5505        assert!(found);
5506        let got = get(&conn, &id).unwrap().unwrap();
5507        assert_eq!(got.metadata["key"], "updated");
5508        assert_eq!(got.metadata["extra"], true);
5509    }
5510
5511    #[test]
5512    fn metadata_preserved_on_upsert() {
5513        let conn = test_db();
5514        let mut mem = make_memory("Upsert meta", "test", Tier::Long, 5);
5515        mem.metadata = serde_json::json!({"version": 1});
5516        insert(&conn, &mem).unwrap();
5517
5518        // Insert again with same title+namespace — upsert should update metadata
5519        let mut mem2 = make_memory("Upsert meta", "test", Tier::Long, 5);
5520        mem2.metadata = serde_json::json!({"version": 2});
5521        let id = insert(&conn, &mem2).unwrap();
5522        let got = get(&conn, &id).unwrap().unwrap();
5523        assert_eq!(got.metadata["version"], 2);
5524    }
5525
5526    #[test]
5527    fn metadata_in_list_and_search() {
5528        let conn = test_db();
5529        let mut mem = make_memory("Searchable metadata", "test", Tier::Long, 8);
5530        mem.metadata = serde_json::json!({"source_model": "opus"});
5531        insert(&conn, &mem).unwrap();
5532
5533        let results = list(
5534            &conn,
5535            Some("test"),
5536            None,
5537            10,
5538            0,
5539            None,
5540            None,
5541            None,
5542            None,
5543            None,
5544        )
5545        .unwrap();
5546        assert_eq!(results.len(), 1);
5547        assert_eq!(results[0].metadata["source_model"], "opus");
5548
5549        let results = search(
5550            &conn,
5551            "Searchable",
5552            Some("test"),
5553            None,
5554            10,
5555            None,
5556            None,
5557            None,
5558            None,
5559            None,
5560            None,
5561        )
5562        .unwrap();
5563        assert_eq!(results.len(), 1);
5564        assert_eq!(results[0].metadata["source_model"], "opus");
5565    }
5566
5567    #[test]
5568    fn metadata_in_recall() {
5569        let conn = test_db();
5570        let mut mem = make_memory("Recallable metadata", "test", Tier::Long, 8);
5571        mem.metadata = serde_json::json!({"context": "test-recall"});
5572        insert(&conn, &mem).unwrap();
5573
5574        let (results, _tokens) = recall(
5575            &conn,
5576            "Recallable",
5577            Some("test"),
5578            10,
5579            None,
5580            None,
5581            None,
5582            3600,
5583            86400,
5584            None,
5585            None,
5586        )
5587        .unwrap();
5588        assert!(!results.is_empty());
5589        assert_eq!(results[0].0.metadata["context"], "test-recall");
5590    }
5591
5592    #[test]
5593    fn metadata_in_export_import() {
5594        let conn = test_db();
5595        let mut mem = make_memory("Export metadata", "test", Tier::Long, 5);
5596        mem.metadata = serde_json::json!({"exported": true});
5597        insert(&conn, &mem).unwrap();
5598
5599        let exported = export_all(&conn).unwrap();
5600        assert_eq!(exported.len(), 1);
5601        assert_eq!(exported[0].metadata["exported"], true);
5602
5603        // Import into fresh DB
5604        let conn2 = test_db();
5605        insert(&conn2, &exported[0]).unwrap();
5606        let got = get(&conn2, &exported[0].id).unwrap().unwrap();
5607        assert_eq!(got.metadata["exported"], true);
5608    }
5609
5610    #[test]
5611    fn metadata_schema_migration() {
5612        // Simulate a pre-v7 database (no metadata column) by creating one
5613        // and checking that migration adds the column with correct default
5614        let conn = test_db();
5615        let mem = make_memory("Migration test", "test", Tier::Long, 5);
5616        let id = insert(&conn, &mem).unwrap();
5617
5618        // Verify the column exists and has the default value
5619        let metadata_str: String = conn
5620            .query_row(
5621                "SELECT metadata FROM memories WHERE id = ?1",
5622                params![id],
5623                |r| r.get(0),
5624            )
5625            .unwrap();
5626        assert_eq!(metadata_str, "{}");
5627    }
5628
5629    #[test]
5630    fn metadata_survives_archive_restore_cycle() {
5631        let conn = test_db();
5632        let mut mem = make_memory("Archivable", "test", Tier::Short, 5);
5633        mem.metadata = serde_json::json!({"origin": "archive-test"});
5634        // Set expiry in the past so GC will archive it
5635        mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5636        let id = insert(&conn, &mem).unwrap();
5637
5638        // Run GC with archive=true — should archive the expired memory
5639        let deleted = gc(&conn, true).unwrap();
5640        assert_eq!(deleted, 1);
5641
5642        // Verify metadata is in the archive
5643        let archived = list_archived(&conn, None, 10, 0).unwrap();
5644        assert_eq!(archived.len(), 1);
5645        assert_eq!(archived[0]["metadata"]["origin"], "archive-test");
5646
5647        // Restore and verify metadata survives the round-trip
5648        let restored = restore_archived(&conn, &id).unwrap();
5649        assert!(restored);
5650        let got = get(&conn, &id).unwrap().unwrap();
5651        assert_eq!(got.metadata["origin"], "archive-test");
5652    }
5653
5654    #[test]
5655    fn metadata_in_insert_if_newer() {
5656        let conn = test_db();
5657        let mut mem = make_memory("Sync metadata", "test", Tier::Long, 5);
5658        mem.metadata = serde_json::json!({"version": 1});
5659        let id = insert(&conn, &mem).unwrap();
5660
5661        // Insert newer version with different metadata
5662        mem.id = id.clone();
5663        mem.metadata = serde_json::json!({"version": 2, "synced": true});
5664        mem.updated_at = (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
5665        insert_if_newer(&conn, &mem).unwrap();
5666
5667        let got = get(&conn, &id).unwrap().unwrap();
5668        assert_eq!(got.metadata["version"], 2);
5669        assert_eq!(got.metadata["synced"], true);
5670
5671        // Insert older version — metadata should NOT be overwritten
5672        mem.metadata = serde_json::json!({"version": 0, "stale": true});
5673        mem.updated_at = "2020-01-01T00:00:00+00:00".to_string();
5674        insert_if_newer(&conn, &mem).unwrap();
5675
5676        let got = get(&conn, &id).unwrap().unwrap();
5677        assert_eq!(got.metadata["version"], 2); // still the newer one
5678        assert!(got.metadata.get("stale").is_none());
5679    }
5680
5681    #[test]
5682    fn metadata_merged_in_consolidate() {
5683        let conn = test_db();
5684        let mut mem_a = make_memory("Consolidate A", "test", Tier::Long, 5);
5685        mem_a.metadata = serde_json::json!({"agent": "claude", "shared": "from_a"});
5686        let id_a = insert(&conn, &mem_a).unwrap();
5687
5688        let mut mem_b = make_memory("Consolidate B", "test", Tier::Long, 7);
5689        mem_b.metadata = serde_json::json!({"model": "opus", "shared": "from_b"});
5690        let id_b = insert(&conn, &mem_b).unwrap();
5691
5692        let new_id = consolidate(
5693            &conn,
5694            &[id_a, id_b],
5695            "Merged",
5696            "Combined content",
5697            "test",
5698            &Tier::Long,
5699            "consolidation",
5700            "test-consolidator",
5701        )
5702        .unwrap();
5703
5704        let got = get(&conn, &new_id).unwrap().unwrap();
5705        // Both keys present; "shared" key takes value from later source (mem_b)
5706        assert_eq!(got.metadata["agent"], "claude");
5707        assert_eq!(got.metadata["model"], "opus");
5708        assert_eq!(got.metadata["shared"], "from_b");
5709    }
5710
5711    #[test]
5712    fn metadata_consolidate_rejects_oversized_merge() {
5713        let conn = test_db();
5714        // Create two memories with large unique-key metadata that together exceed 64KB
5715        let mut mem_a = make_memory("Big meta A", "test", Tier::Long, 5);
5716        let big_val_a: serde_json::Map<String, serde_json::Value> = (0..500)
5717            .map(|i| {
5718                (
5719                    format!("key_a_{i}"),
5720                    serde_json::Value::String("x".repeat(60)),
5721                )
5722            })
5723            .collect();
5724        mem_a.metadata = serde_json::Value::Object(big_val_a);
5725        let id_a = insert(&conn, &mem_a).unwrap();
5726
5727        let mut mem_b = make_memory("Big meta B", "test", Tier::Long, 5);
5728        let big_val_b: serde_json::Map<String, serde_json::Value> = (0..500)
5729            .map(|i| {
5730                (
5731                    format!("key_b_{i}"),
5732                    serde_json::Value::String("x".repeat(60)),
5733                )
5734            })
5735            .collect();
5736        mem_b.metadata = serde_json::Value::Object(big_val_b);
5737        let id_b = insert(&conn, &mem_b).unwrap();
5738
5739        // Consolidate should fail because merged metadata exceeds 64KB
5740        let result = consolidate(
5741            &conn,
5742            &[id_a, id_b],
5743            "Oversized merge",
5744            "Should fail",
5745            "test",
5746            &Tier::Long,
5747            "consolidation",
5748            "test-consolidator",
5749        );
5750        let err = result.expect_err("consolidate should fail for oversized merged metadata");
5751        let msg = err.to_string();
5752        assert!(
5753            msg.contains("merged metadata exceeds size limit"),
5754            "expected metadata size error, got: {msg}"
5755        );
5756    }
5757
5758    #[test]
5759    fn metadata_special_characters_roundtrip() {
5760        let conn = test_db();
5761        let mut mem = make_memory("Special chars metadata", "test", Tier::Long, 5);
5762        mem.metadata = serde_json::json!({
5763            "pipe": "a|b|c",
5764            "newline": "line1\nline2",
5765            "tab": "col1\tcol2",
5766            "backslash": "path\\to\\file",
5767            "unicode": "\u{1F600}\u{1F4A9}",
5768            "cjk": "\u{4e16}\u{754c}",
5769            "empty": "",
5770            "nested_special": {"inner|key": "val\nue"}
5771        });
5772        let id = insert(&conn, &mem).unwrap();
5773        let got = get(&conn, &id).unwrap().unwrap();
5774        assert_eq!(got.metadata["pipe"], "a|b|c");
5775        assert_eq!(got.metadata["newline"], "line1\nline2");
5776        assert_eq!(got.metadata["unicode"], "\u{1F600}\u{1F4A9}");
5777        assert_eq!(got.metadata["cjk"], "\u{4e16}\u{754c}");
5778        assert_eq!(got.metadata["nested_special"]["inner|key"], "val\nue");
5779    }
5780
5781    #[test]
5782    fn metadata_corrupt_column_falls_back_to_empty() {
5783        let conn = test_db();
5784        let mem = make_memory("Corrupt test", "test", Tier::Long, 5);
5785        let id = insert(&conn, &mem).unwrap();
5786
5787        // Manually corrupt the metadata column
5788        conn.execute(
5789            "UPDATE memories SET metadata = 'NOT VALID JSON {{{{' WHERE id = ?1",
5790            params![id],
5791        )
5792        .unwrap();
5793
5794        // row_to_memory should fall back to {} without panicking
5795        let got = get(&conn, &id).unwrap().unwrap();
5796        assert_eq!(got.metadata, serde_json::json!({}));
5797    }
5798
5799    #[test]
5800    fn metadata_restore_resets_corrupt_archived_metadata() {
5801        let conn = test_db();
5802        let mut mem = make_memory("Corrupt archive", "test", Tier::Short, 5);
5803        mem.metadata = serde_json::json!({"valid": true});
5804        mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5805        let id = insert(&conn, &mem).unwrap();
5806
5807        // Archive via GC
5808        gc(&conn, true).unwrap();
5809
5810        // Corrupt the archived metadata directly
5811        conn.execute(
5812            "UPDATE archived_memories SET metadata = 'CORRUPT JSON' WHERE id = ?1",
5813            params![id],
5814        )
5815        .unwrap();
5816
5817        // Restore — should reset metadata to {} instead of failing
5818        let restored = restore_archived(&conn, &id).unwrap();
5819        assert!(restored);
5820        let got = get(&conn, &id).unwrap().unwrap();
5821        assert_eq!(got.metadata, serde_json::json!({}));
5822    }
5823
5824    #[test]
5825    fn scope_index_exists_after_migration() {
5826        // v0.6.0 GA (schema v10) — the `scope_idx` generated column and its
5827        // B-tree index must exist after `open()` runs migration.
5828        let conn = test_db();
5829        let has_col: bool = conn
5830            .prepare("SELECT scope_idx FROM memories LIMIT 0")
5831            .is_ok();
5832        assert!(has_col, "scope_idx generated column missing");
5833        let idx_exists: i64 = conn
5834            .query_row(
5835                "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_memories_scope_idx'",
5836                [],
5837                |row| row.get(0),
5838            )
5839            .unwrap();
5840        assert_eq!(idx_exists, 1, "idx_memories_scope_idx missing");
5841    }
5842
5843    #[test]
5844    fn scope_index_used_for_direct_scope_filter() {
5845        // v0.6.0 GA — confirm `idx_memories_scope_idx` is picked for a
5846        // direct `WHERE scope_idx = ?` predicate. This is the shape the
5847        // query planner sees for `scope = 'collective'` fast-paths and
5848        // the branch-local predicate inside `visibility_clause`.
5849        //
5850        // We deliberately do NOT assert the index is used for the full
5851        // visibility_clause OR-chain — SQLite's planner may (correctly)
5852        // choose a scan when the OR-chain has variable selectivity across
5853        // branches. The point of the index is to accelerate the common
5854        // case when a recall narrows to one scope; the multi-branch
5855        // visibility clause still benefits because each branch evaluates
5856        // the predicate against a single column rather than a JSON extract.
5857        let conn = test_db();
5858        // Seed enough rows + ANALYZE so planner cost model is honest.
5859        for i in 0..200 {
5860            let scope = if i % 3 == 0 { "collective" } else { "private" };
5861            let mut mem = make_memory(&format!("row-{i}"), "test", Tier::Long, 5);
5862            mem.metadata = serde_json::json!({"scope": scope});
5863            insert(&conn, &mem).unwrap();
5864        }
5865        conn.execute("ANALYZE", []).unwrap();
5866        let plan: Vec<String> = conn
5867            .prepare("EXPLAIN QUERY PLAN SELECT id FROM memories WHERE scope_idx = ?1")
5868            .unwrap()
5869            .query_map(params!["collective"], |row| row.get::<_, String>(3))
5870            .unwrap()
5871            .collect::<rusqlite::Result<_>>()
5872            .unwrap();
5873        let joined = plan.join("\n");
5874        assert!(
5875            joined.contains("idx_memories_scope_idx"),
5876            "direct scope filter must use idx_memories_scope_idx; got:\n{joined}"
5877        );
5878    }
5879
5880    #[test]
5881    fn scope_idx_reflects_metadata_on_insert_and_update() {
5882        // v0.6.0 GA — the VIRTUAL generated column must track metadata.scope
5883        // across insert and update without manual maintenance.
5884        let conn = test_db();
5885        let mut mem = make_memory("scope-tracking", "test", Tier::Long, 5);
5886        mem.metadata = serde_json::json!({"scope": "team"});
5887        let id = insert(&conn, &mem).unwrap();
5888        let scope: String = conn
5889            .query_row(
5890                "SELECT scope_idx FROM memories WHERE id = ?1",
5891                params![id],
5892                |r| r.get(0),
5893            )
5894            .unwrap();
5895        assert_eq!(scope, "team");
5896
5897        // Flip scope to unit via metadata update — generated column updates.
5898        let new_meta = serde_json::json!({"scope": "unit"});
5899        update(
5900            &conn,
5901            &id,
5902            None,
5903            None,
5904            None,
5905            None,
5906            None,
5907            None,
5908            None,
5909            None,
5910            Some(&new_meta),
5911        )
5912        .unwrap();
5913        let scope2: String = conn
5914            .query_row(
5915                "SELECT scope_idx FROM memories WHERE id = ?1",
5916                params![id],
5917                |r| r.get(0),
5918            )
5919            .unwrap();
5920        assert_eq!(scope2, "unit");
5921
5922        // Memory with no scope key — virtual column returns the default.
5923        let mut bare = make_memory("no-scope-key", "test", Tier::Long, 5);
5924        bare.metadata = serde_json::json!({});
5925        let id2 = insert(&conn, &bare).unwrap();
5926        let scope3: String = conn
5927            .query_row(
5928                "SELECT scope_idx FROM memories WHERE id = ?1",
5929                params![id2],
5930                |r| r.get(0),
5931            )
5932            .unwrap();
5933        assert_eq!(scope3, "private");
5934    }
5935
5936    #[test]
5937    fn auto_purge_archive_respects_max_days() {
5938        let conn = test_db();
5939        let mut mem = make_memory("Purge test", "test", Tier::Short, 5);
5940        mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5941        insert(&conn, &mem).unwrap();
5942        gc(&conn, true).unwrap();
5943
5944        // Archive exists
5945        let archived = list_archived(&conn, None, 10, 0).unwrap();
5946        assert_eq!(archived.len(), 1);
5947
5948        // Backdate archived_at to 30 days ago so purge can detect it
5949        conn.execute(
5950            "UPDATE archived_memories SET archived_at = ?1",
5951            params![(chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339()],
5952        )
5953        .unwrap();
5954
5955        // Purge with None (disabled) — no-op
5956        let purged = auto_purge_archive(&conn, None).unwrap();
5957        assert_eq!(purged, 0);
5958        assert_eq!(list_archived(&conn, None, 10, 0).unwrap().len(), 1);
5959
5960        // Purge with 0 days — should NOT purge (guard condition)
5961        let purged = auto_purge_archive(&conn, Some(0)).unwrap();
5962        assert_eq!(purged, 0);
5963
5964        // Purge with 90 days — archive is only 30 days old, should NOT purge
5965        let purged = auto_purge_archive(&conn, Some(90)).unwrap();
5966        assert_eq!(purged, 0);
5967
5968        // Purge with 7 days — archive is 30 days old, should be purged
5969        let purged = auto_purge_archive(&conn, Some(7)).unwrap();
5970        assert_eq!(purged, 1);
5971        assert!(list_archived(&conn, None, 10, 0).unwrap().is_empty());
5972    }
5973
5974    // ─────────────────────────────────────────────────────────────────
5975    // Schema v15 (v0.6.3 Stream B) — temporal-validity KG migration.
5976    // ─────────────────────────────────────────────────────────────────
5977
5978    fn column_exists(conn: &Connection, table: &str, column: &str) -> bool {
5979        let mut stmt = conn
5980            .prepare(&format!("PRAGMA table_info({table})"))
5981            .unwrap();
5982        let cols: Vec<String> = stmt
5983            .query_map([], |row| row.get::<_, String>(1))
5984            .unwrap()
5985            .filter_map(Result::ok)
5986            .collect();
5987        cols.iter().any(|c| c == column)
5988    }
5989
5990    fn index_exists(conn: &Connection, name: &str) -> bool {
5991        conn.query_row(
5992            "SELECT 1 FROM sqlite_master WHERE type='index' AND name=?1",
5993            params![name],
5994            |r| r.get::<_, i64>(0),
5995        )
5996        .is_ok()
5997    }
5998
5999    #[test]
6000    fn schema_v15_memory_links_has_temporal_columns() {
6001        let conn = test_db();
6002        assert!(column_exists(&conn, "memory_links", "valid_from"));
6003        assert!(column_exists(&conn, "memory_links", "valid_until"));
6004        assert!(column_exists(&conn, "memory_links", "observed_by"));
6005        assert!(column_exists(&conn, "memory_links", "signature"));
6006    }
6007
6008    #[test]
6009    fn schema_v15_memory_links_temporal_indexes_exist() {
6010        let conn = test_db();
6011        assert!(index_exists(&conn, "idx_links_temporal_src"));
6012        assert!(index_exists(&conn, "idx_links_temporal_tgt"));
6013        assert!(index_exists(&conn, "idx_links_relation"));
6014    }
6015
6016    #[test]
6017    fn schema_v15_entity_aliases_table_exists() {
6018        let conn = test_db();
6019        let count: i64 = conn
6020            .query_row("SELECT COUNT(*) FROM entity_aliases", [], |r| r.get(0))
6021            .unwrap();
6022        assert_eq!(count, 0);
6023        assert!(index_exists(&conn, "idx_entity_aliases_alias"));
6024    }
6025
6026    #[test]
6027    fn schema_v15_entity_aliases_primary_key_unique() {
6028        let conn = test_db();
6029        let now = chrono::Utc::now().to_rfc3339();
6030        conn.execute(
6031            "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
6032            params!["e1", "Alpha", &now],
6033        )
6034        .unwrap();
6035        let dup = conn.execute(
6036            "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
6037            params!["e1", "Alpha", &now],
6038        );
6039        assert!(dup.is_err(), "expected PK uniqueness violation");
6040    }
6041
6042    // -- Pillar 2 / Stream B — entity_register / entity_get_by_alias ------
6043
6044    #[test]
6045    fn entity_register_creates_new_entity_with_aliases() {
6046        let conn = test_db();
6047        let aliases = vec!["pa".to_string(), "Project A".to_string()];
6048        let reg = entity_register(
6049            &conn,
6050            "Project Alpha",
6051            "projects/alpha",
6052            &aliases,
6053            &serde_json::json!({}),
6054            Some("test-agent"),
6055        )
6056        .unwrap();
6057        assert!(reg.created, "first registration must be created=true");
6058        assert_eq!(reg.canonical_name, "Project Alpha");
6059        assert_eq!(reg.namespace, "projects/alpha");
6060        // Aliases inserted in one call share a created_at; the
6061        // secondary `alias ASC` sort orders 'P' before 'p'.
6062        assert_eq!(reg.aliases, vec!["Project A".to_string(), "pa".to_string()]);
6063
6064        let m = get(&conn, &reg.entity_id).unwrap().unwrap();
6065        assert_eq!(m.title, "Project Alpha");
6066        assert_eq!(m.tier.rank(), Tier::Long.rank());
6067        assert!(m.tags.contains(&"entity".to_string()));
6068        assert_eq!(m.metadata["kind"], "entity");
6069        assert_eq!(m.metadata["agent_id"], "test-agent");
6070    }
6071
6072    #[test]
6073    fn entity_register_reuses_existing_and_merges_aliases() {
6074        let conn = test_db();
6075        let first = entity_register(
6076            &conn,
6077            "Project Alpha",
6078            "projects/alpha",
6079            &["pa".to_string()],
6080            &serde_json::json!({}),
6081            Some("a1"),
6082        )
6083        .unwrap();
6084        let second = entity_register(
6085            &conn,
6086            "Project Alpha",
6087            "projects/alpha",
6088            &["pa".to_string(), "alpha".to_string()],
6089            &serde_json::json!({}),
6090            Some("a2"),
6091        )
6092        .unwrap();
6093        assert!(first.created);
6094        assert!(!second.created, "second call must reuse the entity");
6095        assert_eq!(first.entity_id, second.entity_id);
6096        assert_eq!(second.aliases, vec!["pa".to_string(), "alpha".to_string()]);
6097    }
6098
6099    #[test]
6100    fn entity_register_errors_on_collision_with_non_entity_memory() {
6101        let conn = test_db();
6102        let mem = make_memory("Conflict", "projects/alpha", Tier::Long, 5);
6103        insert(&conn, &mem).unwrap();
6104        let err = entity_register(
6105            &conn,
6106            "Conflict",
6107            "projects/alpha",
6108            &[],
6109            &serde_json::json!({}),
6110            None,
6111        )
6112        .unwrap_err();
6113        let msg = format!("{err}");
6114        assert!(
6115            msg.contains("non-entity memory"),
6116            "expected collision error, got: {msg}"
6117        );
6118    }
6119
6120    #[test]
6121    fn entity_register_skips_blank_aliases() {
6122        let conn = test_db();
6123        let reg = entity_register(
6124            &conn,
6125            "Trim Test",
6126            "test",
6127            &[String::new(), "   ".to_string(), "ok".to_string()],
6128            &serde_json::json!({}),
6129            None,
6130        )
6131        .unwrap();
6132        assert_eq!(reg.aliases, vec!["ok".to_string()]);
6133    }
6134
6135    #[test]
6136    fn entity_register_preserves_caller_metadata_keys() {
6137        let conn = test_db();
6138        let extra = serde_json::json!({"team": "platform", "kind": "ignored"});
6139        let reg = entity_register(&conn, "Service X", "svc", &[], &extra, None).unwrap();
6140        let m = get(&conn, &reg.entity_id).unwrap().unwrap();
6141        assert_eq!(m.metadata["team"], "platform");
6142        // Caller's `kind` is overwritten — entity records must always
6143        // carry kind=entity for the resolver to find them.
6144        assert_eq!(m.metadata["kind"], "entity");
6145    }
6146
6147    #[test]
6148    fn entity_get_by_alias_returns_record_with_full_alias_set() {
6149        let conn = test_db();
6150        let reg = entity_register(
6151            &conn,
6152            "Project Alpha",
6153            "projects/alpha",
6154            &["pa".to_string(), "alpha".to_string()],
6155            &serde_json::json!({}),
6156            None,
6157        )
6158        .unwrap();
6159        let got = entity_get_by_alias(&conn, "pa", None).unwrap().unwrap();
6160        assert_eq!(got.entity_id, reg.entity_id);
6161        assert_eq!(got.canonical_name, "Project Alpha");
6162        assert_eq!(got.namespace, "projects/alpha");
6163        // Same-batch aliases share a created_at; alphabetical
6164        // tiebreak puts "alpha" before "pa".
6165        assert_eq!(got.aliases, vec!["alpha".to_string(), "pa".to_string()]);
6166    }
6167
6168    #[test]
6169    fn entity_get_by_alias_returns_none_for_unknown_alias() {
6170        let conn = test_db();
6171        let got = entity_get_by_alias(&conn, "missing", None).unwrap();
6172        assert!(got.is_none());
6173    }
6174
6175    #[test]
6176    fn entity_get_by_alias_filters_by_namespace() {
6177        let conn = test_db();
6178        entity_register(
6179            &conn,
6180            "Acme",
6181            "ns_a",
6182            &["a".to_string()],
6183            &serde_json::json!({}),
6184            None,
6185        )
6186        .unwrap();
6187        entity_register(
6188            &conn,
6189            "Acme Corp",
6190            "ns_b",
6191            &["a".to_string()],
6192            &serde_json::json!({}),
6193            None,
6194        )
6195        .unwrap();
6196        let in_a = entity_get_by_alias(&conn, "a", Some("ns_a"))
6197            .unwrap()
6198            .unwrap();
6199        assert_eq!(in_a.namespace, "ns_a");
6200        assert_eq!(in_a.canonical_name, "Acme");
6201        let in_b = entity_get_by_alias(&conn, "a", Some("ns_b"))
6202            .unwrap()
6203            .unwrap();
6204        assert_eq!(in_b.namespace, "ns_b");
6205        assert_eq!(in_b.canonical_name, "Acme Corp");
6206    }
6207
6208    #[test]
6209    fn entity_get_by_alias_without_namespace_picks_most_recent() {
6210        let conn = test_db();
6211        // Older entity created first.
6212        entity_register(
6213            &conn,
6214            "Older",
6215            "ns_old",
6216            &["dup".to_string()],
6217            &serde_json::json!({}),
6218            None,
6219        )
6220        .unwrap();
6221        // Sleep just enough to guarantee a strictly later created_at.
6222        std::thread::sleep(std::time::Duration::from_millis(5));
6223        entity_register(
6224            &conn,
6225            "Newer",
6226            "ns_new",
6227            &["dup".to_string()],
6228            &serde_json::json!({}),
6229            None,
6230        )
6231        .unwrap();
6232        let got = entity_get_by_alias(&conn, "dup", None).unwrap().unwrap();
6233        assert_eq!(got.canonical_name, "Newer");
6234        assert_eq!(got.namespace, "ns_new");
6235    }
6236
6237    #[test]
6238    fn entity_get_by_alias_ignores_non_entity_memory_with_matching_alias() {
6239        let conn = test_db();
6240        // Insert a regular (non-entity) memory and a stray
6241        // entity_aliases row pointing at it. The resolver must skip
6242        // it because `kind != 'entity'`.
6243        let mut mem = make_memory("Decoy", "test", Tier::Long, 5);
6244        mem.metadata = serde_json::json!({});
6245        let mid = insert(&conn, &mem).unwrap();
6246        let now = chrono::Utc::now().to_rfc3339();
6247        conn.execute(
6248            "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
6249            params![&mid, "decoy", &now],
6250        )
6251        .unwrap();
6252        let got = entity_get_by_alias(&conn, "decoy", None).unwrap();
6253        assert!(got.is_none(), "non-entity memories must not resolve");
6254    }
6255
6256    #[test]
6257    fn entity_register_idempotent_aliases_are_deduped() {
6258        let conn = test_db();
6259        let reg = entity_register(
6260            &conn,
6261            "Dedup",
6262            "test",
6263            &["x".to_string(), "x".to_string(), "y".to_string()],
6264            &serde_json::json!({}),
6265            None,
6266        )
6267        .unwrap();
6268        // INSERT OR IGNORE collapses the duplicate "x".
6269        assert_eq!(reg.aliases.len(), 2);
6270        assert!(reg.aliases.contains(&"x".to_string()));
6271        assert!(reg.aliases.contains(&"y".to_string()));
6272    }
6273
6274    // -- Pillar 2 / Stream C — kg_timeline ---------------------------------
6275
6276    /// Insert a link with an explicit `valid_from` so timeline tests can
6277    /// pin event ordering without relying on wall-clock spread.
6278    fn insert_link_at(
6279        conn: &Connection,
6280        source_id: &str,
6281        target_id: &str,
6282        relation: &str,
6283        valid_from: &str,
6284    ) {
6285        let now = chrono::Utc::now().to_rfc3339();
6286        conn.execute(
6287            "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
6288             VALUES (?1, ?2, ?3, ?4, ?5)",
6289            params![source_id, target_id, relation, now, valid_from],
6290        )
6291        .unwrap();
6292    }
6293
6294    #[test]
6295    fn create_link_populates_valid_from_for_new_rows() {
6296        let conn = test_db();
6297        let src = make_memory("kg-src", "test", Tier::Long, 5);
6298        let tgt = make_memory("kg-tgt", "test", Tier::Long, 5);
6299        insert(&conn, &src).unwrap();
6300        insert(&conn, &tgt).unwrap();
6301        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6302        let valid_from: Option<String> = conn
6303            .query_row(
6304                "SELECT valid_from FROM memory_links WHERE source_id = ?1",
6305                params![&src.id],
6306                |r| r.get(0),
6307            )
6308            .unwrap();
6309        assert!(
6310            valid_from.is_some(),
6311            "create_link must populate valid_from so kg_timeline can see new links"
6312        );
6313    }
6314
6315    #[test]
6316    fn kg_timeline_returns_events_ordered_by_valid_from_ascending() {
6317        let conn = test_db();
6318        let src = make_memory("alpha", "kg/projects/alpha", Tier::Long, 5);
6319        let s1 = make_memory("kickoff", "kg/projects/alpha", Tier::Long, 5);
6320        let s2 = make_memory("design phase", "kg/projects/alpha", Tier::Long, 5);
6321        let s3 = make_memory("implementation", "kg/projects/alpha", Tier::Long, 5);
6322        insert(&conn, &src).unwrap();
6323        insert(&conn, &s1).unwrap();
6324        insert(&conn, &s2).unwrap();
6325        insert(&conn, &s3).unwrap();
6326
6327        // Insert in a deliberately-shuffled order so ORDER BY isn't
6328        // a happy accident of insertion order.
6329        insert_link_at(
6330            &conn,
6331            &src.id,
6332            &s2.id,
6333            "supersedes",
6334            "2026-02-03T00:00:00+00:00",
6335        );
6336        insert_link_at(
6337            &conn,
6338            &src.id,
6339            &s1.id,
6340            "related_to",
6341            "2026-01-15T00:00:00+00:00",
6342        );
6343        insert_link_at(
6344            &conn,
6345            &src.id,
6346            &s3.id,
6347            "supersedes",
6348            "2026-03-22T00:00:00+00:00",
6349        );
6350
6351        let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
6352        assert_eq!(events.len(), 3);
6353        assert_eq!(events[0].target_id, s1.id);
6354        assert_eq!(events[1].target_id, s2.id);
6355        assert_eq!(events[2].target_id, s3.id);
6356        assert_eq!(events[0].title, "kickoff");
6357        assert_eq!(events[1].relation, "supersedes");
6358        assert_eq!(events[0].target_namespace, "kg/projects/alpha");
6359    }
6360
6361    #[test]
6362    fn kg_timeline_filters_by_since_inclusive() {
6363        let conn = test_db();
6364        let src = make_memory("e", "ns", Tier::Long, 5);
6365        let t1 = make_memory("e1", "ns", Tier::Long, 5);
6366        let t2 = make_memory("e2", "ns", Tier::Long, 5);
6367        insert(&conn, &src).unwrap();
6368        insert(&conn, &t1).unwrap();
6369        insert(&conn, &t2).unwrap();
6370        insert_link_at(&conn, &src.id, &t1.id, "rel", "2026-01-01T00:00:00+00:00");
6371        insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-03-01T00:00:00+00:00");
6372
6373        let events = kg_timeline(
6374            &conn,
6375            &src.id,
6376            Some("2026-02-01T00:00:00+00:00"),
6377            None,
6378            None,
6379        )
6380        .unwrap();
6381        assert_eq!(events.len(), 1);
6382        assert_eq!(events[0].target_id, t2.id);
6383
6384        // Boundary: since == valid_from should match (inclusive).
6385        let on_boundary = kg_timeline(
6386            &conn,
6387            &src.id,
6388            Some("2026-03-01T00:00:00+00:00"),
6389            None,
6390            None,
6391        )
6392        .unwrap();
6393        assert_eq!(on_boundary.len(), 1);
6394    }
6395
6396    #[test]
6397    fn kg_timeline_filters_by_until_inclusive() {
6398        let conn = test_db();
6399        let src = make_memory("e", "ns", Tier::Long, 5);
6400        let t1 = make_memory("e1", "ns", Tier::Long, 5);
6401        let t2 = make_memory("e2", "ns", Tier::Long, 5);
6402        insert(&conn, &src).unwrap();
6403        insert(&conn, &t1).unwrap();
6404        insert(&conn, &t2).unwrap();
6405        insert_link_at(&conn, &src.id, &t1.id, "rel", "2026-01-01T00:00:00+00:00");
6406        insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-03-01T00:00:00+00:00");
6407
6408        let events = kg_timeline(
6409            &conn,
6410            &src.id,
6411            None,
6412            Some("2026-02-01T00:00:00+00:00"),
6413            None,
6414        )
6415        .unwrap();
6416        assert_eq!(events.len(), 1);
6417        assert_eq!(events[0].target_id, t1.id);
6418    }
6419
6420    #[test]
6421    fn kg_timeline_skips_links_with_null_valid_from() {
6422        let conn = test_db();
6423        let src = make_memory("s", "ns", Tier::Long, 5);
6424        let t1 = make_memory("t1", "ns", Tier::Long, 5);
6425        let t2 = make_memory("t2", "ns", Tier::Long, 5);
6426        insert(&conn, &src).unwrap();
6427        insert(&conn, &t1).unwrap();
6428        insert(&conn, &t2).unwrap();
6429        // Direct insert with NULL valid_from to simulate an external
6430        // writer that bypassed `create_link`.
6431        let now = chrono::Utc::now().to_rfc3339();
6432        conn.execute(
6433            "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
6434             VALUES (?1, ?2, 'rel', ?3, NULL)",
6435            params![&src.id, &t1.id, &now],
6436        )
6437        .unwrap();
6438        insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-01-01T00:00:00+00:00");
6439
6440        let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
6441        assert_eq!(events.len(), 1);
6442        assert_eq!(events[0].target_id, t2.id);
6443    }
6444
6445    #[test]
6446    fn kg_timeline_excludes_links_where_source_is_target() {
6447        // The query is anchored on `source_id`; inbound edges (where the
6448        // entity is the target) are intentionally NOT part of the
6449        // timeline. This guards against accidentally widening the
6450        // contract to a bidirectional view.
6451        let conn = test_db();
6452        let entity = make_memory("entity", "ns", Tier::Long, 5);
6453        let other = make_memory("other", "ns", Tier::Long, 5);
6454        insert(&conn, &entity).unwrap();
6455        insert(&conn, &other).unwrap();
6456        insert_link_at(
6457            &conn,
6458            &other.id,
6459            &entity.id,
6460            "rel",
6461            "2026-01-01T00:00:00+00:00",
6462        );
6463        let events = kg_timeline(&conn, &entity.id, None, None, None).unwrap();
6464        assert!(events.is_empty());
6465    }
6466
6467    #[test]
6468    fn kg_timeline_limit_clamped_to_max() {
6469        let conn = test_db();
6470        let src = make_memory("s", "ns", Tier::Long, 5);
6471        insert(&conn, &src).unwrap();
6472        for i in 0..5 {
6473            let t = make_memory(&format!("t{i}"), "ns", Tier::Long, 5);
6474            insert(&conn, &t).unwrap();
6475            insert_link_at(
6476                &conn,
6477                &src.id,
6478                &t.id,
6479                "rel",
6480                &format!("2026-01-0{}T00:00:00+00:00", i + 1),
6481            );
6482        }
6483        // Caller passes a wildly oversized limit — should be clamped
6484        // to KG_TIMELINE_MAX_LIMIT (i.e. accepted, not errored), and
6485        // since the row count is small, should return all 5.
6486        let events = kg_timeline(&conn, &src.id, None, None, Some(usize::MAX)).unwrap();
6487        assert_eq!(events.len(), 5);
6488
6489        // Caller passes 0 — clamp to 1.
6490        let one = kg_timeline(&conn, &src.id, None, None, Some(0)).unwrap();
6491        assert_eq!(one.len(), 1);
6492    }
6493
6494    #[test]
6495    fn kg_timeline_carries_observed_by_and_valid_until() {
6496        let conn = test_db();
6497        let src = make_memory("s", "ns", Tier::Long, 5);
6498        let t = make_memory("t", "ns", Tier::Long, 5);
6499        insert(&conn, &src).unwrap();
6500        insert(&conn, &t).unwrap();
6501        let now = chrono::Utc::now().to_rfc3339();
6502        conn.execute(
6503            "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from, valid_until, observed_by) \
6504             VALUES (?1, ?2, 'supersedes', ?3, '2026-01-01T00:00:00+00:00', '2026-12-31T23:59:59+00:00', 'agent-pm-1')",
6505            params![&src.id, &t.id, &now],
6506        )
6507        .unwrap();
6508        let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
6509        assert_eq!(events.len(), 1);
6510        assert_eq!(events[0].observed_by.as_deref(), Some("agent-pm-1"));
6511        assert_eq!(
6512            events[0].valid_until.as_deref(),
6513            Some("2026-12-31T23:59:59+00:00")
6514        );
6515    }
6516
6517    #[test]
6518    fn kg_timeline_empty_for_unknown_source() {
6519        let conn = test_db();
6520        let events = kg_timeline(&conn, "nonexistent-id", None, None, None).unwrap();
6521        assert!(events.is_empty());
6522    }
6523
6524    // -- Pillar 2 / Stream C — kg_invalidate -------------------------------
6525
6526    #[test]
6527    fn invalidate_link_sets_valid_until_to_provided_timestamp() {
6528        let conn = test_db();
6529        let src = make_memory("inv-s", "test", Tier::Long, 5);
6530        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6531        insert(&conn, &src).unwrap();
6532        insert(&conn, &tgt).unwrap();
6533        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6534        let stamp = "2026-12-31T23:59:59+00:00";
6535        let res = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(stamp))
6536            .unwrap()
6537            .expect("link must exist");
6538        assert_eq!(res.valid_until, stamp);
6539        assert!(res.previous_valid_until.is_none());
6540        let stored: Option<String> = conn
6541            .query_row(
6542                "SELECT valid_until FROM memory_links \
6543                 WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
6544                params![&src.id, &tgt.id, "related_to"],
6545                |r| r.get(0),
6546            )
6547            .unwrap();
6548        assert_eq!(stored.as_deref(), Some(stamp));
6549    }
6550
6551    #[test]
6552    fn invalidate_link_defaults_to_now_when_no_timestamp_provided() {
6553        let conn = test_db();
6554        let src = make_memory("inv-s", "test", Tier::Long, 5);
6555        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6556        insert(&conn, &src).unwrap();
6557        insert(&conn, &tgt).unwrap();
6558        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6559        let res = invalidate_link(&conn, &src.id, &tgt.id, "related_to", None)
6560            .unwrap()
6561            .expect("link must exist");
6562        // The default is wall-clock now; assert it parses as RFC3339 and
6563        // is within a small window of the test's "now" (allow 60s skew
6564        // to accommodate slow runners).
6565        let parsed = chrono::DateTime::parse_from_rfc3339(&res.valid_until)
6566            .expect("default valid_until must be RFC3339");
6567        let now = chrono::Utc::now();
6568        let drift = now.signed_duration_since(parsed.with_timezone(&chrono::Utc));
6569        assert!(
6570            drift.num_seconds().abs() < 60,
6571            "default valid_until {} should be near now {now}",
6572            res.valid_until
6573        );
6574    }
6575
6576    #[test]
6577    fn invalidate_link_returns_none_for_unknown_triple() {
6578        let conn = test_db();
6579        // No memories or links created.
6580        let res = invalidate_link(&conn, "missing-src", "missing-tgt", "related_to", None).unwrap();
6581        assert!(res.is_none());
6582    }
6583
6584    #[test]
6585    fn invalidate_link_returns_none_when_relation_does_not_match() {
6586        // Link exists for ("related_to") but caller asks for ("supersedes").
6587        let conn = test_db();
6588        let src = make_memory("inv-s", "test", Tier::Long, 5);
6589        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6590        insert(&conn, &src).unwrap();
6591        insert(&conn, &tgt).unwrap();
6592        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6593        let res = invalidate_link(&conn, &src.id, &tgt.id, "supersedes", None).unwrap();
6594        assert!(res.is_none(), "must not match across relation values");
6595    }
6596
6597    #[test]
6598    fn invalidate_link_overwrites_existing_valid_until_and_reports_prior() {
6599        let conn = test_db();
6600        let src = make_memory("inv-s", "test", Tier::Long, 5);
6601        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6602        insert(&conn, &src).unwrap();
6603        insert(&conn, &tgt).unwrap();
6604        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6605        let first = "2026-06-01T00:00:00+00:00";
6606        let second = "2026-12-01T00:00:00+00:00";
6607        let r1 = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(first))
6608            .unwrap()
6609            .unwrap();
6610        assert!(r1.previous_valid_until.is_none());
6611        let r2 = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(second))
6612            .unwrap()
6613            .unwrap();
6614        assert_eq!(r2.previous_valid_until.as_deref(), Some(first));
6615        assert_eq!(r2.valid_until, second);
6616    }
6617
6618    #[test]
6619    fn invalidate_link_distinguishes_relation_when_multiple_links_share_endpoints() {
6620        // Two links between the same pair, different relations. Invalidating
6621        // one must not affect the other.
6622        let conn = test_db();
6623        let src = make_memory("inv-s", "test", Tier::Long, 5);
6624        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6625        insert(&conn, &src).unwrap();
6626        insert(&conn, &tgt).unwrap();
6627        create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
6628        create_link(&conn, &src.id, &tgt.id, "supersedes").unwrap();
6629        let stamp = "2026-07-15T12:00:00+00:00";
6630        invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(stamp))
6631            .unwrap()
6632            .unwrap();
6633        let related: Option<String> = conn
6634            .query_row(
6635                "SELECT valid_until FROM memory_links \
6636                 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'related_to'",
6637                params![&src.id, &tgt.id],
6638                |r| r.get(0),
6639            )
6640            .unwrap();
6641        let supers: Option<String> = conn
6642            .query_row(
6643                "SELECT valid_until FROM memory_links \
6644                 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'supersedes'",
6645                params![&src.id, &tgt.id],
6646                |r| r.get(0),
6647            )
6648            .unwrap();
6649        assert_eq!(related.as_deref(), Some(stamp));
6650        assert!(
6651            supers.is_none(),
6652            "the sibling 'supersedes' link must remain valid"
6653        );
6654    }
6655
6656    #[test]
6657    fn invalidate_link_preserves_other_columns() {
6658        // valid_from, observed_by, created_at, signature must not be
6659        // touched by the invalidate UPDATE.
6660        let conn = test_db();
6661        let src = make_memory("inv-s", "test", Tier::Long, 5);
6662        let tgt = make_memory("inv-t", "test", Tier::Long, 5);
6663        insert(&conn, &src).unwrap();
6664        insert(&conn, &tgt).unwrap();
6665        let now = chrono::Utc::now().to_rfc3339();
6666        conn.execute(
6667            "INSERT INTO memory_links \
6668             (source_id, target_id, relation, created_at, valid_from, observed_by) \
6669             VALUES (?1, ?2, 'related_to', ?3, '2026-01-01T00:00:00+00:00', 'agent-x')",
6670            params![&src.id, &tgt.id, &now],
6671        )
6672        .unwrap();
6673        invalidate_link(
6674            &conn,
6675            &src.id,
6676            &tgt.id,
6677            "related_to",
6678            Some("2026-12-31T23:59:59+00:00"),
6679        )
6680        .unwrap()
6681        .unwrap();
6682        let (vf, ob, ca): (Option<String>, Option<String>, String) = conn
6683            .query_row(
6684                "SELECT valid_from, observed_by, created_at FROM memory_links \
6685                 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'related_to'",
6686                params![&src.id, &tgt.id],
6687                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
6688            )
6689            .unwrap();
6690        assert_eq!(vf.as_deref(), Some("2026-01-01T00:00:00+00:00"));
6691        assert_eq!(ob.as_deref(), Some("agent-x"));
6692        assert_eq!(ca, now);
6693    }
6694
6695    // -- Pillar 2 / Stream C — kg_query (depth=1) ---------------------------
6696
6697    /// Insert a link with explicit `temporal/observed_by` columns so the
6698    /// `kg_query` filter tests can pin behavior without relying on
6699    /// wall-clock spread.
6700    fn insert_link_full(
6701        conn: &Connection,
6702        source_id: &str,
6703        target_id: &str,
6704        relation: &str,
6705        valid_from: Option<&str>,
6706        valid_until: Option<&str>,
6707        observed_by: Option<&str>,
6708    ) {
6709        let now = chrono::Utc::now().to_rfc3339();
6710        conn.execute(
6711            "INSERT INTO memory_links \
6712             (source_id, target_id, relation, created_at, valid_from, valid_until, observed_by) \
6713             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
6714            params![
6715                source_id,
6716                target_id,
6717                relation,
6718                now,
6719                valid_from,
6720                valid_until,
6721                observed_by
6722            ],
6723        )
6724        .unwrap();
6725    }
6726
6727    #[test]
6728    fn kg_query_returns_outbound_neighbors_at_depth_1() {
6729        let conn = test_db();
6730        let src = make_memory("alpha", "kg/projects/alpha", Tier::Long, 5);
6731        let n1 = make_memory("kickoff", "kg/projects/alpha", Tier::Long, 5);
6732        let n2 = make_memory("design", "kg/projects/alpha", Tier::Long, 5);
6733        insert(&conn, &src).unwrap();
6734        insert(&conn, &n1).unwrap();
6735        insert(&conn, &n2).unwrap();
6736        insert_link_full(
6737            &conn,
6738            &src.id,
6739            &n1.id,
6740            "related_to",
6741            Some("2026-01-15T00:00:00+00:00"),
6742            None,
6743            Some("agent-1"),
6744        );
6745        insert_link_full(
6746            &conn,
6747            &src.id,
6748            &n2.id,
6749            "supersedes",
6750            Some("2026-02-03T00:00:00+00:00"),
6751            None,
6752            Some("agent-2"),
6753        );
6754
6755        let nodes = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
6756        assert_eq!(nodes.len(), 2);
6757        // Ordered by COALESCE(valid_from, created_at) ASC.
6758        assert_eq!(nodes[0].target_id, n1.id);
6759        assert_eq!(nodes[1].target_id, n2.id);
6760        assert_eq!(nodes[0].title, "kickoff");
6761        assert_eq!(nodes[0].relation, "related_to");
6762        assert_eq!(nodes[0].observed_by.as_deref(), Some("agent-1"));
6763        assert_eq!(nodes[0].depth, 1);
6764        assert_eq!(nodes[0].path, format!("{}->{}", src.id, n1.id));
6765        assert_eq!(nodes[0].target_namespace, "kg/projects/alpha");
6766    }
6767
6768    #[test]
6769    fn kg_query_filters_by_valid_at_window() {
6770        let conn = test_db();
6771        let src = make_memory("e", "ns", Tier::Long, 5);
6772        let t1 = make_memory("e1", "ns", Tier::Long, 5);
6773        let t2 = make_memory("e2", "ns", Tier::Long, 5);
6774        insert(&conn, &src).unwrap();
6775        insert(&conn, &t1).unwrap();
6776        insert(&conn, &t2).unwrap();
6777        // t1 valid 2026-01-01 → 2026-02-01; t2 valid from 2026-03-01.
6778        insert_link_full(
6779            &conn,
6780            &src.id,
6781            &t1.id,
6782            "related_to",
6783            Some("2026-01-01T00:00:00+00:00"),
6784            Some("2026-02-01T00:00:00+00:00"),
6785            None,
6786        );
6787        insert_link_full(
6788            &conn,
6789            &src.id,
6790            &t2.id,
6791            "related_to",
6792            Some("2026-03-01T00:00:00+00:00"),
6793            None,
6794            None,
6795        );
6796
6797        // At 2026-01-15 only t1 is valid.
6798        let n_jan = kg_query(
6799            &conn,
6800            &src.id,
6801            1,
6802            Some("2026-01-15T00:00:00+00:00"),
6803            None,
6804            None,
6805        )
6806        .unwrap();
6807        assert_eq!(n_jan.len(), 1);
6808        assert_eq!(n_jan[0].target_id, t1.id);
6809
6810        // At 2026-02-15 the first link is closed, the second hasn't
6811        // started yet — empty.
6812        let n_feb = kg_query(
6813            &conn,
6814            &src.id,
6815            1,
6816            Some("2026-02-15T00:00:00+00:00"),
6817            None,
6818            None,
6819        )
6820        .unwrap();
6821        assert!(n_feb.is_empty());
6822
6823        // At 2026-04-01 only t2 is valid.
6824        let n_apr = kg_query(
6825            &conn,
6826            &src.id,
6827            1,
6828            Some("2026-04-01T00:00:00+00:00"),
6829            None,
6830            None,
6831        )
6832        .unwrap();
6833        assert_eq!(n_apr.len(), 1);
6834        assert_eq!(n_apr[0].target_id, t2.id);
6835    }
6836
6837    #[test]
6838    fn kg_query_skips_null_valid_from_when_valid_at_filter_active() {
6839        let conn = test_db();
6840        let src = make_memory("s", "ns", Tier::Long, 5);
6841        let t = make_memory("t", "ns", Tier::Long, 5);
6842        insert(&conn, &src).unwrap();
6843        insert(&conn, &t).unwrap();
6844        // Link with NULL valid_from — must be invisible to a temporally
6845        // scoped query (we cannot tell if it was valid at any point).
6846        insert_link_full(&conn, &src.id, &t.id, "related_to", None, None, None);
6847
6848        let with_filter = kg_query(
6849            &conn,
6850            &src.id,
6851            1,
6852            Some("2026-01-15T00:00:00+00:00"),
6853            None,
6854            None,
6855        )
6856        .unwrap();
6857        assert!(with_filter.is_empty());
6858
6859        // Without the filter, the same link IS returned.
6860        let without = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
6861        assert_eq!(without.len(), 1);
6862        assert_eq!(without[0].target_id, t.id);
6863    }
6864
6865    #[test]
6866    fn kg_query_filters_by_allowed_agents() {
6867        let conn = test_db();
6868        let src = make_memory("s", "ns", Tier::Long, 5);
6869        let t1 = make_memory("t1", "ns", Tier::Long, 5);
6870        let t2 = make_memory("t2", "ns", Tier::Long, 5);
6871        let t3 = make_memory("t3", "ns", Tier::Long, 5);
6872        insert(&conn, &src).unwrap();
6873        insert(&conn, &t1).unwrap();
6874        insert(&conn, &t2).unwrap();
6875        insert(&conn, &t3).unwrap();
6876        insert_link_full(
6877            &conn,
6878            &src.id,
6879            &t1.id,
6880            "related_to",
6881            Some("2026-01-01T00:00:00+00:00"),
6882            None,
6883            Some("agent-a"),
6884        );
6885        insert_link_full(
6886            &conn,
6887            &src.id,
6888            &t2.id,
6889            "related_to",
6890            Some("2026-01-02T00:00:00+00:00"),
6891            None,
6892            Some("agent-b"),
6893        );
6894        // Link with NULL observed_by must be excluded once the agent
6895        // filter is active (`NULL IN (...)` is NULL/false in SQLite).
6896        insert_link_full(
6897            &conn,
6898            &src.id,
6899            &t3.id,
6900            "related_to",
6901            Some("2026-01-03T00:00:00+00:00"),
6902            None,
6903            None,
6904        );
6905
6906        let allow_a = vec!["agent-a".to_string()];
6907        let only_a = kg_query(&conn, &src.id, 1, None, Some(&allow_a), None).unwrap();
6908        assert_eq!(only_a.len(), 1);
6909        assert_eq!(only_a[0].target_id, t1.id);
6910
6911        let allow_both = vec!["agent-a".to_string(), "agent-b".to_string()];
6912        let both = kg_query(&conn, &src.id, 1, None, Some(&allow_both), None).unwrap();
6913        assert_eq!(both.len(), 2);
6914    }
6915
6916    #[test]
6917    fn kg_query_empty_allowed_agents_returns_zero_rows() {
6918        let conn = test_db();
6919        let src = make_memory("s", "ns", Tier::Long, 5);
6920        let t = make_memory("t", "ns", Tier::Long, 5);
6921        insert(&conn, &src).unwrap();
6922        insert(&conn, &t).unwrap();
6923        insert_link_full(
6924            &conn,
6925            &src.id,
6926            &t.id,
6927            "related_to",
6928            Some("2026-01-01T00:00:00+00:00"),
6929            None,
6930            Some("agent-a"),
6931        );
6932
6933        // Sanity: no filter returns the link.
6934        let unfiltered = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
6935        assert_eq!(unfiltered.len(), 1);
6936
6937        // Empty allowlist == "no agents trusted" — must return zero
6938        // rows, not silently fall through to the unfiltered path.
6939        let empty: Vec<String> = Vec::new();
6940        let none = kg_query(&conn, &src.id, 1, None, Some(&empty), None).unwrap();
6941        assert!(none.is_empty());
6942    }
6943
6944    #[test]
6945    fn kg_query_rejects_max_depth_zero() {
6946        let conn = test_db();
6947        let src = make_memory("s", "ns", Tier::Long, 5);
6948        insert(&conn, &src).unwrap();
6949        let err = kg_query(&conn, &src.id, 0, None, None, None).unwrap_err();
6950        assert!(err.to_string().contains("max_depth"));
6951    }
6952
6953    #[test]
6954    fn kg_query_rejects_unsupported_max_depth() {
6955        // The recursive-CTE slice supports depth 1..=5; passing 6+ must
6956        // produce an explicit error so callers learn they hit the
6957        // ceiling rather than receiving a partial graph.
6958        let conn = test_db();
6959        let src = make_memory("s", "ns", Tier::Long, 5);
6960        insert(&conn, &src).unwrap();
6961        let err = kg_query(
6962            &conn,
6963            &src.id,
6964            KG_QUERY_MAX_SUPPORTED_DEPTH + 1,
6965            None,
6966            None,
6967            None,
6968        )
6969        .unwrap_err();
6970        let msg = err.to_string();
6971        assert!(msg.contains(&format!("max_depth={}", KG_QUERY_MAX_SUPPORTED_DEPTH + 1)));
6972        assert!(msg.contains(&format!("supported depth={KG_QUERY_MAX_SUPPORTED_DEPTH}")));
6973    }
6974
6975    #[test]
6976    fn kg_query_traverses_multiple_hops() {
6977        // src -> mid -> leaf. depth=2 must return both hops, with
6978        // depth/path reflecting the chain.
6979        let conn = test_db();
6980        let src = make_memory("src", "ns", Tier::Long, 5);
6981        let mid = make_memory("mid", "ns", Tier::Long, 5);
6982        let leaf = make_memory("leaf", "ns", Tier::Long, 5);
6983        insert(&conn, &src).unwrap();
6984        insert(&conn, &mid).unwrap();
6985        insert(&conn, &leaf).unwrap();
6986        insert_link_full(
6987            &conn,
6988            &src.id,
6989            &mid.id,
6990            "related_to",
6991            Some("2026-01-01T00:00:00+00:00"),
6992            None,
6993            Some("agent-x"),
6994        );
6995        insert_link_full(
6996            &conn,
6997            &mid.id,
6998            &leaf.id,
6999            "supersedes",
7000            Some("2026-01-02T00:00:00+00:00"),
7001            None,
7002            Some("agent-x"),
7003        );
7004
7005        // depth=1 sees only mid.
7006        let d1 = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
7007        assert_eq!(d1.len(), 1);
7008        assert_eq!(d1[0].target_id, mid.id);
7009        assert_eq!(d1[0].depth, 1);
7010
7011        // depth=2 sees both, ordered shallow-first.
7012        let d2 = kg_query(&conn, &src.id, 2, None, None, None).unwrap();
7013        assert_eq!(d2.len(), 2);
7014        assert_eq!(d2[0].target_id, mid.id);
7015        assert_eq!(d2[0].depth, 1);
7016        assert_eq!(d2[0].path, format!("{}->{}", src.id, mid.id));
7017        assert_eq!(d2[1].target_id, leaf.id);
7018        assert_eq!(d2[1].depth, 2);
7019        assert_eq!(d2[1].relation, "supersedes");
7020        assert_eq!(d2[1].path, format!("{}->{}->{}", src.id, mid.id, leaf.id));
7021    }
7022
7023    #[test]
7024    fn kg_query_multi_hop_respects_valid_at_per_hop() {
7025        // src -> mid valid 2026-01..02; mid -> leaf valid 2026-04+.
7026        // At valid_at=2026-01-15 the second hop is not yet valid, so
7027        // only mid is returned; at valid_at=2026-04-15 the first hop is
7028        // closed, so both are filtered out.
7029        let conn = test_db();
7030        let src = make_memory("s", "ns", Tier::Long, 5);
7031        let mid = make_memory("m", "ns", Tier::Long, 5);
7032        let leaf = make_memory("l", "ns", Tier::Long, 5);
7033        insert(&conn, &src).unwrap();
7034        insert(&conn, &mid).unwrap();
7035        insert(&conn, &leaf).unwrap();
7036        insert_link_full(
7037            &conn,
7038            &src.id,
7039            &mid.id,
7040            "related_to",
7041            Some("2026-01-01T00:00:00+00:00"),
7042            Some("2026-02-01T00:00:00+00:00"),
7043            None,
7044        );
7045        insert_link_full(
7046            &conn,
7047            &mid.id,
7048            &leaf.id,
7049            "related_to",
7050            Some("2026-04-01T00:00:00+00:00"),
7051            None,
7052            None,
7053        );
7054
7055        let mid_only = kg_query(
7056            &conn,
7057            &src.id,
7058            3,
7059            Some("2026-01-15T00:00:00+00:00"),
7060            None,
7061            None,
7062        )
7063        .unwrap();
7064        assert_eq!(mid_only.len(), 1);
7065        assert_eq!(mid_only[0].target_id, mid.id);
7066
7067        let neither = kg_query(
7068            &conn,
7069            &src.id,
7070            3,
7071            Some("2026-04-15T00:00:00+00:00"),
7072            None,
7073            None,
7074        )
7075        .unwrap();
7076        assert!(neither.is_empty());
7077    }
7078
7079    #[test]
7080    fn kg_query_detects_cycles() {
7081        // a -> b -> c -> a forms a cycle. Even with max_depth=5, the
7082        // traversal must stop revisiting nodes that are already on the
7083        // path; the result lists each reachable node at most once.
7084        let conn = test_db();
7085        let a = make_memory("a", "ns", Tier::Long, 5);
7086        let b = make_memory("b", "ns", Tier::Long, 5);
7087        let c = make_memory("c", "ns", Tier::Long, 5);
7088        insert(&conn, &a).unwrap();
7089        insert(&conn, &b).unwrap();
7090        insert(&conn, &c).unwrap();
7091        insert_link_full(
7092            &conn,
7093            &a.id,
7094            &b.id,
7095            "related_to",
7096            Some("2026-01-01T00:00:00+00:00"),
7097            None,
7098            None,
7099        );
7100        insert_link_full(
7101            &conn,
7102            &b.id,
7103            &c.id,
7104            "related_to",
7105            Some("2026-01-02T00:00:00+00:00"),
7106            None,
7107            None,
7108        );
7109        insert_link_full(
7110            &conn,
7111            &c.id,
7112            &a.id,
7113            "related_to",
7114            Some("2026-01-03T00:00:00+00:00"),
7115            None,
7116            None,
7117        );
7118
7119        let nodes = kg_query(&conn, &a.id, 5, None, None, None).unwrap();
7120        // Expect b at depth 1 and c at depth 2; the cycle back to a is
7121        // pruned. (The c->a edge could in principle surface a again at
7122        // depth 3, but only if a is not on its own path — and the
7123        // anchor seeds path with `a->b`, so a IS on every descendant
7124        // path through b/c.)
7125        assert_eq!(nodes.len(), 2);
7126        assert_eq!(nodes[0].target_id, b.id);
7127        assert_eq!(nodes[0].depth, 1);
7128        assert_eq!(nodes[1].target_id, c.id);
7129        assert_eq!(nodes[1].depth, 2);
7130    }
7131
7132    #[test]
7133    fn kg_query_multi_hop_filters_by_allowed_agents_per_hop() {
7134        // src -> mid (agent-a), mid -> leaf (agent-b). With allow=[a]
7135        // only the first hop survives; with allow=[a,b] both surface.
7136        let conn = test_db();
7137        let src = make_memory("s", "ns", Tier::Long, 5);
7138        let mid = make_memory("m", "ns", Tier::Long, 5);
7139        let leaf = make_memory("l", "ns", Tier::Long, 5);
7140        insert(&conn, &src).unwrap();
7141        insert(&conn, &mid).unwrap();
7142        insert(&conn, &leaf).unwrap();
7143        insert_link_full(
7144            &conn,
7145            &src.id,
7146            &mid.id,
7147            "related_to",
7148            Some("2026-01-01T00:00:00+00:00"),
7149            None,
7150            Some("agent-a"),
7151        );
7152        insert_link_full(
7153            &conn,
7154            &mid.id,
7155            &leaf.id,
7156            "related_to",
7157            Some("2026-01-02T00:00:00+00:00"),
7158            None,
7159            Some("agent-b"),
7160        );
7161
7162        let allow_a = vec!["agent-a".to_string()];
7163        let only_first = kg_query(&conn, &src.id, 3, None, Some(&allow_a), None).unwrap();
7164        assert_eq!(only_first.len(), 1);
7165        assert_eq!(only_first[0].target_id, mid.id);
7166
7167        let allow_both = vec!["agent-a".to_string(), "agent-b".to_string()];
7168        let both = kg_query(&conn, &src.id, 3, None, Some(&allow_both), None).unwrap();
7169        assert_eq!(both.len(), 2);
7170        assert_eq!(both[1].target_id, leaf.id);
7171        assert_eq!(both[1].depth, 2);
7172    }
7173
7174    #[test]
7175    fn kg_query_limit_clamped_to_max() {
7176        let conn = test_db();
7177        let src = make_memory("s", "ns", Tier::Long, 5);
7178        insert(&conn, &src).unwrap();
7179        for i in 0..3 {
7180            let t = make_memory(&format!("t{i}"), "ns", Tier::Long, 5);
7181            insert(&conn, &t).unwrap();
7182            insert_link_full(
7183                &conn,
7184                &src.id,
7185                &t.id,
7186                "related_to",
7187                Some(&format!("2026-01-{:02}T00:00:00+00:00", i + 1)),
7188                None,
7189                None,
7190            );
7191        }
7192
7193        // limit=usize::MAX clamps to KG_QUERY_MAX_LIMIT (1000),
7194        // which is bigger than our 3 rows — all returned.
7195        let all = kg_query(&conn, &src.id, 1, None, None, Some(usize::MAX)).unwrap();
7196        assert_eq!(all.len(), 3);
7197
7198        // limit=0 clamps up to 1.
7199        let one = kg_query(&conn, &src.id, 1, None, None, Some(0)).unwrap();
7200        assert_eq!(one.len(), 1);
7201    }
7202
7203    #[test]
7204    fn kg_query_empty_for_unknown_source() {
7205        let conn = test_db();
7206        let nodes = kg_query(&conn, "no-such-id", 1, None, None, None).unwrap();
7207        assert!(nodes.is_empty());
7208    }
7209
7210    #[test]
7211    fn schema_v15_existing_links_get_valid_from_backfilled() {
7212        // Simulate a v14 database with one link, then re-run the
7213        // v15 migration and assert valid_from was backfilled to the
7214        // source memory's created_at. We do this by opening a fresh
7215        // db (which is at v15), inserting a link with NULL valid_from,
7216        // rolling schema_version back to 14, and re-opening to force
7217        // the v15 block to re-execute the backfill UPDATE.
7218        let path = std::env::temp_dir().join(format!(
7219            "ai_memory_v15_backfill_{}.db",
7220            uuid::Uuid::new_v4()
7221        ));
7222        {
7223            let conn = open(&path).unwrap();
7224            let src = make_memory("src", "test", Tier::Long, 5);
7225            let tgt = make_memory("tgt", "test", Tier::Long, 5);
7226            insert(&conn, &src).unwrap();
7227            insert(&conn, &tgt).unwrap();
7228            // Insert a link directly with NULL valid_from to mimic
7229            // pre-migration state.
7230            conn.execute(
7231                "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
7232                 VALUES (?1, ?2, 'related_to', ?3, NULL)",
7233                params![&src.id, &tgt.id, &chrono::Utc::now().to_rfc3339()],
7234            )
7235            .unwrap();
7236            // Roll schema back to v14 and re-run migrate via re-open.
7237            conn.execute("DELETE FROM schema_version", []).unwrap();
7238            conn.execute("INSERT INTO schema_version (version) VALUES (14)", [])
7239                .unwrap();
7240        }
7241
7242        let conn2 = open(&path).unwrap();
7243        let backfilled: Option<String> = conn2
7244            .query_row("SELECT valid_from FROM memory_links LIMIT 1", [], |r| {
7245                r.get(0)
7246            })
7247            .unwrap();
7248        assert!(
7249            backfilled.is_some(),
7250            "expected valid_from to be backfilled, got NULL"
7251        );
7252        let _ = std::fs::remove_file(&path);
7253    }
7254
7255    #[test]
7256    fn namespace_prefix_query_index_available() {
7257        let conn = test_db();
7258        // SQLite's default BINARY collation supports prefix-matching LIKE queries
7259        // with the idx_memories_namespace index. Verify the index exists and a
7260        // simple prefix query can execute (EXPLAIN QUERY PLAN output varies by
7261        // SQLite version and query planner heuristics, so we just check that the
7262        // query completes without error).
7263        let result: Option<String> = conn
7264            .query_row(
7265                "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_memories_namespace'",
7266                [],
7267                |r| r.get(0),
7268            )
7269            .unwrap();
7270        assert_eq!(
7271            result,
7272            Some("idx_memories_namespace".to_string()),
7273            "idx_memories_namespace index should exist"
7274        );
7275
7276        // Execute a prefix LIKE query to ensure it compiles and runs
7277        let count: i64 = conn
7278            .query_row(
7279                "SELECT COUNT(*) FROM memories WHERE namespace LIKE 'test/%'",
7280                [],
7281                |r| r.get(0),
7282            )
7283            .unwrap();
7284        assert_eq!(count, 0);
7285    }
7286}