1use anyhow::{Context, Result};
5use chrono::Utc;
6use rusqlite::{Connection, params};
7use std::collections::HashMap;
8use std::path::Path;
9
10use crate::models::{
11 AGENTS_NAMESPACE, AgentRegistration, Approval, ApproverType, DuplicateCheck, DuplicateMatch,
12 GovernanceDecision, GovernanceLevel, GovernancePolicy, GovernedAction, MAX_NAMESPACE_DEPTH,
13 Memory, MemoryLink, NamespaceCount, PROMOTION_THRESHOLD, PendingAction, Stats, Taxonomy,
14 TaxonomyNode, Tier, TierCount, namespace_ancestors,
15};
16
/// Namespace prefixes a caller may see, one per visibility level, in the order
/// `(private, team, unit, org)`. A `None` level grants nothing at that level; a `None`
/// private slot means "no caller identity" and disables visibility filtering altogether.
type VisibilityPrefixes = (Option<String>, Option<String>, Option<String>, Option<String>);
26
27fn compute_visibility_prefixes(as_agent: Option<&str>) -> VisibilityPrefixes {
28 let Some(ns) = as_agent else {
29 return (None, None, None, None);
30 };
31 let ancestors = namespace_ancestors(ns);
32 let p = ancestors.first().cloned();
33 let t = ancestors.get(1).cloned();
34 let u = ancestors.get(2).cloned();
35 let o = ancestors.get(3).cloned();
36 (p, t, u, o)
37}
38
39fn is_visible(mem: &Memory, prefixes: &VisibilityPrefixes) -> bool {
44 let (p, t, u, o) = prefixes;
45 if p.is_none() {
46 return true;
47 }
48 let scope = mem
49 .metadata
50 .get("scope")
51 .and_then(|v| v.as_str())
52 .unwrap_or("private");
53 match scope {
54 "collective" => true,
55 "private" => p.as_ref().is_some_and(|ns| &mem.namespace == ns),
56 "team" => matches_subtree(&mem.namespace, t.as_deref()),
57 "unit" => matches_subtree(&mem.namespace, u.as_deref()),
58 "org" => matches_subtree(&mem.namespace, o.as_deref()),
59 _ => false,
60 }
61}
62
/// Returns `true` when `namespace` is `prefix` itself, or continues it with a `/`
/// separator (i.e. lies beneath it). A missing prefix never matches. The comparison is
/// exact and case-sensitive.
fn matches_subtree(namespace: &str, prefix: Option<&str>) -> bool {
    let Some(root) = prefix else {
        return false;
    };
    namespace
        .strip_prefix(root)
        .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
}
69
/// Builds the SQL `AND (...)` fragment that keeps only rows of `table_alias` the calling
/// agent may see. It encodes the same scope rules as `is_visible`.
///
/// Four consecutive placeholders starting at `start` must be bound, in the order produced
/// by `compute_visibility_prefixes`:
/// - `?start`: private namespace;
/// - `?start+1`: team prefix;
/// - `?start+2`: unit prefix;
/// - `?start+3`: org prefix.
///
/// A NULL private namespace disables the filter entirely.
///
/// Subtree membership ("equals the prefix or starts with `prefix/`") uses an exact
/// `substr`/`length` comparison rather than `LIKE`. SQLite's `LIKE` is case-insensitive for
/// ASCII by default, and it needs escaping of `%`, `_` and the escape character itself.
/// Either would let the SQL filter admit rows that the case-sensitive `matches_subtree`
/// rejects.
fn visibility_clause(start: usize, table_alias: &str) -> String {
    let ta = table_alias;
    let private_ph = start;
    // One OR-arm per hierarchical scope; `ph` is that scope's prefix placeholder.
    let subtree_arm = |scope: &str, ph: usize| -> String {
        format!(
            " OR ({ta}.scope_idx = '{scope}' AND ?{ph} IS NOT NULL AND \
             ({ta}.namespace = ?{ph} \
             OR substr({ta}.namespace, 1, length(?{ph}) + 1) = (?{ph} || '/')))"
        )
    };
    format!(
        "AND (\
         ?{private_ph} IS NULL \
         OR {ta}.scope_idx = 'collective' \
         OR ({ta}.scope_idx = 'private' AND {ta}.namespace = ?{private_ph})\
         {team}{unit}{org}\
         )",
        team = subtree_arm("team", start + 1),
        unit = subtree_arm("unit", start + 2),
        org = subtree_arm("org", start + 3),
    )
}
109
/// Base DDL executed on every `open` before `migrate`; every statement uses `IF NOT EXISTS`,
/// so re-running it on an existing database is harmless.
///
/// Columns added later by `migrate` (e.g. `embedding`, `scope_idx`, `agent_id_idx`,
/// `embedding_dim`) are not declared here. A fresh database gets them from the migration
/// ladder, which starts at version 0 because `schema_version` is created empty.
///
/// `memories_fts` is an external-content FTS5 index over `memories(title, content, tags)`,
/// kept in sync by the three triggers below.
/// NOTE(review): `memories_au` fires on every UPDATE of `memories`, including the access
/// bookkeeping done by `touch`, so each access re-indexes the row in FTS. Narrowing it to
/// `AFTER UPDATE OF title, content, tags` would only reach existing DBs via a migration —
/// confirm before changing.
const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS memories (
    id TEXT PRIMARY KEY,
    tier TEXT NOT NULL,
    namespace TEXT NOT NULL DEFAULT 'global',
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    tags TEXT NOT NULL DEFAULT '[]',
    priority INTEGER NOT NULL DEFAULT 5,
    confidence REAL NOT NULL DEFAULT 1.0,
    source TEXT NOT NULL DEFAULT 'api',
    access_count INTEGER NOT NULL DEFAULT 0,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    last_accessed_at TEXT,
    expires_at TEXT,
    metadata TEXT NOT NULL DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_memories_tier ON memories(tier);
CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace);
CREATE INDEX IF NOT EXISTS idx_memories_priority ON memories(priority DESC);
CREATE INDEX IF NOT EXISTS idx_memories_expires ON memories(expires_at);
CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_title_ns ON memories(title, namespace);

CREATE TABLE IF NOT EXISTS memory_links (
    source_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
    target_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
    relation TEXT NOT NULL DEFAULT 'related_to',
    created_at TEXT NOT NULL,
    PRIMARY KEY (source_id, target_id, relation)
);

CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
    title,
    content,
    tags,
    content=memories,
    content_rowid=rowid
);

CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
    INSERT INTO memories_fts(rowid, title, content, tags)
    VALUES (new.rowid, new.title, new.content, new.tags);
END;

CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
    VALUES ('delete', old.rowid, old.title, old.content, old.tags);
END;

CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
    INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
    VALUES ('delete', old.rowid, old.title, old.content, old.tags);
    INSERT INTO memories_fts(rowid, title, content, tags)
    VALUES (new.rowid, new.title, new.content, new.tags);
END;

CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER NOT NULL
);

-- v0.6.4-009 — capability-expansion audit log (NHI guardrails phase 1).
-- Mirrors migrations/sqlite/0014_v064_audit_log.sql so a fresh DB
-- bootstrap that bypasses the migration ladder still ends up with the
-- table present.
CREATE TABLE IF NOT EXISTS audit_log (
    id TEXT PRIMARY KEY,
    agent_id TEXT,
    event_type TEXT NOT NULL,
    requested_family TEXT,
    granted INTEGER NOT NULL,
    attestation_tier TEXT,
    timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_audit_log_agent_id
    ON audit_log (agent_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_timestamp
    ON audit_log (timestamp);
CREATE INDEX IF NOT EXISTS idx_audit_log_event_type
    ON audit_log (event_type);
";
192
/// Schema version that `migrate` stamps into `schema_version` once all steps have run.
/// Bump it together with a new `if version < N` step in `migrate`.
const CURRENT_SCHEMA_VERSION: i64 = 20;
201
202pub fn open(path: &Path) -> Result<Connection> {
203 let conn = Connection::open(path).context("failed to open database")?;
204 apply_sqlcipher_key(&conn)?;
205 conn.pragma_update(None, "journal_mode", "WAL")?;
206 conn.pragma_update(None, "busy_timeout", 5000)?;
207 conn.pragma_update(None, "synchronous", "NORMAL")?;
208 conn.pragma_update(None, "foreign_keys", "ON")?;
209 conn.execute_batch(SCHEMA)
210 .context("failed to initialize schema")?;
211 migrate(&conn)?;
212 Ok(conn)
213}
214
/// Unlocks an SQLCipher-encrypted database with the passphrase in `AI_MEMORY_DB_PASSPHRASE`.
///
/// Must be the first statement run on `conn`. The key is set, then a trivial read of
/// `sqlite_master` verifies that it decrypts the file; SQLCipher only checks the key on
/// first access.
///
/// # Errors
/// Fails when the environment variable is unset, `PRAGMA key` is rejected, or the
/// verification query fails (typically a wrong passphrase or an unencrypted file).
#[cfg(feature = "sqlcipher")]
fn apply_sqlcipher_key(conn: &Connection) -> Result<()> {
    let Ok(passphrase) = std::env::var("AI_MEMORY_DB_PASSPHRASE") else {
        anyhow::bail!(
            "sqlcipher build requires AI_MEMORY_DB_PASSPHRASE (set via --db-passphrase-file <path>)"
        );
    };
    // Double single quotes so the passphrase can sit inside a SQL string literal.
    let escaped = passphrase.replace('\'', "''");
    // NOTE(review): rusqlite's `pragma_update` appears to quote text values itself. If so, the
    // pre-quoted `'{escaped}'` makes the effective key include the surrounding quotes. That is
    // deterministic, so existing encrypted DBs depend on it, but it may not match a
    // `PRAGMA key = '<passphrase>'` typed into the sqlcipher CLI. Confirm before changing: a
    // change would lock out databases created with this code.
    conn.pragma_update(None, "key", format!("'{escaped}'"))
        .context("PRAGMA key failed (wrong passphrase or unencrypted DB?)")?;
    // Force a real read so a wrong key fails here instead of on a later query.
    conn.query_row("SELECT count(*) FROM sqlite_master", [], |r| {
        r.get::<_, i64>(0)
    })
    .context("SQLCipher unlock verification failed — wrong passphrase?")?;
    Ok(())
}
245
/// No-op stand-in used when the `sqlcipher` feature is disabled: plain SQLite needs no key.
#[cfg(not(feature = "sqlcipher"))]
// Returns `Result` only to keep the same signature as the sqlcipher variant.
#[allow(clippy::unnecessary_wraps)]
fn apply_sqlcipher_key(_conn: &Connection) -> Result<()> {
    Ok(())
}
251
// Migration SQL embedded at compile time and executed by `migrate`. Each constant is named
// after the schema version whose `if version < N` step runs it.
// The file-name numbering does not follow the version order: v17 uses 0012 while v18 uses
// 0011. Keep the pairing as written.
const MIGRATION_V15_SQLITE: &str = include_str!("../migrations/sqlite/0010_v063_hierarchy_kg.sql");
const MIGRATION_V17_SQLITE: &str = include_str!("../migrations/sqlite/0012_governance_inherit.sql");
const MIGRATION_V18_SQLITE: &str =
    include_str!("../migrations/sqlite/0011_v0631_data_integrity.sql");
const MIGRATION_V19_SQLITE: &str =
    include_str!("../migrations/sqlite/0013_webhook_event_types.sql");
const MIGRATION_V20_SQLITE: &str = include_str!("../migrations/sqlite/0014_v064_audit_log.sql");
270
271#[allow(clippy::too_many_lines)]
272fn migrate(conn: &Connection) -> Result<()> {
273 let version: i64 = conn
274 .query_row(
275 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
276 [],
277 |r| r.get(0),
278 )
279 .unwrap_or(0);
280
281 if version >= CURRENT_SCHEMA_VERSION {
282 return Ok(());
283 }
284
285 conn.execute_batch("BEGIN EXCLUSIVE")?;
286 let result = (|| -> Result<()> {
287 if version < 2 {
288 let mut has_confidence = false;
289 let mut has_source = false;
290 let mut stmt = conn.prepare("PRAGMA table_info(memories)")?;
291 let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
292 for col in cols {
293 match col?.as_str() {
294 "confidence" => has_confidence = true,
295 "source" => has_source = true,
296 _ => {}
297 }
298 }
299 drop(stmt);
300 if !has_confidence {
301 conn.execute(
302 "ALTER TABLE memories ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0",
303 [],
304 )?;
305 }
306 if !has_source {
307 conn.execute(
308 "ALTER TABLE memories ADD COLUMN source TEXT NOT NULL DEFAULT 'api'",
309 [],
310 )?;
311 }
312 }
313
314 if version < 3 {
315 let mut has_embedding = false;
317 let mut stmt = conn.prepare("PRAGMA table_info(memories)")?;
318 let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
319 for col in cols {
320 if col?.as_str() == "embedding" {
321 has_embedding = true;
322 }
323 }
324 drop(stmt);
325 if !has_embedding {
326 conn.execute("ALTER TABLE memories ADD COLUMN embedding BLOB", [])?;
327 }
328 }
329 if version < 4 {
330 conn.execute_batch(
331 "CREATE TABLE IF NOT EXISTS archived_memories (
332 id TEXT PRIMARY KEY,
333 tier TEXT NOT NULL,
334 namespace TEXT NOT NULL DEFAULT 'global',
335 title TEXT NOT NULL,
336 content TEXT NOT NULL,
337 tags TEXT NOT NULL DEFAULT '[]',
338 priority INTEGER NOT NULL DEFAULT 5,
339 confidence REAL NOT NULL DEFAULT 1.0,
340 source TEXT NOT NULL DEFAULT 'api',
341 access_count INTEGER NOT NULL DEFAULT 0,
342 created_at TEXT NOT NULL,
343 updated_at TEXT NOT NULL,
344 last_accessed_at TEXT,
345 expires_at TEXT,
346 archived_at TEXT NOT NULL,
347 archive_reason TEXT NOT NULL DEFAULT 'ttl_expired',
348 metadata TEXT NOT NULL DEFAULT '{}'
349 );
350 CREATE INDEX IF NOT EXISTS idx_archived_namespace ON archived_memories(namespace);
351 CREATE INDEX IF NOT EXISTS idx_archived_at ON archived_memories(archived_at);",
352 )?;
353 }
354 if version < 5 {
355 conn.execute_batch(
356 "CREATE TABLE IF NOT EXISTS namespace_meta (
357 namespace TEXT PRIMARY KEY,
358 standard_id TEXT,
359 updated_at TEXT NOT NULL
360 );",
361 )?;
362 }
363 if version < 6 {
364 let has_parent: bool = conn
366 .prepare("SELECT parent_namespace FROM namespace_meta LIMIT 0")
367 .is_ok();
368 if !has_parent {
369 conn.execute_batch("ALTER TABLE namespace_meta ADD COLUMN parent_namespace TEXT;")?;
370 }
371 }
372 if version < 7 {
373 let has_metadata: bool = conn
375 .prepare("SELECT metadata FROM memories LIMIT 0")
376 .is_ok();
377 if !has_metadata {
378 conn.execute(
379 "ALTER TABLE memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
380 [],
381 )?;
382 }
383 let has_archive_metadata: bool = conn
384 .prepare("SELECT metadata FROM archived_memories LIMIT 0")
385 .is_ok();
386 if !has_archive_metadata {
387 conn.execute(
388 "ALTER TABLE archived_memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
389 [],
390 )?;
391 }
392 }
393 if version < 8 {
394 conn.execute_batch(
396 "CREATE TABLE IF NOT EXISTS pending_actions (
397 id TEXT PRIMARY KEY,
398 action_type TEXT NOT NULL,
399 memory_id TEXT,
400 namespace TEXT NOT NULL,
401 payload TEXT NOT NULL DEFAULT '{}',
402 requested_by TEXT NOT NULL,
403 requested_at TEXT NOT NULL,
404 status TEXT NOT NULL DEFAULT 'pending',
405 decided_by TEXT,
406 decided_at TEXT
407 );
408 CREATE INDEX IF NOT EXISTS idx_pending_status ON pending_actions(status);
409 CREATE INDEX IF NOT EXISTS idx_pending_namespace ON pending_actions(namespace);",
410 )?;
411 }
412 if version < 9 {
413 let has_approvals: bool = conn
415 .prepare("SELECT approvals FROM pending_actions LIMIT 0")
416 .is_ok();
417 if !has_approvals {
418 conn.execute(
419 "ALTER TABLE pending_actions ADD COLUMN approvals TEXT NOT NULL DEFAULT '[]'",
420 [],
421 )?;
422 }
423 }
424
425 if version < 10 {
426 let has_scope_idx: bool = conn
441 .prepare("SELECT scope_idx FROM memories LIMIT 0")
442 .is_ok();
443 if !has_scope_idx {
444 conn.execute(
445 "ALTER TABLE memories ADD COLUMN scope_idx TEXT \
446 GENERATED ALWAYS AS (\
447 CASE WHEN json_valid(metadata) \
448 THEN COALESCE(json_extract(metadata, '$.scope'), 'private') \
449 ELSE 'private' END\
450 ) VIRTUAL",
451 [],
452 )?;
453 }
454 conn.execute(
455 "CREATE INDEX IF NOT EXISTS idx_memories_scope_idx ON memories(scope_idx)",
456 [],
457 )?;
458 }
459
460 if version < 11 {
461 conn.execute_batch(
471 "CREATE TABLE IF NOT EXISTS sync_state (
472 agent_id TEXT NOT NULL,
473 peer_id TEXT NOT NULL,
474 last_seen_at TEXT NOT NULL,
475 last_pulled_at TEXT NOT NULL,
476 PRIMARY KEY (agent_id, peer_id)
477 );
478 CREATE INDEX IF NOT EXISTS idx_sync_state_agent ON sync_state(agent_id);",
479 )?;
480 }
481
482 if version < 12 {
483 let has_last_pushed: bool = conn
488 .prepare("SELECT last_pushed_at FROM sync_state LIMIT 0")
489 .is_ok();
490 if !has_last_pushed {
491 conn.execute("ALTER TABLE sync_state ADD COLUMN last_pushed_at TEXT", [])?;
492 }
493 }
494
495 if version < 13 {
496 conn.execute(
503 "CREATE TABLE IF NOT EXISTS subscriptions (
504 id TEXT PRIMARY KEY,
505 url TEXT NOT NULL,
506 events TEXT NOT NULL DEFAULT '*',
507 secret_hash TEXT,
508 namespace_filter TEXT,
509 agent_filter TEXT,
510 created_by TEXT,
511 created_at TEXT NOT NULL,
512 last_dispatched_at TEXT,
513 dispatch_count INTEGER NOT NULL DEFAULT 0,
514 failure_count INTEGER NOT NULL DEFAULT 0
515 )",
516 [],
517 )?;
518 conn.execute(
519 "CREATE INDEX IF NOT EXISTS idx_subscriptions_url ON subscriptions(url)",
520 [],
521 )?;
522 }
523
524 if version < 14 {
525 let has_agent_id_idx: bool = conn
538 .prepare("SELECT agent_id_idx FROM memories LIMIT 0")
539 .is_ok();
540 if !has_agent_id_idx {
541 conn.execute(
542 "ALTER TABLE memories ADD COLUMN agent_id_idx TEXT \
543 GENERATED ALWAYS AS (\
544 CASE WHEN json_valid(metadata) \
545 THEN json_extract(metadata, '$.agent_id') \
546 ELSE NULL END\
547 ) VIRTUAL",
548 [],
549 )?;
550 }
551 conn.execute(
552 "CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id_idx)",
553 [],
554 )?;
555 conn.execute(
556 "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at)",
557 [],
558 )?;
559 }
560
561 if version < 15 {
562 let has_valid_from = conn
591 .prepare("SELECT valid_from FROM memory_links LIMIT 0")
592 .is_ok();
593 if !has_valid_from {
594 conn.execute("ALTER TABLE memory_links ADD COLUMN valid_from TEXT", [])?;
595 }
596 let has_valid_until = conn
597 .prepare("SELECT valid_until FROM memory_links LIMIT 0")
598 .is_ok();
599 if !has_valid_until {
600 conn.execute("ALTER TABLE memory_links ADD COLUMN valid_until TEXT", [])?;
601 }
602 let has_observed_by = conn
603 .prepare("SELECT observed_by FROM memory_links LIMIT 0")
604 .is_ok();
605 if !has_observed_by {
606 conn.execute("ALTER TABLE memory_links ADD COLUMN observed_by TEXT", [])?;
607 }
608 let has_signature = conn
609 .prepare("SELECT signature FROM memory_links LIMIT 0")
610 .is_ok();
611 if !has_signature {
612 conn.execute("ALTER TABLE memory_links ADD COLUMN signature BLOB", [])?;
613 }
614
615 conn.execute_batch(MIGRATION_V15_SQLITE)?;
617 }
618
619 if version < 16 {
620 }
627
628 if version < 17 {
629 conn.execute_batch(MIGRATION_V17_SQLITE)?;
637 }
638
639 if version < 18 {
640 let has_embedding_dim = conn
653 .prepare("SELECT embedding_dim FROM memories LIMIT 0")
654 .is_ok();
655 if !has_embedding_dim {
656 conn.execute("ALTER TABLE memories ADD COLUMN embedding_dim INTEGER", [])?;
657 }
658
659 let has_archive_embedding = conn
664 .prepare("SELECT embedding FROM archived_memories LIMIT 0")
665 .is_ok();
666 if !has_archive_embedding {
667 conn.execute(
668 "ALTER TABLE archived_memories ADD COLUMN embedding BLOB",
669 [],
670 )?;
671 }
672 let has_archive_embedding_dim = conn
673 .prepare("SELECT embedding_dim FROM archived_memories LIMIT 0")
674 .is_ok();
675 if !has_archive_embedding_dim {
676 conn.execute(
677 "ALTER TABLE archived_memories ADD COLUMN embedding_dim INTEGER",
678 [],
679 )?;
680 }
681 let has_original_tier = conn
682 .prepare("SELECT original_tier FROM archived_memories LIMIT 0")
683 .is_ok();
684 if !has_original_tier {
685 conn.execute(
686 "ALTER TABLE archived_memories ADD COLUMN original_tier TEXT",
687 [],
688 )?;
689 }
690 let has_original_expires_at = conn
691 .prepare("SELECT original_expires_at FROM archived_memories LIMIT 0")
692 .is_ok();
693 if !has_original_expires_at {
694 conn.execute(
695 "ALTER TABLE archived_memories ADD COLUMN original_expires_at TEXT",
696 [],
697 )?;
698 }
699
700 conn.execute_batch(MIGRATION_V18_SQLITE)?;
702 }
703
704 if version < 19 {
705 let has_event_types = conn
717 .prepare("SELECT event_types FROM subscriptions LIMIT 0")
718 .is_ok();
719 if !has_event_types {
720 conn.execute("ALTER TABLE subscriptions ADD COLUMN event_types TEXT", [])?;
721 }
722 conn.execute_batch(MIGRATION_V19_SQLITE)?;
724 }
725 if version < 20 {
726 conn.execute_batch(MIGRATION_V20_SQLITE)?;
728 }
729
730 conn.execute("DELETE FROM schema_version", [])?;
731 conn.execute(
732 "INSERT INTO schema_version (version) VALUES (?1)",
733 params![CURRENT_SCHEMA_VERSION],
734 )?;
735 Ok(())
736 })();
737
738 match result {
739 Ok(()) => {
740 conn.execute_batch("COMMIT")?;
741 Ok(())
742 }
743 Err(e) => {
744 let _ = conn.execute_batch("ROLLBACK");
745 Err(e)
746 }
747 }
748}
749
750fn row_to_memory(row: &rusqlite::Row) -> rusqlite::Result<Memory> {
751 let tags_json: String = row.get("tags")?;
752 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
753 let tier_str: String = row.get("tier")?;
754 let tier = Tier::from_str(&tier_str).unwrap_or(Tier::Mid);
755 let metadata_str: String = row
756 .get::<_, String>("metadata")
757 .unwrap_or_else(|_| "{}".to_string());
758 let metadata: serde_json::Value = serde_json::from_str(&metadata_str).unwrap_or_else(|e| {
759 tracing::warn!("corrupt metadata in DB row, defaulting to {{}}: {e}");
760 serde_json::json!({})
761 });
762 Ok(Memory {
763 id: row.get("id")?,
764 tier,
765 namespace: row.get("namespace")?,
766 title: row.get("title")?,
767 content: row.get("content")?,
768 tags,
769 priority: row.get("priority")?,
770 confidence: row.get("confidence").unwrap_or(1.0),
771 source: row.get("source").unwrap_or_else(|_| "api".to_string()),
772 access_count: row.get("access_count")?,
773 created_at: row.get("created_at")?,
774 updated_at: row.get("updated_at")?,
775 last_accessed_at: row.get("last_accessed_at")?,
776 expires_at: row.get("expires_at")?,
777 metadata,
778 })
779}
780
781pub fn insert(conn: &Connection, mem: &Memory) -> Result<String> {
789 let tags_json = serde_json::to_string(&mem.tags)?;
790 let metadata_json = serde_json::to_string(&mem.metadata)?;
791 let actual_id: String = conn.query_row(
792 "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, last_accessed_at, expires_at, metadata)
793 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
794 ON CONFLICT(title, namespace) DO UPDATE SET
795 content = excluded.content,
796 tags = excluded.tags,
797 priority = MAX(memories.priority, excluded.priority),
798 confidence = MAX(memories.confidence, excluded.confidence),
799 source = excluded.source,
800 tier = CASE WHEN excluded.tier = 'long' THEN 'long'
801 WHEN memories.tier = 'long' THEN 'long'
802 WHEN excluded.tier = 'mid' THEN 'mid'
803 ELSE memories.tier END,
804 updated_at = excluded.updated_at,
805 expires_at = CASE WHEN excluded.tier = 'long' OR memories.tier = 'long' THEN NULL
806 ELSE COALESCE(excluded.expires_at, memories.expires_at) END,
807 -- Preserve metadata.agent_id across upsert (NHI provenance is immutable).
808 metadata = CASE
809 WHEN json_extract(memories.metadata, '$.agent_id') IS NOT NULL
810 THEN json_set(
811 excluded.metadata,
812 '$.agent_id',
813 json_extract(memories.metadata, '$.agent_id')
814 )
815 ELSE excluded.metadata
816 END
817 RETURNING id",
818 params![
819 mem.id, mem.tier.as_str(), mem.namespace, mem.title, mem.content,
820 tags_json, mem.priority, mem.confidence, mem.source, mem.access_count,
821 mem.created_at, mem.updated_at, mem.last_accessed_at, mem.expires_at,
822 metadata_json,
823 ],
824 |r| r.get(0),
825 )?;
826 Ok(actual_id)
827}
828
829pub fn get(conn: &Connection, id: &str) -> Result<Option<Memory>> {
830 let mut stmt = conn.prepare("SELECT * FROM memories WHERE id = ?1")?;
831 let mut rows = stmt.query_map(params![id], row_to_memory)?;
832 match rows.next() {
833 Some(Ok(m)) => Ok(Some(m)),
834 Some(Err(e)) => Err(e.into()),
835 None => Ok(None),
836 }
837}
838
839pub fn get_by_prefix(conn: &Connection, prefix: &str) -> Result<Option<Memory>> {
842 let escaped = prefix.replace('%', "\\%").replace('_', "\\_");
844 let pattern = format!("{escaped}%");
845 let mut stmt = conn.prepare("SELECT * FROM memories WHERE id LIKE ?1 ESCAPE '\\'")?;
846 let rows: Vec<Memory> = stmt
847 .query_map(params![pattern], row_to_memory)?
848 .filter_map(Result::ok)
849 .collect();
850 match rows.len() {
851 0 => Ok(None),
852 1 => Ok(Some(rows.into_iter().next().expect("len checked"))),
853 n => {
854 let ids: Vec<String> = rows.iter().map(|m| m.id.clone()).collect();
855 anyhow::bail!(
856 "ambiguous ID prefix '{prefix}': {n} matches\n{}",
857 ids.join("\n")
858 );
859 }
860 }
861}
862
863pub fn resolve_id(conn: &Connection, id: &str) -> Result<Option<Memory>> {
865 if let Some(mem) = get(conn, id)? {
866 return Ok(Some(mem));
867 }
868 get_by_prefix(conn, id)
869}
870
871pub fn touch(conn: &Connection, id: &str, short_extend: i64, mid_extend: i64) -> Result<()> {
873 let now = Utc::now();
874 let now_str = now.to_rfc3339();
875 let short_expires = (now + chrono::Duration::seconds(short_extend)).to_rfc3339();
876 let mid_expires = (now + chrono::Duration::seconds(mid_extend)).to_rfc3339();
877
878 conn.execute_batch("BEGIN IMMEDIATE")?;
879
880 let result = (|| -> Result<()> {
881 conn.execute(
882 "UPDATE memories SET
883 access_count = MIN(access_count + 1, 1000000),
884 last_accessed_at = ?1,
885 expires_at = CASE
886 WHEN tier = 'long' THEN expires_at
887 WHEN tier = 'short' AND expires_at IS NOT NULL THEN ?2
888 WHEN tier = 'mid' AND expires_at IS NOT NULL THEN ?3
889 ELSE expires_at
890 END
891 WHERE id = ?4",
892 params![now_str, short_expires, mid_expires, id],
893 )?;
894
895 conn.execute(
896 "UPDATE memories SET tier = 'long', expires_at = NULL, updated_at = ?1
897 WHERE id = ?2 AND tier = 'mid' AND access_count >= ?3",
898 params![now_str, id, PROMOTION_THRESHOLD],
899 )?;
900
901 conn.execute(
902 "UPDATE memories SET priority = MIN(priority + 1, 10)
903 WHERE id = ?1 AND access_count > 0 AND access_count % 10 = 0 AND priority < 10",
904 params![id],
905 )?;
906
907 Ok(())
908 })();
909
910 match result {
911 Ok(()) => {
912 conn.execute_batch("COMMIT")?;
913 Ok(())
914 }
915 Err(e) => {
916 if let Err(rb) = conn.execute_batch("ROLLBACK") {
917 tracing::error!("ROLLBACK failed in touch: {}", rb);
918 }
919 Err(e)
920 }
921 }
922}
923
924#[allow(clippy::too_many_arguments)]
925pub fn update(
928 conn: &Connection,
929 id: &str,
930 title: Option<&str>,
931 content: Option<&str>,
932 tier: Option<&Tier>,
933 namespace: Option<&str>,
934 tags: Option<&Vec<String>>,
935 priority: Option<i32>,
936 confidence: Option<f64>,
937 expires_at: Option<&str>,
938 metadata: Option<&serde_json::Value>,
939) -> Result<(bool, bool)> {
940 let mut stmt = conn.prepare("SELECT * FROM memories WHERE id = ?1")?;
941 let mut rows = stmt.query_map(params![id], row_to_memory)?;
942 let Some(Ok(existing)) = rows.next() else {
943 return Ok((false, false));
944 };
945 drop(rows);
946 drop(stmt);
947
948 let new_title = title.unwrap_or(&existing.title);
949 let new_content = content.unwrap_or(&existing.content);
950 let content_changed = new_title != existing.title || new_content != existing.content;
951
952 let effective_tier = match (tier, &existing.tier) {
954 (Some(requested), existing_tier) => match (existing_tier, requested) {
955 (Tier::Long, _) => &Tier::Long, (Tier::Mid, Tier::Short) => &Tier::Mid, (_, requested) => requested, },
959 (None, existing_tier) => existing_tier,
960 };
961
962 let namespace = namespace.unwrap_or(&existing.namespace);
963 let tags = tags.unwrap_or(&existing.tags);
964 let priority = priority.unwrap_or(existing.priority);
965 let confidence = confidence.unwrap_or(existing.confidence);
966 let expires_at = match expires_at {
968 Some("" | "null") => None,
969 Some(v) => Some(v),
970 None => existing.expires_at.as_deref(),
971 };
972 let metadata = metadata.unwrap_or(&existing.metadata);
973 let tags_json = serde_json::to_string(tags)?;
974 let metadata_json = serde_json::to_string(metadata)?;
975 let now = Utc::now().to_rfc3339();
976
977 let update_res = conn.execute(
987 "UPDATE memories SET tier=?1, namespace=?2, title=?3, content=?4, tags=?5, priority=?6, confidence=?7, updated_at=?8, expires_at=?9, metadata=?10
988 WHERE id=?11",
989 params![effective_tier.as_str(), namespace, new_title, new_content, tags_json, priority, confidence, now, expires_at, metadata_json, id],
990 );
991 match update_res {
992 Ok(_) => Ok((true, content_changed)),
993 Err(rusqlite::Error::SqliteFailure(err, _))
994 if err.code == rusqlite::ErrorCode::ConstraintViolation =>
995 {
996 let other: Option<String> = conn
997 .query_row(
998 "SELECT id FROM memories WHERE title = ?1 AND namespace = ?2 AND id != ?3",
999 params![new_title, namespace, id],
1000 |r| r.get(0),
1001 )
1002 .ok();
1003 if let Some(other_id) = other {
1004 anyhow::bail!(
1005 "title '{new_title}' already exists in namespace '{namespace}' (memory {other_id})"
1006 );
1007 }
1008 Err(anyhow::anyhow!("update failed with constraint violation"))
1009 }
1010 Err(e) => Err(e.into()),
1011 }
1012}
1013
1014pub fn delete(conn: &Connection, id: &str) -> Result<bool> {
1015 conn.execute(
1017 "DELETE FROM namespace_meta WHERE standard_id = ?1",
1018 params![id],
1019 )?;
1020 let changed = conn.execute("DELETE FROM memories WHERE id = ?1", params![id])?;
1021 Ok(changed > 0)
1022}
1023
1024pub fn archive_memory(conn: &Connection, id: &str, reason: Option<&str>) -> Result<bool> {
1040 let now = Utc::now().to_rfc3339();
1041 let reason = reason.unwrap_or("archive");
1042 conn.execute_batch("BEGIN IMMEDIATE")?;
1043 let result = (|| -> Result<bool> {
1044 let exists: bool = conn
1045 .query_row(
1046 "SELECT COUNT(*) > 0 FROM memories WHERE id = ?1",
1047 params![id],
1048 |r| r.get(0),
1049 )
1050 .unwrap_or(false);
1051 if !exists {
1052 return Ok(false);
1053 }
1054 conn.execute(
1058 "INSERT OR REPLACE INTO archived_memories
1059 (id, tier, namespace, title, content, tags, priority, confidence,
1060 source, access_count, created_at, updated_at, last_accessed_at,
1061 expires_at, archived_at, archive_reason, metadata,
1062 embedding, embedding_dim, original_tier, original_expires_at)
1063 SELECT id, tier, namespace, title, content, tags, priority, confidence,
1064 source, access_count, created_at, updated_at, last_accessed_at,
1065 expires_at, ?1, ?2, metadata,
1066 embedding, embedding_dim, tier, expires_at
1067 FROM memories WHERE id = ?3",
1068 params![now, reason, id],
1069 )?;
1070 conn.execute(
1073 "DELETE FROM namespace_meta WHERE standard_id = ?1",
1074 params![id],
1075 )?;
1076 let removed = conn.execute("DELETE FROM memories WHERE id = ?1", params![id])?;
1077 Ok(removed > 0)
1078 })();
1079 match result {
1080 Ok(moved) => {
1081 conn.execute_batch("COMMIT")?;
1082 Ok(moved)
1083 }
1084 Err(e) => {
1085 let _ = conn.execute_batch("ROLLBACK");
1086 Err(e)
1087 }
1088 }
1089}
1090
1091pub fn forget_count(
1093 conn: &Connection,
1094 namespace: Option<&str>,
1095 pattern: Option<&str>,
1096 tier: Option<&Tier>,
1097) -> Result<usize> {
1098 if pattern.is_none() && namespace.is_none() && tier.is_none() {
1099 anyhow::bail!("at least one of namespace, pattern, or tier is required");
1100 }
1101 if let Some(pat) = pattern {
1102 let fts_query = sanitize_fts_query(pat, true);
1103 let tier_str = tier.map(|t| t.as_str().to_string());
1104 let count: i64 = conn.query_row(
1105 "SELECT COUNT(*) FROM memories WHERE rowid IN (
1106 SELECT m.rowid FROM memories_fts fts
1107 JOIN memories m ON m.rowid = fts.rowid
1108 WHERE memories_fts MATCH ?1
1109 AND (?2 IS NULL OR m.namespace = ?2)
1110 AND (?3 IS NULL OR m.tier = ?3)
1111 )",
1112 params![fts_query, namespace, tier_str],
1113 |r| r.get(0),
1114 )?;
1115 return Ok(usize::try_from(count).unwrap_or(0));
1116 }
1117 let tier_str = tier.map(|t| t.as_str().to_string());
1118 let count: i64 = conn.query_row(
1119 "SELECT COUNT(*) FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
1120 params![namespace, tier_str],
1121 |r| r.get(0),
1122 )?;
1123 Ok(usize::try_from(count).unwrap_or(0))
1124}
1125
1126pub fn forget(
1129 conn: &Connection,
1130 namespace: Option<&str>,
1131 pattern: Option<&str>,
1132 tier: Option<&Tier>,
1133 archive: bool,
1134) -> Result<usize> {
1135 if pattern.is_none() && namespace.is_none() && tier.is_none() {
1136 anyhow::bail!("at least one of namespace, pattern, or tier is required");
1137 }
1138
1139 if archive {
1140 let now = Utc::now().to_rfc3339();
1142 if let Some(pat) = pattern {
1143 let fts_query = sanitize_fts_query(pat, true);
1144 let tier_str = tier.map(|t| t.as_str().to_string());
1145 conn.execute(
1147 "INSERT OR REPLACE INTO archived_memories
1148 (id, tier, namespace, title, content, tags, priority, confidence,
1149 source, access_count, created_at, updated_at, last_accessed_at,
1150 expires_at, archived_at, archive_reason,
1151 embedding, embedding_dim, original_tier, original_expires_at)
1152 SELECT id, tier, namespace, title, content, tags, priority, confidence,
1153 source, access_count, created_at, updated_at, last_accessed_at,
1154 expires_at, ?4, 'forget',
1155 embedding, embedding_dim, tier, expires_at
1156 FROM memories WHERE rowid IN (
1157 SELECT m.rowid FROM memories_fts fts
1158 JOIN memories m ON m.rowid = fts.rowid
1159 WHERE memories_fts MATCH ?1
1160 AND (?2 IS NULL OR m.namespace = ?2)
1161 AND (?3 IS NULL OR m.tier = ?3)
1162 )",
1163 params![fts_query, namespace, tier_str, now],
1164 )?;
1165 } else {
1166 let tier_str = tier.map(|t| t.as_str().to_string());
1167 conn.execute(
1168 "INSERT OR REPLACE INTO archived_memories
1169 (id, tier, namespace, title, content, tags, priority, confidence,
1170 source, access_count, created_at, updated_at, last_accessed_at,
1171 expires_at, archived_at, archive_reason,
1172 embedding, embedding_dim, original_tier, original_expires_at)
1173 SELECT id, tier, namespace, title, content, tags, priority, confidence,
1174 source, access_count, created_at, updated_at, last_accessed_at,
1175 expires_at, ?3, 'forget',
1176 embedding, embedding_dim, tier, expires_at
1177 FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
1178 params![namespace, tier_str, now],
1179 )?;
1180 }
1181 }
1182
1183 if let Some(pat) = pattern {
1185 let fts_query = sanitize_fts_query(pat, true);
1186 let tier_str = tier.map(|t| t.as_str().to_string());
1187 let deleted = conn.execute(
1188 "DELETE FROM memories WHERE rowid IN (
1189 SELECT m.rowid FROM memories_fts fts
1190 JOIN memories m ON m.rowid = fts.rowid
1191 WHERE memories_fts MATCH ?1
1192 AND (?2 IS NULL OR m.namespace = ?2)
1193 AND (?3 IS NULL OR m.tier = ?3)
1194 )",
1195 params![fts_query, namespace, tier_str],
1196 )?;
1197 return Ok(deleted);
1198 }
1199
1200 let tier_str = tier.map(|t| t.as_str().to_string());
1201 let deleted = conn.execute(
1202 "DELETE FROM memories WHERE (?1 IS NULL OR namespace = ?1) AND (?2 IS NULL OR tier = ?2)",
1203 params![namespace, tier_str],
1204 )?;
1205 Ok(deleted)
1206}
1207
1208#[allow(clippy::too_many_arguments)]
1209pub fn list(
1210 conn: &Connection,
1211 namespace: Option<&str>,
1212 tier: Option<&Tier>,
1213 limit: usize,
1214 offset: usize,
1215 min_priority: Option<i32>,
1216 since: Option<&str>,
1217 until: Option<&str>,
1218 tags_filter: Option<&str>,
1219 agent_id: Option<&str>,
1220) -> Result<Vec<Memory>> {
1221 let now = Utc::now().to_rfc3339();
1222 let tier_str = tier.map(|t| t.as_str().to_string());
1223 let mut stmt = conn.prepare(
1224 "SELECT * FROM memories
1225 WHERE (?1 IS NULL OR namespace = ?1)
1226 AND (?2 IS NULL OR tier = ?2)
1227 AND (?3 IS NULL OR priority >= ?3)
1228 AND (expires_at IS NULL OR expires_at > ?4)
1229 AND (?5 IS NULL OR created_at >= ?5)
1230 AND (?6 IS NULL OR created_at <= ?6)
1231 AND (?7 IS NULL OR EXISTS (SELECT 1 FROM json_each(memories.tags) WHERE json_each.value = ?7))
1232 AND (?10 IS NULL OR agent_id_idx = ?10)
1233 ORDER BY priority DESC, updated_at DESC
1234 LIMIT ?8 OFFSET ?9",
1235 )?;
1236 let rows = stmt.query_map(
1237 params![
1238 namespace,
1239 tier_str,
1240 min_priority,
1241 now,
1242 since,
1243 until,
1244 tags_filter,
1245 limit,
1246 offset,
1247 agent_id,
1248 ],
1249 row_to_memory,
1250 )?;
1251 rows.collect::<rusqlite::Result<Vec<_>>>()
1252 .map_err(Into::into)
1253}
1254
1255#[allow(clippy::too_many_arguments)]
1256pub fn search(
1257 conn: &Connection,
1258 query: &str,
1259 namespace: Option<&str>,
1260 tier: Option<&Tier>,
1261 limit: usize,
1262 min_priority: Option<i32>,
1263 since: Option<&str>,
1264 until: Option<&str>,
1265 tags_filter: Option<&str>,
1266 agent_id: Option<&str>,
1267 as_agent: Option<&str>,
1268) -> Result<Vec<Memory>> {
1269 let now = Utc::now().to_rfc3339();
1270 let tier_str = tier.map(|t| t.as_str().to_string());
1271 let fts_query = sanitize_fts_query(query, false);
1272 let (vis_p, vis_t, vis_u, vis_o) = compute_visibility_prefixes(as_agent);
1273
1274 let sql = format!(
1275 "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1276 m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1277 m.last_accessed_at, m.expires_at, m.metadata
1278 FROM memories_fts fts
1279 JOIN memories m ON m.rowid = fts.rowid
1280 WHERE memories_fts MATCH ?1
1281 AND (?2 IS NULL OR m.namespace = ?2)
1282 AND (?3 IS NULL OR m.tier = ?3)
1283 AND (?4 IS NULL OR m.priority >= ?4)
1284 AND (m.expires_at IS NULL OR m.expires_at > ?5)
1285 AND (?6 IS NULL OR m.created_at >= ?6)
1286 AND (?7 IS NULL OR m.created_at <= ?7)
1287 AND (?8 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?8))
1288 AND (?10 IS NULL OR m.agent_id_idx = ?10)
1289 {vis}
1290 ORDER BY (fts.rank * -1)
1291 + (m.priority * 0.5)
1292 + (MIN(m.access_count, 50) * 0.1)
1293 + (m.confidence * 2.0)
1294 + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
1295 DESC
1296 LIMIT ?9",
1297 vis = visibility_clause(11, "m"),
1298 );
1299 let mut stmt = conn.prepare(&sql)?;
1300 let rows = stmt.query_map(
1301 params![
1302 fts_query,
1303 namespace,
1304 tier_str,
1305 min_priority,
1306 now,
1307 since,
1308 until,
1309 tags_filter,
1310 limit,
1311 agent_id,
1312 vis_p,
1313 vis_t,
1314 vis_u,
1315 vis_o,
1316 ],
1317 row_to_memory,
1318 )?;
1319 rows.collect::<rusqlite::Result<Vec<_>>>()
1320 .map_err(Into::into)
1321}
1322
1323#[must_use]
1328pub fn proximity_boost(agent_ns: &str, memory_ns: &str) -> f64 {
1329 let agent_depth = crate::models::namespace_depth(agent_ns);
1330 let memory_depth = crate::models::namespace_depth(memory_ns);
1331 let distance = agent_depth.saturating_sub(memory_depth);
1332 #[allow(clippy::cast_precision_loss)]
1333 let d = distance as f64;
1334 1.0 / (1.0 + d * 0.3)
1335}
1336
1337fn hierarchy_in_clause(namespace: Option<&str>) -> (Option<String>, bool) {
1348 let Some(ns) = namespace else {
1349 return (None, false);
1350 };
1351 if !ns.contains('/') {
1352 return (None, false);
1353 }
1354 let ancestors = crate::models::namespace_ancestors(ns);
1355 if ancestors.is_empty() {
1356 return (None, false);
1357 }
1358 let quoted: Vec<String> = ancestors
1359 .iter()
1360 .map(|a| format!("'{}'", a.replace('\'', "''")))
1361 .collect();
1362 (
1363 Some(format!("AND m.namespace IN ({})", quoted.join(","))),
1364 true,
1365 )
1366}
1367
1368fn apply_proximity_boost(scored: Vec<(Memory, f64)>, agent_ns: &str) -> Vec<(Memory, f64)> {
1371 let mut boosted: Vec<(Memory, f64)> = scored
1372 .into_iter()
1373 .map(|(mem, score)| {
1374 let boost = proximity_boost(agent_ns, &mem.namespace);
1375 (mem, score * boost)
1376 })
1377 .collect();
1378 boosted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1379 boosted
1380}
1381
1382#[must_use]
1394pub fn count_tokens_cl100k(text: &str) -> usize {
1395 use std::sync::OnceLock;
1396 static BPE: OnceLock<Option<tiktoken_rs::CoreBPE>> = OnceLock::new();
1397 let bpe = BPE.get_or_init(|| tiktoken_rs::cl100k_base().ok());
1398 if let Some(bpe) = bpe.as_ref() {
1399 bpe.encode_with_special_tokens(text).len()
1400 } else {
1401 text.len() / 4
1405 }
1406}
1407
1408#[must_use]
1413pub fn count_memory_tokens(mem: &Memory) -> usize {
1414 count_tokens_cl100k(&mem.content)
1415}
1416
1417#[must_use]
1423pub fn estimate_memory_tokens(mem: &Memory) -> usize {
1424 count_memory_tokens(mem)
1425}
1426
/// Accounting returned by `apply_token_budget` alongside the memories it kept.
#[derive(Debug, Clone)]
pub struct BudgetOutcome {
    /// Tokens spent on the kept memories. With a budget, this is the sum of the
    /// cl100k counts of each kept memory's content. Without a budget, it is only an
    /// estimate: the sum of `content.len() / 4`.
    pub tokens_used: usize,
    /// `budget - tokens_used`, saturating at zero, when a budget was given; `None`
    /// when the call was unbudgeted.
    pub tokens_remaining: Option<usize>,
    /// Number of ranked candidates left out because of the budget.
    pub memories_dropped: usize,
    /// `true` when the first candidate alone exceeded the budget and was returned
    /// anyway, so `tokens_used` is greater than the budget.
    pub budget_overflow: bool,
}
1444
1445#[must_use]
1465pub fn apply_token_budget(
1466 scored: Vec<(Memory, f64)>,
1467 budget_tokens: Option<usize>,
1468) -> (Vec<(Memory, f64)>, BudgetOutcome) {
1469 let total_candidates = scored.len();
1470
1471 if budget_tokens == Some(0) {
1476 return (
1477 Vec::new(),
1478 BudgetOutcome {
1479 tokens_used: 0,
1480 tokens_remaining: Some(0),
1481 memories_dropped: total_candidates,
1482 budget_overflow: false,
1483 },
1484 );
1485 }
1486
1487 if budget_tokens.is_none() {
1492 let mut used: usize = 0;
1493 let mut out: Vec<(Memory, f64)> = Vec::with_capacity(scored.len());
1494 for (mem, score) in scored {
1495 used = used.saturating_add(mem.content.len() / 4);
1496 out.push((mem, score));
1497 }
1498 return (
1499 out,
1500 BudgetOutcome {
1501 tokens_used: used,
1502 tokens_remaining: None,
1503 memories_dropped: 0,
1504 budget_overflow: false,
1505 },
1506 );
1507 }
1508
1509 let mut used: usize = 0;
1512 let mut out: Vec<(Memory, f64)> = Vec::with_capacity(scored.len());
1513 let mut overflow = false;
1514
1515 for (mem, score) in scored {
1516 let cost = count_memory_tokens(&mem);
1517 if let Some(budget) = budget_tokens
1518 && used.saturating_add(cost) > budget
1519 {
1520 if out.is_empty() {
1523 used = used.saturating_add(cost);
1524 out.push((mem, score));
1525 overflow = true;
1526 }
1527 break;
1528 }
1529 used = used.saturating_add(cost);
1530 out.push((mem, score));
1531 }
1532
1533 let dropped = total_candidates.saturating_sub(out.len());
1534 let tokens_remaining = budget_tokens.map(|b| b.saturating_sub(used));
1535 (
1536 out,
1537 BudgetOutcome {
1538 tokens_used: used,
1539 tokens_remaining,
1540 memories_dropped: dropped,
1541 budget_overflow: overflow,
1542 },
1543 )
1544}
1545
1546#[allow(clippy::too_many_arguments)]
1553#[allow(clippy::too_many_arguments)]
1561pub fn recall_with_telemetry(
1562 conn: &Connection,
1563 context: &str,
1564 namespace: Option<&str>,
1565 limit: usize,
1566 tags_filter: Option<&str>,
1567 since: Option<&str>,
1568 until: Option<&str>,
1569 short_extend: i64,
1570 mid_extend: i64,
1571 as_agent: Option<&str>,
1572 budget_tokens: Option<usize>,
1573) -> Result<(
1574 Vec<(Memory, f64)>,
1575 BudgetOutcome,
1576 crate::models::RecallTelemetry,
1577)> {
1578 let (results, outcome) = recall(
1579 conn,
1580 context,
1581 namespace,
1582 limit,
1583 tags_filter,
1584 since,
1585 until,
1586 short_extend,
1587 mid_extend,
1588 as_agent,
1589 budget_tokens,
1590 )?;
1591 let telemetry = crate::models::RecallTelemetry {
1592 fts_candidates: results.len(),
1593 hnsw_candidates: 0,
1594 blend_weight_avg: 0.0,
1595 };
1596 Ok((results, outcome, telemetry))
1597}
1598
1599pub fn recall(
1600 conn: &Connection,
1601 context: &str,
1602 namespace: Option<&str>,
1603 limit: usize,
1604 tags_filter: Option<&str>,
1605 since: Option<&str>,
1606 until: Option<&str>,
1607 short_extend: i64,
1608 mid_extend: i64,
1609 as_agent: Option<&str>,
1610 budget_tokens: Option<usize>,
1611) -> Result<(Vec<(Memory, f64)>, BudgetOutcome)> {
1612 let now = Utc::now().to_rfc3339();
1613 let fts_query = sanitize_fts_query(context, true);
1614 let (vis_p, vis_t, vis_u, vis_o) = compute_visibility_prefixes(as_agent);
1615
1616 let (hierarchy_in, hierarchy_active) = hierarchy_in_clause(namespace);
1620 let hierarchy_fragment = hierarchy_in.unwrap_or_default();
1621 let effective_namespace = if hierarchy_active { None } else { namespace };
1622
1623 let sql = format!(
1624 "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1625 m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1626 m.last_accessed_at, m.expires_at, m.metadata,
1627 (fts.rank * -1)
1628 + (m.priority * 0.5)
1629 + (MIN(m.access_count, 50) * 0.1)
1630 + (m.confidence * 2.0)
1631 + (CASE m.tier WHEN 'long' THEN 3.0 WHEN 'mid' THEN 1.0 ELSE 0.0 END)
1632 + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
1633 AS score
1634 FROM memories_fts fts
1635 JOIN memories m ON m.rowid = fts.rowid
1636 WHERE memories_fts MATCH ?1
1637 AND (?2 IS NULL OR m.namespace = ?2)
1638 {hierarchy_fragment}
1639 AND (m.expires_at IS NULL OR m.expires_at > ?3)
1640 AND (?4 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?4))
1641 AND (?5 IS NULL OR m.created_at >= ?5)
1642 AND (?6 IS NULL OR m.created_at <= ?6)
1643 {vis}
1644 ORDER BY score DESC
1645 LIMIT ?7",
1646 vis = visibility_clause(8, "m"),
1647 );
1648 let mut stmt = conn.prepare(&sql)?;
1649 let rows = stmt.query_map(
1650 params![
1651 fts_query,
1652 effective_namespace,
1653 now,
1654 tags_filter,
1655 since,
1656 until,
1657 limit,
1658 vis_p,
1659 vis_t,
1660 vis_u,
1661 vis_o
1662 ],
1663 |row| {
1664 let mem = row_to_memory(row)?;
1665 let score: f64 = row.get(15)?;
1666 Ok((mem, score))
1667 },
1668 )?;
1669 let results: Vec<(Memory, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
1670
1671 let boosted = if let (true, Some(anchor)) = (hierarchy_active, namespace) {
1673 apply_proximity_boost(results, anchor)
1674 } else {
1675 results
1676 };
1677
1678 let (budgeted, outcome) = apply_token_budget(boosted, budget_tokens);
1681
1682 for (mem, _) in &budgeted {
1685 if let Err(e) = touch(conn, &mem.id, short_extend, mid_extend) {
1686 tracing::warn!("touch failed for memory {}: {}", &mem.id, e);
1687 }
1688 }
1689 Ok((budgeted, outcome))
1690}
1691
1692pub fn promote_to_namespace(
1707 conn: &Connection,
1708 source_id: &str,
1709 to_namespace: &str,
1710) -> Result<String> {
1711 if to_namespace.is_empty() {
1712 anyhow::bail!("to_namespace cannot be empty");
1713 }
1714 let source = get(conn, source_id)?
1715 .ok_or_else(|| anyhow::anyhow!("source memory not found: {source_id}"))?;
1716 if to_namespace == source.namespace {
1717 anyhow::bail!(
1718 "to_namespace must be a proper ancestor of the memory's namespace (got self: {})",
1719 source.namespace
1720 );
1721 }
1722 let ancestors = namespace_ancestors(&source.namespace);
1723 if !ancestors.iter().any(|a| a == to_namespace) {
1724 anyhow::bail!(
1725 "to_namespace '{to_namespace}' is not an ancestor of '{}' (ancestors: {ancestors:?})",
1726 source.namespace
1727 );
1728 }
1729
1730 let now = Utc::now().to_rfc3339();
1731 let clone = Memory {
1732 id: uuid::Uuid::new_v4().to_string(),
1733 tier: source.tier.clone(),
1734 namespace: to_namespace.to_string(),
1735 title: source.title.clone(),
1736 content: source.content.clone(),
1737 tags: source.tags.clone(),
1738 priority: source.priority,
1739 confidence: source.confidence,
1740 source: source.source.clone(),
1741 access_count: 0,
1742 created_at: now.clone(),
1743 updated_at: now,
1744 last_accessed_at: None,
1745 expires_at: source.expires_at.clone(),
1746 metadata: source.metadata.clone(),
1747 };
1748 let actual_id = insert(conn, &clone)?;
1749 create_link(conn, &actual_id, source_id, "derived_from")?;
1752 Ok(actual_id)
1753}
1754
1755pub fn find_by_title_namespace(
1763 conn: &Connection,
1764 title: &str,
1765 namespace: &str,
1766) -> Result<Option<String>> {
1767 let id: Option<String> = conn
1768 .query_row(
1769 "SELECT id FROM memories WHERE title = ?1 AND namespace = ?2 LIMIT 1",
1770 params![title, namespace],
1771 |r| r.get(0),
1772 )
1773 .ok();
1774 Ok(id)
1775}
1776
1777const MAX_VERSION_SUFFIX: u32 = 1024;
1785
1786pub fn next_versioned_title(
1791 conn: &Connection,
1792 base_title: &str,
1793 namespace: &str,
1794) -> Result<String> {
1795 if find_by_title_namespace(conn, base_title, namespace)?.is_none() {
1796 return Ok(base_title.to_string());
1797 }
1798 for n in 2..=MAX_VERSION_SUFFIX {
1799 let candidate = format!("{base_title} ({n})");
1800 if find_by_title_namespace(conn, &candidate, namespace)?.is_none() {
1801 return Ok(candidate);
1802 }
1803 }
1804 anyhow::bail!(
1805 "could not find a free versioned title for '{base_title}' in namespace '{namespace}' \
1806 within {MAX_VERSION_SUFFIX} attempts"
1807 )
1808}
1809
1810pub fn find_contradictions(conn: &Connection, title: &str, namespace: &str) -> Result<Vec<Memory>> {
1812 let fts_query = sanitize_fts_query(title, true);
1813 let mut stmt = conn.prepare(
1814 "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
1815 m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
1816 m.last_accessed_at, m.expires_at, m.metadata
1817 FROM memories_fts fts
1818 JOIN memories m ON m.rowid = fts.rowid
1819 WHERE memories_fts MATCH ?1 AND m.namespace = ?2
1820 ORDER BY fts.rank
1821 LIMIT 5",
1822 )?;
1823 let rows = stmt.query_map(params![fts_query, namespace], row_to_memory)?;
1824 rows.collect::<rusqlite::Result<Vec<_>>>()
1825 .map_err(Into::into)
1826}
1827
1828pub fn create_link(
1831 conn: &Connection,
1832 source_id: &str,
1833 target_id: &str,
1834 relation: &str,
1835) -> Result<()> {
1836 let source_exists: bool = conn
1838 .query_row(
1839 "SELECT EXISTS(SELECT 1 FROM memories WHERE id = ?1)",
1840 params![source_id],
1841 |r| r.get(0),
1842 )
1843 .unwrap_or(false);
1844 if !source_exists {
1845 anyhow::bail!("source memory not found: {source_id}");
1846 }
1847 let target_exists: bool = conn
1848 .query_row(
1849 "SELECT EXISTS(SELECT 1 FROM memories WHERE id = ?1)",
1850 params![target_id],
1851 |r| r.get(0),
1852 )
1853 .unwrap_or(false);
1854 if !target_exists {
1855 anyhow::bail!("target memory not found: {target_id}");
1856 }
1857 let now = Utc::now().to_rfc3339();
1862 conn.execute(
1863 "INSERT OR IGNORE INTO memory_links (source_id, target_id, relation, created_at, valid_from) VALUES (?1, ?2, ?3, ?4, ?4)",
1864 params![source_id, target_id, relation, now],
1865 )?;
1866 Ok(())
1867}
1868
1869pub fn get_links(conn: &Connection, id: &str) -> Result<Vec<MemoryLink>> {
1870 let mut stmt = conn.prepare(
1871 "SELECT source_id, target_id, relation, created_at FROM memory_links
1872 WHERE source_id = ?1 OR target_id = ?1",
1873 )?;
1874 let rows = stmt.query_map(params![id], |row| {
1875 Ok(MemoryLink {
1876 source_id: row.get(0)?,
1877 target_id: row.get(1)?,
1878 relation: row.get(2)?,
1879 created_at: row.get(3)?,
1880 })
1881 })?;
1882 rows.collect::<rusqlite::Result<Vec<_>>>()
1883 .map_err(Into::into)
1884}
1885
1886#[allow(dead_code)]
1887pub fn delete_link(conn: &Connection, source_id: &str, target_id: &str) -> Result<bool> {
1888 let changed = conn.execute(
1889 "DELETE FROM memory_links WHERE source_id = ?1 AND target_id = ?2",
1890 params![source_id, target_id],
1891 )?;
1892 Ok(changed > 0)
1893}
1894
1895#[allow(clippy::too_many_arguments)]
1900pub fn consolidate(
1901 conn: &Connection,
1902 ids: &[String],
1903 title: &str,
1904 summary: &str,
1905 namespace: &str,
1906 tier: &Tier,
1907 source: &str,
1908 consolidator_agent_id: &str,
1909) -> Result<String> {
1910 let now = Utc::now().to_rfc3339();
1911 let new_id = uuid::Uuid::new_v4().to_string();
1912
1913 conn.execute_batch("BEGIN IMMEDIATE")?;
1914
1915 let result = (|| -> Result<String> {
1916 let mut max_priority = 5i32;
1918 let mut all_tags: Vec<String> = Vec::new();
1919 let mut total_access = 0i64;
1920 let mut merged_metadata = serde_json::Map::new();
1921 let mut source_agent_ids: Vec<String> = Vec::new();
1925 for id in ids {
1926 match get(conn, id)? {
1927 Some(mem) => {
1928 max_priority = max_priority.max(mem.priority);
1929 all_tags.extend(mem.tags);
1930 total_access = total_access.saturating_add(mem.access_count);
1931 if let serde_json::Value::Object(map) = mem.metadata {
1935 for (k, v) in map {
1936 if k == "agent_id" {
1937 if let serde_json::Value::String(aid) = &v
1938 && !source_agent_ids.contains(aid)
1939 {
1940 source_agent_ids.push(aid.clone());
1941 }
1942 continue;
1943 }
1944 if let Some(existing) = merged_metadata.get(&k)
1945 && std::mem::discriminant(existing) != std::mem::discriminant(&v)
1946 {
1947 tracing::warn!(
1948 "consolidate: key '{}' type changed during merge",
1949 k
1950 );
1951 }
1952 merged_metadata.insert(k, v);
1953 }
1954 } else {
1955 tracing::warn!(
1956 "memory {} has non-object metadata during consolidate, skipping",
1957 id
1958 );
1959 }
1960 }
1961 None => anyhow::bail!("memory not found: {id}"),
1962 }
1963 }
1964 all_tags.sort();
1965 all_tags.dedup();
1966 let tags_json = serde_json::to_string(&all_tags)?;
1967 merged_metadata.insert(
1969 "derived_from".to_string(),
1970 serde_json::Value::Array(
1971 ids.iter()
1972 .map(|id| serde_json::Value::String(id.clone()))
1973 .collect(),
1974 ),
1975 );
1976 merged_metadata.insert(
1979 "agent_id".to_string(),
1980 serde_json::Value::String(consolidator_agent_id.to_string()),
1981 );
1982 if !source_agent_ids.is_empty() {
1983 merged_metadata.insert(
1984 "consolidated_from_agents".to_string(),
1985 serde_json::Value::Array(
1986 source_agent_ids
1987 .into_iter()
1988 .map(serde_json::Value::String)
1989 .collect(),
1990 ),
1991 );
1992 }
1993 let merged_metadata_value = serde_json::Value::Object(merged_metadata);
1994 crate::validate::validate_metadata(&merged_metadata_value)
1995 .context("merged metadata exceeds size limit")?;
1996 let metadata_json = serde_json::to_string(&merged_metadata_value)?;
1997
1998 conn.execute(
1999 "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, metadata)
2000 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1.0, ?8, ?9, ?10, ?10, ?11)",
2001 params![new_id, tier.as_str(), namespace, title, summary, tags_json, max_priority, source, total_access, now, metadata_json],
2002 )?;
2003
2004 for id in ids {
2009 delete(conn, id)?;
2010 }
2011
2012 Ok(new_id.clone())
2013 })();
2014
2015 match result {
2016 Ok(id) => {
2017 conn.execute_batch("COMMIT")?;
2018 Ok(id)
2019 }
2020 Err(e) => {
2021 if let Err(rb) = conn.execute_batch("ROLLBACK") {
2022 tracing::error!("ROLLBACK failed in consolidate: {}", rb);
2023 }
2024 Err(e)
2025 }
2026 }
2027}
2028
/// Removes invisible and formatting code points from `s`.
///
/// Covered: soft hyphen, combining grapheme joiner, Arabic letter mark, Mongolian
/// vowel separator, zero-width space/joiners, LRM/RLM, bidi embedding, override and
/// isolate controls, word joiner and invisible operators, variation selectors
/// FE00–FE0F, and BOM/ZWNBSP. `sanitize_fts_query` calls it before tokenizing.
fn strip_invisible(s: &str) -> String {
    let is_invisible = |ch: char| {
        matches!(
            ch,
            '\u{00AD}'
                | '\u{034F}'
                | '\u{061C}'
                | '\u{180E}'
                | '\u{200B}'..='\u{200F}'
                | '\u{202A}'..='\u{202E}'
                | '\u{2060}'..='\u{2064}'
                | '\u{2066}'..='\u{2069}'
                | '\u{FE00}'..='\u{FE0F}'
                | '\u{FEFF}'
        )
    };
    s.chars().filter(|&ch| !is_invisible(ch)).collect()
}
2043
2044fn sanitize_fts_query(input: &str, use_or: bool) -> String {
2045 let joiner = if use_or { " OR " } else { " " };
2046 let cleaned = strip_invisible(input);
2047 let tokens: Vec<String> = cleaned
2048 .split_whitespace()
2049 .filter(|t| !t.is_empty())
2050 .filter(|t| {
2051 let upper = t.to_uppercase();
2053 upper != "AND" && upper != "OR" && upper != "NOT" && upper != "NEAR"
2054 })
2055 .map(|token| {
2056 let clean: String = token
2069 .chars()
2070 .filter(|c| {
2071 *c != '"'
2072 && *c != '*'
2073 && *c != '^'
2074 && *c != '{'
2075 && *c != '}'
2076 && *c != '('
2077 && *c != ')'
2078 && *c != ':'
2079 && *c != '|'
2080 && *c != '+'
2081 })
2082 .collect();
2083 if clean.is_empty() {
2084 return String::new();
2085 }
2086 format!("\"{clean}\"")
2087 })
2088 .filter(|t| !t.is_empty())
2089 .collect();
2090 if tokens.is_empty() {
2091 return "\"_empty_\"".to_string();
2092 }
2093 tokens.join(joiner)
2094}
2095
2096pub fn list_namespaces(conn: &Connection) -> Result<Vec<NamespaceCount>> {
2097 let now = Utc::now().to_rfc3339();
2098 let mut stmt = conn.prepare(
2099 "SELECT namespace, COUNT(*) FROM memories WHERE expires_at IS NULL OR expires_at > ?1 GROUP BY namespace ORDER BY COUNT(*) DESC",
2100 )?;
2101 let rows = stmt.query_map(params![now], |row| {
2102 Ok(NamespaceCount {
2103 namespace: row.get(0)?,
2104 count: row.get(1)?,
2105 })
2106 })?;
2107 rows.collect::<rusqlite::Result<Vec<_>>>()
2108 .map_err(Into::into)
2109}
2110
2111const TAXONOMY_MAX_LIMIT: usize = 10_000;
2115
2116#[allow(clippy::too_many_lines)]
2137pub fn get_taxonomy(
2138 conn: &Connection,
2139 namespace_prefix: Option<&str>,
2140 max_depth: usize,
2141 limit: usize,
2142) -> Result<Taxonomy> {
2143 let now = Utc::now().to_rfc3339();
2144 let effective_limit = limit.min(TAXONOMY_MAX_LIMIT);
2145 let effective_depth = max_depth.min(MAX_NAMESPACE_DEPTH);
2149
2150 let prefix = namespace_prefix.unwrap_or("");
2151
2152 let total_count: usize = if prefix.is_empty() {
2156 let v: i64 = conn.query_row(
2157 "SELECT COUNT(*) FROM memories WHERE expires_at IS NULL OR expires_at > ?1",
2158 params![now],
2159 |row| row.get(0),
2160 )?;
2161 usize::try_from(v).unwrap_or(0)
2162 } else {
2163 let v: i64 = conn.query_row(
2164 "SELECT COUNT(*) FROM memories
2165 WHERE (expires_at IS NULL OR expires_at > ?1)
2166 AND (namespace = ?2 OR namespace LIKE ?2 || '/%')",
2167 params![now, prefix],
2168 |row| row.get(0),
2169 )?;
2170 usize::try_from(v).unwrap_or(0)
2171 };
2172
2173 let groups: Vec<(String, usize)> = if prefix.is_empty() {
2176 let mut stmt = conn.prepare(
2177 "SELECT namespace, COUNT(*) FROM memories
2178 WHERE expires_at IS NULL OR expires_at > ?1
2179 GROUP BY namespace
2180 ORDER BY COUNT(*) DESC, namespace ASC
2181 LIMIT ?2",
2182 )?;
2183 let rows = stmt.query_map(
2184 params![now, i64::try_from(effective_limit).unwrap_or(i64::MAX)],
2185 |row| {
2186 let ns: String = row.get(0)?;
2187 let c: i64 = row.get(1)?;
2188 Ok((ns, usize::try_from(c).unwrap_or(0)))
2189 },
2190 )?;
2191 rows.collect::<rusqlite::Result<Vec<_>>>()?
2192 } else {
2193 let mut stmt = conn.prepare(
2194 "SELECT namespace, COUNT(*) FROM memories
2195 WHERE (expires_at IS NULL OR expires_at > ?1)
2196 AND (namespace = ?2 OR namespace LIKE ?2 || '/%')
2197 GROUP BY namespace
2198 ORDER BY COUNT(*) DESC, namespace ASC
2199 LIMIT ?3",
2200 )?;
2201 let rows = stmt.query_map(
2202 params![
2203 now,
2204 prefix,
2205 i64::try_from(effective_limit).unwrap_or(i64::MAX)
2206 ],
2207 |row| {
2208 let ns: String = row.get(0)?;
2209 let c: i64 = row.get(1)?;
2210 Ok((ns, usize::try_from(c).unwrap_or(0)))
2211 },
2212 )?;
2213 rows.collect::<rusqlite::Result<Vec<_>>>()?
2214 };
2215
2216 let walked_count: usize = groups.iter().map(|(_, c)| *c).sum();
2217 let truncated = walked_count < total_count;
2218
2219 let root_name = prefix.rsplit('/').next().unwrap_or("").to_string();
2222 let mut root = TaxonomyNode {
2223 namespace: prefix.to_string(),
2224 name: root_name,
2225 count: 0,
2226 subtree_count: 0,
2227 children: Vec::new(),
2228 };
2229
2230 for (ns, c) in groups {
2231 let suffix: &str = if prefix.is_empty() {
2235 ns.as_str()
2236 } else if ns == prefix {
2237 ""
2238 } else if ns.len() > prefix.len() + 1
2239 && ns.starts_with(prefix)
2240 && ns.as_bytes()[prefix.len()] == b'/'
2241 {
2242 &ns[prefix.len() + 1..]
2243 } else {
2244 continue;
2248 };
2249 let all_segments: Vec<&str> = if suffix.is_empty() {
2250 Vec::new()
2251 } else {
2252 suffix.split('/').collect()
2253 };
2254 let take = all_segments.len().min(effective_depth);
2255 let used = &all_segments[..take];
2256 let exact_match_in_view = take == all_segments.len();
2257
2258 root.subtree_count += c;
2263 if used.is_empty() {
2264 root.count += c;
2265 continue;
2266 }
2267
2268 let mut path_so_far = prefix.to_string();
2269 let mut node = &mut root;
2270 for (i, seg) in used.iter().enumerate() {
2271 if !path_so_far.is_empty() {
2272 path_so_far.push('/');
2273 }
2274 path_so_far.push_str(seg);
2275 let pos = node.children.iter().position(|ch| ch.name == *seg);
2276 let idx = if let Some(p) = pos {
2277 p
2278 } else {
2279 node.children.push(TaxonomyNode {
2280 namespace: path_so_far.clone(),
2281 name: (*seg).to_string(),
2282 count: 0,
2283 subtree_count: 0,
2284 children: Vec::new(),
2285 });
2286 node.children.len() - 1
2287 };
2288 node = &mut node.children[idx];
2289 node.subtree_count += c;
2290 let is_leaf = i + 1 == used.len();
2291 if is_leaf && exact_match_in_view {
2292 node.count += c;
2293 }
2294 }
2295 }
2296
2297 sort_taxonomy(&mut root);
2298
2299 Ok(Taxonomy {
2300 tree: root,
2301 total_count,
2302 truncated,
2303 })
2304}
2305
2306fn sort_taxonomy(node: &mut TaxonomyNode) {
2307 node.children.sort_by(|a, b| a.name.cmp(&b.name));
2308 for child in &mut node.children {
2309 sort_taxonomy(child);
2310 }
2311}
2312
2313pub const DUPLICATE_THRESHOLD_MIN: f32 = 0.5;
2317
2318pub const DUPLICATE_THRESHOLD_DEFAULT: f32 = 0.85;
2323
2324pub fn check_duplicate(
2342 conn: &Connection,
2343 query_embedding: &[f32],
2344 namespace: Option<&str>,
2345 threshold: f32,
2346) -> Result<DuplicateCheck> {
2347 let effective_threshold = threshold.max(DUPLICATE_THRESHOLD_MIN);
2348 let now = Utc::now().to_rfc3339();
2349
2350 let rows: Vec<(String, String, String, Vec<u8>)> = if let Some(ns) = namespace {
2355 let mut stmt = conn.prepare(
2356 "SELECT id, title, namespace, embedding FROM memories
2357 WHERE embedding IS NOT NULL
2358 AND (expires_at IS NULL OR expires_at > ?1)
2359 AND namespace = ?2",
2360 )?;
2361 let mapped = stmt.query_map(params![now, ns], |row| {
2362 Ok((
2363 row.get::<_, String>(0)?,
2364 row.get::<_, String>(1)?,
2365 row.get::<_, String>(2)?,
2366 row.get::<_, Vec<u8>>(3)?,
2367 ))
2368 })?;
2369 mapped.collect::<rusqlite::Result<Vec<_>>>()?
2370 } else {
2371 let mut stmt = conn.prepare(
2372 "SELECT id, title, namespace, embedding FROM memories
2373 WHERE embedding IS NOT NULL
2374 AND (expires_at IS NULL OR expires_at > ?1)",
2375 )?;
2376 let mapped = stmt.query_map(params![now], |row| {
2377 Ok((
2378 row.get::<_, String>(0)?,
2379 row.get::<_, String>(1)?,
2380 row.get::<_, String>(2)?,
2381 row.get::<_, Vec<u8>>(3)?,
2382 ))
2383 })?;
2384 mapped.collect::<rusqlite::Result<Vec<_>>>()?
2385 };
2386
2387 let mut best: Option<DuplicateMatch> = None;
2388 let mut scanned: usize = 0;
2389 for (id, title, ns, bytes) in rows {
2390 if bytes.is_empty() {
2391 continue;
2392 }
2393 let candidate = match crate::embeddings::decode_embedding_blob(&bytes) {
2397 Ok(v) => v,
2398 Err(e) => {
2399 tracing::warn!(
2400 memory_id = %id,
2401 blob_len = bytes.len(),
2402 error = %e,
2403 "skipping duplicate-check candidate with malformed embedding"
2404 );
2405 continue;
2406 }
2407 };
2408 if candidate.len() != query_embedding.len() {
2412 tracing::warn!(
2413 memory_id = %id,
2414 expected = query_embedding.len(),
2415 got = candidate.len(),
2416 "skipping duplicate-check candidate with dimension mismatch"
2417 );
2418 continue;
2419 }
2420 let similarity =
2421 crate::embeddings::Embedder::cosine_similarity(query_embedding, &candidate);
2422 scanned += 1;
2423 let is_better = best.as_ref().is_none_or(|m| similarity > m.similarity);
2424 if is_better {
2425 best = Some(DuplicateMatch {
2426 id,
2427 title,
2428 namespace: ns,
2429 similarity,
2430 });
2431 }
2432 }
2433
2434 let is_duplicate = best
2435 .as_ref()
2436 .is_some_and(|m| m.similarity >= effective_threshold);
2437 Ok(DuplicateCheck {
2438 is_duplicate,
2439 threshold: effective_threshold,
2440 nearest: best,
2441 candidates_scanned: scanned,
2442 })
2443}
2444
2445pub fn entity_register(
2474 conn: &Connection,
2475 canonical_name: &str,
2476 namespace: &str,
2477 aliases: &[String],
2478 extra_metadata: &serde_json::Value,
2479 agent_id: Option<&str>,
2480) -> Result<crate::models::EntityRegistration> {
2481 use crate::models::{ENTITY_KIND, ENTITY_TAG, EntityRegistration};
2482
2483 let existing_id: Option<String> = match conn.query_row(
2487 "SELECT id FROM memories
2488 WHERE namespace = ?1 AND title = ?2
2489 AND COALESCE(json_extract(metadata, '$.kind'), '') = ?3",
2490 params![namespace, canonical_name, ENTITY_KIND],
2491 |r| r.get::<_, String>(0),
2492 ) {
2493 Ok(id) => Some(id),
2494 Err(rusqlite::Error::QueryReturnedNoRows) => None,
2495 Err(e) => return Err(e.into()),
2496 };
2497
2498 let (entity_id, created) = if let Some(id) = existing_id {
2499 (id, false)
2500 } else {
2501 let collision: Option<String> = match conn.query_row(
2502 "SELECT id FROM memories
2503 WHERE namespace = ?1 AND title = ?2
2504 AND COALESCE(json_extract(metadata, '$.kind'), '') != ?3",
2505 params![namespace, canonical_name, ENTITY_KIND],
2506 |r| r.get::<_, String>(0),
2507 ) {
2508 Ok(id) => Some(id),
2509 Err(rusqlite::Error::QueryReturnedNoRows) => None,
2510 Err(e) => return Err(e.into()),
2511 };
2512 if collision.is_some() {
2513 anyhow::bail!(
2514 "entity_register: title '{canonical_name}' in namespace '{namespace}' is already used by a non-entity memory"
2515 );
2516 }
2517
2518 let mut meta_map = match extra_metadata {
2521 serde_json::Value::Object(m) => m.clone(),
2522 _ => serde_json::Map::new(),
2523 };
2524 meta_map.insert(
2525 "kind".to_string(),
2526 serde_json::Value::String(ENTITY_KIND.to_string()),
2527 );
2528 if let Some(a) = agent_id {
2529 meta_map
2530 .entry("agent_id".to_string())
2531 .or_insert(serde_json::Value::String(a.to_string()));
2532 }
2533 let metadata = serde_json::Value::Object(meta_map);
2534
2535 let now = Utc::now().to_rfc3339();
2536 let mem = Memory {
2537 id: uuid::Uuid::new_v4().to_string(),
2538 tier: Tier::Long,
2539 namespace: namespace.to_string(),
2540 title: canonical_name.to_string(),
2541 content: canonical_name.to_string(),
2542 tags: vec![ENTITY_TAG.to_string()],
2543 priority: 7,
2544 confidence: 1.0,
2545 source: "api".to_string(),
2546 access_count: 0,
2547 created_at: now.clone(),
2548 updated_at: now,
2549 last_accessed_at: None,
2550 expires_at: None,
2551 metadata,
2552 };
2553 let id = insert(conn, &mem).context("insert entity memory")?;
2554 (id, true)
2555 };
2556
2557 let now = Utc::now().to_rfc3339();
2558 {
2559 let mut stmt = conn.prepare(
2560 "INSERT OR IGNORE INTO entity_aliases (entity_id, alias, created_at)
2561 VALUES (?1, ?2, ?3)",
2562 )?;
2563 for alias in aliases {
2564 let trimmed = alias.trim();
2565 if trimmed.is_empty() {
2566 continue;
2567 }
2568 stmt.execute(params![entity_id, trimmed, now])?;
2569 }
2570 }
2571
2572 let aliases_out = list_entity_aliases(conn, &entity_id)?;
2573
2574 Ok(EntityRegistration {
2575 entity_id,
2576 canonical_name: canonical_name.to_string(),
2577 namespace: namespace.to_string(),
2578 aliases: aliases_out,
2579 created,
2580 })
2581}
2582
2583pub fn entity_get_by_alias(
2594 conn: &Connection,
2595 alias: &str,
2596 namespace: Option<&str>,
2597) -> Result<Option<crate::models::EntityRecord>> {
2598 use crate::models::{ENTITY_KIND, EntityRecord};
2599
2600 let trimmed = alias.trim();
2601 if trimmed.is_empty() {
2602 return Ok(None);
2603 }
2604
2605 let row: std::result::Result<(String, String, String), rusqlite::Error> =
2606 if let Some(ns) = namespace {
2607 conn.query_row(
2608 "SELECT m.id, m.title, m.namespace
2609 FROM entity_aliases ea
2610 JOIN memories m ON m.id = ea.entity_id
2611 WHERE ea.alias = ?1
2612 AND m.namespace = ?2
2613 AND COALESCE(json_extract(m.metadata, '$.kind'), '') = ?3
2614 ORDER BY m.created_at DESC
2615 LIMIT 1",
2616 params![trimmed, ns, ENTITY_KIND],
2617 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2618 )
2619 } else {
2620 conn.query_row(
2621 "SELECT m.id, m.title, m.namespace
2622 FROM entity_aliases ea
2623 JOIN memories m ON m.id = ea.entity_id
2624 WHERE ea.alias = ?1
2625 AND COALESCE(json_extract(m.metadata, '$.kind'), '') = ?2
2626 ORDER BY m.created_at DESC
2627 LIMIT 1",
2628 params![trimmed, ENTITY_KIND],
2629 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2630 )
2631 };
2632
2633 let (entity_id, canonical_name, ns) = match row {
2634 Ok(t) => t,
2635 Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
2636 Err(e) => return Err(e.into()),
2637 };
2638
2639 let aliases = list_entity_aliases(conn, &entity_id)?;
2640 Ok(Some(EntityRecord {
2641 entity_id,
2642 canonical_name,
2643 namespace: ns,
2644 aliases,
2645 }))
2646}
2647
/// Number of events [`kg_timeline`] returns when the caller passes no `limit`.
pub const KG_TIMELINE_DEFAULT_LIMIT: usize = 200;

/// Upper bound that [`kg_timeline`] clamps any caller-supplied `limit` to.
pub const KG_TIMELINE_MAX_LIMIT: usize = 1000;
2657
2658pub fn kg_timeline(
2697 conn: &Connection,
2698 source_id: &str,
2699 since: Option<&str>,
2700 until: Option<&str>,
2701 limit: Option<usize>,
2702) -> Result<Vec<crate::models::KgTimelineEvent>> {
2703 use crate::models::KgTimelineEvent;
2704
2705 let cap = limit
2706 .unwrap_or(KG_TIMELINE_DEFAULT_LIMIT)
2707 .clamp(1, KG_TIMELINE_MAX_LIMIT);
2708
2709 let mut sql = String::from(
2712 "SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until,
2713 ml.observed_by, m.title, m.namespace, ml.created_at
2714 FROM memory_links ml
2715 JOIN memories m ON m.id = ml.target_id
2716 WHERE ml.source_id = ?1
2717 AND ml.valid_from IS NOT NULL",
2718 );
2719 let mut binds: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(source_id.to_string())];
2720 if let Some(s) = since {
2721 sql.push_str(" AND ml.valid_from >= ?");
2722 sql.push_str(&(binds.len() + 1).to_string());
2723 binds.push(Box::new(s.to_string()));
2724 }
2725 if let Some(u) = until {
2726 sql.push_str(" AND ml.valid_from <= ?");
2727 sql.push_str(&(binds.len() + 1).to_string());
2728 binds.push(Box::new(u.to_string()));
2729 }
2730 sql.push_str(" ORDER BY ml.valid_from ASC, ml.created_at ASC LIMIT ?");
2731 sql.push_str(&(binds.len() + 1).to_string());
2732 binds.push(Box::new(i64::try_from(cap).unwrap_or(i64::MAX)));
2733
2734 let mut stmt = conn.prepare(&sql)?;
2735 let bind_refs: Vec<&dyn rusqlite::ToSql> = binds.iter().map(AsRef::as_ref).collect();
2736 let rows = stmt.query_map(rusqlite::params_from_iter(bind_refs), |row| {
2737 Ok(KgTimelineEvent {
2738 target_id: row.get(0)?,
2739 relation: row.get(1)?,
2740 valid_from: row.get(2)?,
2741 valid_until: row.get(3)?,
2742 observed_by: row.get(4)?,
2743 title: row.get(5)?,
2744 target_namespace: row.get(6)?,
2745 })
2746 })?;
2747 rows.collect::<rusqlite::Result<Vec<_>>>()
2748 .map_err(Into::into)
2749}
2750
/// Outcome of a successful [`invalidate_link`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvalidateResult {
    /// Timestamp that was written into the link's `valid_until` column.
    pub valid_until: String,
    /// The link's `valid_until` before this call; `None` if it was NULL.
    pub previous_valid_until: Option<String>,
}
2761
2762pub fn invalidate_link(
2776 conn: &Connection,
2777 source_id: &str,
2778 target_id: &str,
2779 relation: &str,
2780 valid_until: Option<&str>,
2781) -> Result<Option<InvalidateResult>> {
2782 let stamp = valid_until.map_or_else(|| Utc::now().to_rfc3339(), str::to_string);
2783
2784 let prior = match conn.query_row(
2785 "SELECT valid_until FROM memory_links \
2786 WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
2787 params![source_id, target_id, relation],
2788 |r| r.get::<_, Option<String>>(0),
2789 ) {
2790 Ok(v) => v,
2791 Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
2792 Err(e) => return Err(e.into()),
2793 };
2794
2795 conn.execute(
2796 "UPDATE memory_links SET valid_until = ?4 \
2797 WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
2798 params![source_id, target_id, relation, &stamp],
2799 )?;
2800
2801 Ok(Some(InvalidateResult {
2802 valid_until: stamp,
2803 previous_valid_until: prior,
2804 }))
2805}
2806
/// Number of nodes [`kg_query`] returns when the caller passes no `limit`.
pub const KG_QUERY_DEFAULT_LIMIT: usize = 200;

/// Upper bound that [`kg_query`] clamps any caller-supplied `limit` to.
pub const KG_QUERY_MAX_LIMIT: usize = 1000;

/// Deepest traversal [`kg_query`] accepts; a larger `max_depth` is rejected
/// with an error rather than clamped.
pub const KG_QUERY_MAX_SUPPORTED_DEPTH: usize = 5;
2822
/// Traverses the knowledge graph outward from `source_id` up to `max_depth` hops.
///
/// Uses a recursive CTE over `memory_links`. Depth 1 is a direct link from
/// `source_id`, and each further hop follows links out of the previous
/// target. Every hop, including the first, must satisfy the same filters:
/// - `valid_at = Some(t)`: the link's `valid_from` is non-NULL and `<= t`,
///   and its `valid_until` is NULL or `> t` (text comparison).
/// - `allowed_agents = Some(list)`: the link's `observed_by` is in `list`.
///   An empty list short-circuits to an empty result.
///
/// Cycles are cut per path: a hop is not taken into a node already on the
/// current path. The same node can still be reported more than once if it
/// is reachable along different paths.
///
/// Results are ordered by depth, then by `valid_from` (falling back to the
/// link's creation time), then by link creation time. At most `limit` rows
/// are returned: the default is [`KG_QUERY_DEFAULT_LIMIT`], clamped to
/// `1..=KG_QUERY_MAX_LIMIT`. Because the outer query sorts, the whole
/// traversal is materialized before the limit applies.
///
/// Each node's `path` is the traversed ids joined with `->`. NOTE(review): it
/// relies on `group_concat` visiting `json_each` rows in array order, which
/// SQLite does not formally guarantee.
///
/// # Errors
///
/// Fails if `max_depth` is 0 or exceeds [`KG_QUERY_MAX_SUPPORTED_DEPTH`], and
/// propagates SQLite errors.
pub fn kg_query(
    conn: &Connection,
    source_id: &str,
    max_depth: usize,
    valid_at: Option<&str>,
    allowed_agents: Option<&[String]>,
    limit: Option<usize>,
) -> Result<Vec<crate::models::KgQueryNode>> {
    use crate::models::KgQueryNode;

    if max_depth == 0 {
        anyhow::bail!("max_depth must be >= 1");
    }
    if max_depth > KG_QUERY_MAX_SUPPORTED_DEPTH {
        anyhow::bail!(
            "max_depth={max_depth} exceeds supported depth={KG_QUERY_MAX_SUPPORTED_DEPTH}"
        );
    }

    // An explicit but empty allow-list can match no link at all.
    if let Some(agents) = allowed_agents
        && agents.is_empty()
    {
        return Ok(Vec::new());
    }

    let cap = limit
        .unwrap_or(KG_QUERY_DEFAULT_LIMIT)
        .clamp(1, KG_QUERY_MAX_LIMIT);

    // Binds are numbered in push order: each `?N` written below is the bind
    // count right after its value was pushed. The same `hop_filter` text is
    // spliced into both halves of the recursive CTE; numbered placeholders may
    // be referenced more than once, so one set of binds serves both.
    let mut binds: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    let mut hop_filter = String::new();
    if let Some(t) = valid_at {
        // Point-in-time filter: link started at or before `t` and has not
        // ended by `t` (NULL valid_until means no end).
        hop_filter.push_str(" AND ml.valid_from IS NOT NULL AND ml.valid_from <= ?");
        binds.push(Box::new(t.to_string()));
        hop_filter.push_str(&binds.len().to_string());
        hop_filter.push_str(" AND (ml.valid_until IS NULL OR ml.valid_until > ?");
        binds.push(Box::new(t.to_string()));
        hop_filter.push_str(&binds.len().to_string());
        hop_filter.push(')');
    }
    if let Some(agents) = allowed_agents {
        // Builds `AND ml.observed_by IN (?a, ?b, ...)`; the list is non-empty here.
        hop_filter.push_str(" AND ml.observed_by IN (");
        for (i, a) in agents.iter().enumerate() {
            binds.push(Box::new(a.clone()));
            if i > 0 {
                hop_filter.push_str(", ");
            }
            hop_filter.push('?');
            hop_filter.push_str(&binds.len().to_string());
        }
        hop_filter.push(')');
    }

    // Remaining scalar binds; their placeholder numbers are captured for the
    // format! below.
    binds.push(Box::new(source_id.to_string()));
    let source_ph = binds.len();
    binds.push(Box::new(i64::try_from(max_depth).unwrap_or(i64::MAX)));
    let max_depth_ph = binds.len();
    binds.push(Box::new(i64::try_from(cap).unwrap_or(i64::MAX)));
    let limit_ph = binds.len();

    // Anchor row: direct links from the source, path = [source, target].
    // Recursive step: extend a path by one link while depth < max_depth and
    // the next target is not already on that path (per-path cycle guard).
    // Every line ends in `\`, so source indentation never enters the SQL text.
    let sql = format!(
        "WITH RECURSIVE traversal(\
         target_id, relation, valid_from, valid_until, observed_by, \
         link_created_at, depth, path\
         ) AS (\
         SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until, \
         ml.observed_by, ml.created_at, 1, \
         json_array(ml.source_id, ml.target_id) \
         FROM memory_links ml \
         WHERE ml.source_id = ?{source_ph}{hop_filter} \
         UNION ALL \
         SELECT ml.target_id, ml.relation, ml.valid_from, ml.valid_until, \
         ml.observed_by, ml.created_at, t.depth + 1, \
         json_insert(t.path, '$[' || json_array_length(t.path) || ']', ml.target_id) \
         FROM memory_links ml \
         JOIN traversal t ON ml.source_id = t.target_id \
         WHERE t.depth < ?{max_depth_ph} \
         AND NOT EXISTS (SELECT 1 FROM json_each(t.path) WHERE value = ml.target_id)\
         {hop_filter}\
         ) \
         SELECT t.target_id, t.relation, t.valid_from, t.valid_until, \
         t.observed_by, m.title, m.namespace, t.depth, \
         (SELECT group_concat(value, '->') FROM json_each(t.path)) \
         FROM traversal t \
         JOIN memories m ON m.id = t.target_id \
         ORDER BY t.depth ASC, COALESCE(t.valid_from, t.link_created_at) ASC, \
         t.link_created_at ASC \
         LIMIT ?{limit_ph}",
    );

    let mut stmt = conn.prepare(&sql)?;
    let bind_refs: Vec<&dyn rusqlite::ToSql> = binds.iter().map(AsRef::as_ref).collect();
    let rows = stmt.query_map(rusqlite::params_from_iter(bind_refs), |row| {
        let target_id: String = row.get(0)?;
        let depth: i64 = row.get(7)?;
        Ok(KgQueryNode {
            target_id,
            relation: row.get(1)?,
            valid_from: row.get(2)?,
            valid_until: row.get(3)?,
            observed_by: row.get(4)?,
            title: row.get(5)?,
            target_namespace: row.get(6)?,
            // Depth is always >= 1 from the CTE; the fallback only guards the cast.
            depth: usize::try_from(depth).unwrap_or(0),
            path: row.get(8)?,
        })
    })?;
    rows.collect::<rusqlite::Result<Vec<_>>>()
        .map_err(Into::into)
}
2989
2990fn list_entity_aliases(conn: &Connection, entity_id: &str) -> Result<Vec<String>> {
2993 let mut stmt = conn.prepare(
2994 "SELECT alias FROM entity_aliases
2995 WHERE entity_id = ?1
2996 ORDER BY created_at ASC, alias ASC",
2997 )?;
2998 let aliases: Vec<String> = stmt
2999 .query_map(params![entity_id], |r| r.get::<_, String>(0))?
3000 .collect::<rusqlite::Result<Vec<_>>>()?;
3001 Ok(aliases)
3002}
3003
3004pub fn register_agent(
3013 conn: &Connection,
3014 agent_id: &str,
3015 agent_type: &str,
3016 capabilities: &[String],
3017) -> Result<String> {
3018 let title = format!("agent:{agent_id}");
3019 let now = Utc::now().to_rfc3339();
3020
3021 let registered_at = conn
3023 .query_row(
3024 "SELECT json_extract(metadata, '$.registered_at') FROM memories
3025 WHERE namespace = ?1 AND title = ?2",
3026 params![AGENTS_NAMESPACE, &title],
3027 |row| row.get::<_, Option<String>>(0),
3028 )
3029 .ok()
3030 .flatten()
3031 .unwrap_or_else(|| now.clone());
3032
3033 let caps_json: Vec<serde_json::Value> = capabilities
3034 .iter()
3035 .map(|c| serde_json::Value::String(c.clone()))
3036 .collect();
3037
3038 let metadata = serde_json::json!({
3039 "agent_id": agent_id,
3040 "agent_type": agent_type,
3041 "capabilities": caps_json,
3042 "registered_at": registered_at,
3043 "last_seen_at": now,
3044 });
3045
3046 let content = serde_json::to_string(&metadata)
3047 .context("failed to serialize agent registration content")?;
3048
3049 let mem = Memory {
3050 id: uuid::Uuid::new_v4().to_string(),
3051 tier: Tier::Long,
3052 namespace: AGENTS_NAMESPACE.to_string(),
3053 title,
3054 content,
3055 tags: vec!["agent-registration".to_string()],
3056 priority: 5,
3057 confidence: 1.0,
3058 source: "system".to_string(),
3059 access_count: 0,
3060 created_at: now.clone(),
3061 updated_at: now,
3062 last_accessed_at: None,
3063 expires_at: None,
3064 metadata,
3065 };
3066
3067 insert(conn, &mem)
3068}
3069
3070pub fn list_agents(conn: &Connection) -> Result<Vec<AgentRegistration>> {
3073 let now = Utc::now().to_rfc3339();
3074 let mut stmt = conn.prepare(
3075 "SELECT metadata FROM memories
3076 WHERE namespace = ?1
3077 AND (expires_at IS NULL OR expires_at > ?2)
3078 ORDER BY json_extract(metadata, '$.registered_at') ASC",
3079 )?;
3080 let rows = stmt.query_map(params![AGENTS_NAMESPACE, now], |row| {
3081 row.get::<_, String>(0)
3082 })?;
3083
3084 let mut agents = Vec::new();
3085 for r in rows {
3086 let raw = r?;
3087 let meta: serde_json::Value =
3088 serde_json::from_str(&raw).context("failed to parse agent metadata as JSON")?;
3089 let agent_id = meta
3090 .get("agent_id")
3091 .and_then(serde_json::Value::as_str)
3092 .unwrap_or_default()
3093 .to_string();
3094 let agent_type = meta
3095 .get("agent_type")
3096 .and_then(serde_json::Value::as_str)
3097 .unwrap_or_default()
3098 .to_string();
3099 let capabilities: Vec<String> = meta
3100 .get("capabilities")
3101 .and_then(serde_json::Value::as_array)
3102 .map(|arr| {
3103 arr.iter()
3104 .filter_map(|v| v.as_str().map(String::from))
3105 .collect()
3106 })
3107 .unwrap_or_default();
3108 let registered_at = meta
3109 .get("registered_at")
3110 .and_then(serde_json::Value::as_str)
3111 .unwrap_or_default()
3112 .to_string();
3113 let last_seen_at = meta
3114 .get("last_seen_at")
3115 .and_then(serde_json::Value::as_str)
3116 .unwrap_or_default()
3117 .to_string();
3118 agents.push(AgentRegistration {
3119 agent_id,
3120 agent_type,
3121 capabilities,
3122 registered_at,
3123 last_seen_at,
3124 });
3125 }
3126 Ok(agents)
3127}
3128
3129pub fn stats(conn: &Connection, db_path: &Path) -> Result<Stats> {
3130 let total: usize = conn.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))?;
3131
3132 let mut stmt =
3133 conn.prepare("SELECT tier, COUNT(*) FROM memories GROUP BY tier ORDER BY COUNT(*) DESC")?;
3134 let by_tier = stmt
3135 .query_map([], |row| {
3136 Ok(TierCount {
3137 tier: row.get(0)?,
3138 count: row.get(1)?,
3139 })
3140 })?
3141 .collect::<rusqlite::Result<Vec<_>>>()?;
3142
3143 let mut stmt = conn.prepare(
3144 "SELECT namespace, COUNT(*) FROM memories GROUP BY namespace ORDER BY COUNT(*) DESC",
3145 )?;
3146 let by_namespace = stmt
3147 .query_map([], |row| {
3148 Ok(NamespaceCount {
3149 namespace: row.get(0)?,
3150 count: row.get(1)?,
3151 })
3152 })?
3153 .collect::<rusqlite::Result<Vec<_>>>()?;
3154
3155 let now = Utc::now().to_rfc3339();
3156 let one_hour = (Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
3157 let expiring_soon: usize = conn.query_row(
3158 "SELECT COUNT(*) FROM memories WHERE expires_at IS NOT NULL AND expires_at > ?1 AND expires_at <= ?2",
3159 params![now, one_hour], |r| r.get(0),
3160 )?;
3161
3162 let links_count: usize = conn
3163 .query_row("SELECT COUNT(*) FROM memory_links", [], |r| r.get(0))
3164 .unwrap_or(0);
3165 let db_size_bytes = std::fs::metadata(db_path).map_or(0, |m| m.len());
3166 let dim_violations = dim_violations(conn).unwrap_or(0);
3169
3170 let index_evictions_total = crate::hnsw::index_evictions_total();
3175
3176 Ok(Stats {
3177 total,
3178 by_tier,
3179 by_namespace,
3180 expiring_soon,
3181 links_count,
3182 db_size_bytes,
3183 dim_violations,
3184 index_evictions_total,
3185 })
3186}
3187
3188pub fn gc_if_needed(conn: &Connection, archive: bool) -> Result<usize> {
3190 let now = Utc::now().to_rfc3339();
3191 let has_expired: bool = conn
3192 .query_row(
3193 "SELECT EXISTS(SELECT 1 FROM memories WHERE expires_at IS NOT NULL AND expires_at < ?1)",
3194 params![now],
3195 |r| r.get(0),
3196 )
3197 .unwrap_or(false);
3198 if has_expired {
3199 gc(conn, archive)
3200 } else {
3201 Ok(0)
3202 }
3203}
3204
3205pub fn auto_purge_archive(conn: &Connection, max_days: Option<i64>) -> Result<usize> {
3207 match max_days {
3208 Some(days) if days > 0 => purge_archive(conn, Some(days)),
3209 _ => Ok(0),
3210 }
3211}
3212
3213pub fn gc(conn: &Connection, archive: bool) -> Result<usize> {
3214 let now = Utc::now().to_rfc3339();
3215 conn.execute_batch("BEGIN IMMEDIATE")?;
3216 let result = (|| -> Result<usize> {
3217 if archive {
3218 conn.execute(
3220 "INSERT OR REPLACE INTO archived_memories
3221 (id, tier, namespace, title, content, tags, priority, confidence,
3222 source, access_count, created_at, updated_at, last_accessed_at,
3223 expires_at, archived_at, archive_reason, metadata,
3224 embedding, embedding_dim, original_tier, original_expires_at)
3225 SELECT id, tier, namespace, title, content, tags, priority, confidence,
3226 source, access_count, created_at, updated_at, last_accessed_at,
3227 expires_at, ?1, 'ttl_expired', metadata,
3228 embedding, embedding_dim, tier, expires_at
3229 FROM memories
3230 WHERE expires_at IS NOT NULL AND expires_at < ?1",
3231 params![now],
3232 )?;
3233 }
3234 let deleted = conn.execute(
3235 "DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at < ?1",
3236 params![now],
3237 )?;
3238 Ok(deleted)
3239 })();
3240 match result {
3241 Ok(n) => {
3242 conn.execute_batch("COMMIT")?;
3243 let _ = conn.execute(
3245 "DELETE FROM namespace_meta WHERE standard_id NOT IN (SELECT id FROM memories)",
3246 [],
3247 );
3248 Ok(n)
3249 }
3250 Err(e) => {
3251 let _ = conn.execute_batch("ROLLBACK");
3252 Err(e)
3253 }
3254 }
3255}
3256
3257pub fn list_archived(
3262 conn: &Connection,
3263 namespace: Option<&str>,
3264 limit: usize,
3265 offset: usize,
3266) -> Result<Vec<serde_json::Value>> {
3267 let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match namespace {
3268 Some(ns) => (
3269 "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
3270 source, access_count, created_at, updated_at, last_accessed_at, \
3271 expires_at, archived_at, archive_reason, metadata \
3272 FROM archived_memories WHERE namespace = ?1 \
3273 ORDER BY archived_at DESC LIMIT ?2 OFFSET ?3"
3274 .to_string(),
3275 vec![Box::new(ns.to_string()), Box::new(limit), Box::new(offset)],
3276 ),
3277 None => (
3278 "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
3279 source, access_count, created_at, updated_at, last_accessed_at, \
3280 expires_at, archived_at, archive_reason, metadata \
3281 FROM archived_memories \
3282 ORDER BY archived_at DESC LIMIT ?1 OFFSET ?2"
3283 .to_string(),
3284 vec![Box::new(limit), Box::new(offset)],
3285 ),
3286 };
3287 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
3288 params_vec.iter().map(std::convert::AsRef::as_ref).collect();
3289 let mut stmt = conn.prepare(&sql)?;
3290 let rows = stmt.query_map(params_refs.as_slice(), |row| {
3291 let metadata_str = row
3292 .get::<_, String>(16)
3293 .unwrap_or_else(|_| "{}".to_string());
3294 let metadata: serde_json::Value =
3295 serde_json::from_str(&metadata_str).unwrap_or_else(|_| serde_json::json!({}));
3296 Ok(serde_json::json!({
3297 "id": row.get::<_, String>(0)?,
3298 "tier": row.get::<_, String>(1)?,
3299 "namespace": row.get::<_, String>(2)?,
3300 "title": row.get::<_, String>(3)?,
3301 "content": row.get::<_, String>(4)?,
3302 "tags": row.get::<_, String>(5)?,
3303 "priority": row.get::<_, i32>(6)?,
3304 "confidence": row.get::<_, f64>(7)?,
3305 "source": row.get::<_, String>(8)?,
3306 "access_count": row.get::<_, i64>(9)?,
3307 "created_at": row.get::<_, String>(10)?,
3308 "updated_at": row.get::<_, String>(11)?,
3309 "last_accessed_at": row.get::<_, Option<String>>(12)?,
3310 "expires_at": row.get::<_, Option<String>>(13)?,
3311 "archived_at": row.get::<_, String>(14)?,
3312 "archive_reason": row.get::<_, String>(15)?,
3313 "metadata": metadata,
3314 }))
3315 })?;
3316 rows.collect::<rusqlite::Result<Vec<_>>>()
3317 .map_err(Into::into)
3318}
3319
3320pub fn restore_archived(conn: &Connection, id: &str) -> Result<bool> {
3321 let now = Utc::now().to_rfc3339();
3322 conn.execute_batch("BEGIN IMMEDIATE")?;
3323 let result = (|| -> Result<bool> {
3324 let exists: bool = conn
3325 .query_row(
3326 "SELECT COUNT(*) > 0 FROM archived_memories WHERE id = ?1",
3327 params![id],
3328 |r| r.get(0),
3329 )
3330 .unwrap_or(false);
3331 if !exists {
3332 return Ok(false);
3333 }
3334 let active_exists: bool = conn
3336 .query_row(
3337 "SELECT COUNT(*) > 0 FROM memories WHERE id = ?1",
3338 params![id],
3339 |r| r.get(0),
3340 )
3341 .unwrap_or(false);
3342 if active_exists {
3343 anyhow::bail!(
3344 "cannot restore: memory {id} already exists in active table (would overwrite)"
3345 );
3346 }
3347 let archived_metadata: String = conn
3349 .query_row(
3350 "SELECT metadata FROM archived_memories WHERE id = ?1",
3351 params![id],
3352 |r| r.get(0),
3353 )
3354 .unwrap_or_else(|_| "{}".to_string());
3355 let meta_value: serde_json::Value =
3356 serde_json::from_str(&archived_metadata).unwrap_or_else(|_| serde_json::json!({}));
3357 if let Err(e) = crate::validate::validate_metadata(&meta_value) {
3358 tracing::warn!("archived memory {id} has invalid metadata, resetting to {{}}: {e}");
3359 conn.execute(
3360 "UPDATE archived_memories SET metadata = '{}' WHERE id = ?1",
3361 params![id],
3362 )?;
3363 }
3364
3365 conn.execute(
3371 "INSERT INTO memories
3372 (id, tier, namespace, title, content, tags, priority, confidence,
3373 source, access_count, created_at, updated_at, last_accessed_at,
3374 expires_at, metadata, embedding, embedding_dim)
3375 SELECT id, COALESCE(original_tier, 'long'), namespace, title, content,
3376 tags, priority, confidence, source, access_count, created_at,
3377 ?1, last_accessed_at, original_expires_at, metadata,
3378 embedding, embedding_dim
3379 FROM archived_memories WHERE id = ?2",
3380 params![now, id],
3381 )?;
3382 conn.execute("DELETE FROM archived_memories WHERE id = ?1", params![id])?;
3383 Ok(true)
3384 })();
3385 match result {
3386 Ok(v) => {
3387 conn.execute_batch("COMMIT")?;
3388 Ok(v)
3389 }
3390 Err(e) => {
3391 let _ = conn.execute_batch("ROLLBACK");
3392 Err(e)
3393 }
3394 }
3395}
3396
3397pub fn purge_archive(conn: &Connection, older_than_days: Option<i64>) -> Result<usize> {
3398 match older_than_days {
3399 Some(days) if days < 0 => {
3400 anyhow::bail!("older_than_days must be non-negative (got {days})");
3401 }
3402 Some(days) => {
3403 let cutoff = (Utc::now() - chrono::Duration::days(days)).to_rfc3339();
3404 let deleted = conn.execute(
3405 "DELETE FROM archived_memories WHERE archived_at < ?1",
3406 params![cutoff],
3407 )?;
3408 Ok(deleted)
3409 }
3410 None => {
3411 let deleted = conn.execute("DELETE FROM archived_memories", [])?;
3412 Ok(deleted)
3413 }
3414 }
3415}
3416
3417pub fn archive_stats(conn: &Connection) -> Result<serde_json::Value> {
3418 let total: i64 = conn.query_row("SELECT COUNT(*) FROM archived_memories", [], |r| r.get(0))?;
3419 let mut stmt = conn.prepare(
3420 "SELECT namespace, COUNT(*) FROM archived_memories GROUP BY namespace ORDER BY COUNT(*) DESC",
3421 )?;
3422 let by_ns: Vec<serde_json::Value> = stmt
3423 .query_map([], |row| {
3424 Ok(serde_json::json!({
3425 "namespace": row.get::<_, String>(0)?,
3426 "count": row.get::<_, i64>(1)?,
3427 }))
3428 })?
3429 .collect::<rusqlite::Result<Vec<_>>>()?;
3430 Ok(serde_json::json!({
3431 "archived_total": total,
3432 "by_namespace": by_ns,
3433 }))
3434}
3435
3436pub fn export_all(conn: &Connection) -> Result<Vec<Memory>> {
3437 let now = Utc::now().to_rfc3339();
3438 let mut stmt = conn.prepare(
3439 "SELECT * FROM memories WHERE expires_at IS NULL OR expires_at > ?1 ORDER BY created_at ASC",
3440 )?;
3441 let rows = stmt.query_map(params![now], row_to_memory)?;
3442 rows.collect::<rusqlite::Result<Vec<_>>>()
3443 .map_err(Into::into)
3444}
3445
3446pub fn export_links(conn: &Connection) -> Result<Vec<MemoryLink>> {
3447 let now = Utc::now().to_rfc3339();
3448 let mut stmt = conn.prepare(
3449 "SELECT ml.source_id, ml.target_id, ml.relation, ml.created_at
3450 FROM memory_links ml
3451 JOIN memories ms ON ms.id = ml.source_id AND (ms.expires_at IS NULL OR ms.expires_at > ?1)
3452 JOIN memories mt ON mt.id = ml.target_id AND (mt.expires_at IS NULL OR mt.expires_at > ?1)",
3453 )?;
3454 let rows = stmt.query_map(params![now], |row| {
3455 Ok(MemoryLink {
3456 source_id: row.get(0)?,
3457 target_id: row.get(1)?,
3458 relation: row.get(2)?,
3459 created_at: row.get(3)?,
3460 })
3461 })?;
3462 rows.collect::<rusqlite::Result<Vec<_>>>()
3463 .map_err(Into::into)
3464}
3465
/// Inserts `mem`, merging it into any existing row with the same
/// `(title, namespace)` instead of failing.
///
/// On a `(title, namespace)` conflict the existing row keeps its `id`,
/// `created_at` and `last_accessed_at`, and the other columns merge as follows:
/// - `content`, `tags`, `source`, `metadata`: taken from `mem` only if its
///   `updated_at` is newer, or equal with a greater `id` (deterministic
///   tie-break); otherwise the stored values are kept.
/// - `priority`, `confidence`, `access_count`, `updated_at`: the maximum of
///   both rows.
/// - `tier`: never demoted. It is `long` if either side is `long`, otherwise
///   `mid` if `mem` is `mid`, otherwise the stored tier.
/// - `expires_at`: cleared when either side is `long`; otherwise `mem`'s
///   expiry if set, else the stored one.
/// - `metadata.agent_id`: if the stored row has one, it is preserved even
///   when newer metadata wins.
///
/// Embedding columns are not written by this statement. Only the
/// `(title, namespace)` conflict is absorbed; a uniqueness violation on any
/// other constraint still raises an error.
///
/// Returns the id of the row that now holds the data. On a conflict this is
/// the existing row's id, which may differ from `mem.id`.
///
/// # Errors
///
/// Fails if tags or metadata cannot be serialized, or on SQLite errors.
pub fn insert_if_newer(conn: &Connection, mem: &Memory) -> Result<String> {
    let tags_json = serde_json::to_string(&mem.tags)?;
    let metadata_json = serde_json::to_string(&mem.metadata)?;
    // In ON CONFLICT DO UPDATE, `memories.*` refers to the stored row's
    // pre-update values and `excluded.*` to the incoming row. Every SET
    // expression therefore sees the old `updated_at`, even though
    // `updated_at` is itself reassigned.
    let actual_id: String = conn.query_row(
        "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at, last_accessed_at, expires_at, metadata)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
         ON CONFLICT(title, namespace) DO UPDATE SET
            content = CASE WHEN excluded.updated_at > memories.updated_at
                           OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                      THEN excluded.content ELSE memories.content END,
            tags = CASE WHEN excluded.updated_at > memories.updated_at
                        OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                   THEN excluded.tags ELSE memories.tags END,
            priority = MAX(memories.priority, excluded.priority),
            confidence = MAX(memories.confidence, excluded.confidence),
            source = CASE WHEN excluded.updated_at > memories.updated_at
                          OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                     THEN excluded.source ELSE memories.source END,
            tier = CASE WHEN excluded.tier = 'long' THEN 'long'
                        WHEN memories.tier = 'long' THEN 'long'
                        WHEN excluded.tier = 'mid' THEN 'mid'
                        ELSE memories.tier END,
            updated_at = MAX(memories.updated_at, excluded.updated_at),
            access_count = MAX(memories.access_count, excluded.access_count),
            expires_at = CASE WHEN excluded.tier = 'long' OR memories.tier = 'long' THEN NULL
                              ELSE COALESCE(excluded.expires_at, memories.expires_at) END,
            -- Preserve metadata.agent_id across newer-wins merge (NHI provenance immutable).
            metadata = CASE
                WHEN json_extract(memories.metadata, '$.agent_id') IS NOT NULL
                THEN json_set(
                    CASE WHEN excluded.updated_at > memories.updated_at
                              OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                         THEN excluded.metadata
                         ELSE memories.metadata END,
                    '$.agent_id',
                    json_extract(memories.metadata, '$.agent_id')
                )
                ELSE CASE WHEN excluded.updated_at > memories.updated_at
                               OR (excluded.updated_at = memories.updated_at AND excluded.id > memories.id)
                          THEN excluded.metadata
                          ELSE memories.metadata END
            END
         RETURNING id",
        params![
            mem.id, mem.tier.as_str(), mem.namespace, mem.title, mem.content,
            tags_json, mem.priority, mem.confidence, mem.source, mem.access_count,
            mem.created_at, mem.updated_at, mem.last_accessed_at, mem.expires_at,
            metadata_json,
        ],
        |r| r.get(0),
    )?;
    Ok(actual_id)
}
3530
/// Error returned when an embedding write would mix vector dimensions within
/// one namespace.
///
/// Callers can downcast an `anyhow::Error` to this type to distinguish a
/// dimension conflict from other storage failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddingDimMismatch {
    /// Namespace whose established dimension would have been violated.
    pub namespace: String,
    /// Dimension already used by existing embeddings in `namespace`.
    pub established: usize,
    /// Dimension of the rejected write.
    pub attempted: usize,
}

impl std::fmt::Display for EmbeddingDimMismatch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "embedding dim mismatch in namespace '{}': established {}-dim, refused {}-dim write",
            self.namespace, self.established, self.attempted
        )
    }
}

impl std::error::Error for EmbeddingDimMismatch {}
3556
3557pub fn namespace_embedding_dim(conn: &Connection, namespace: &str) -> Result<Option<usize>> {
3564 let dim: Option<i64> = conn
3566 .query_row(
3567 "SELECT embedding_dim FROM memories \
3568 WHERE namespace = ?1 AND embedding_dim IS NOT NULL \
3569 LIMIT 1",
3570 params![namespace],
3571 |r| r.get(0),
3572 )
3573 .ok();
3574 Ok(dim.and_then(|d| usize::try_from(d).ok()))
3575}
3576
3577pub fn dim_violations(conn: &Connection) -> Result<u64> {
3585 let n: i64 = conn
3590 .query_row(
3591 "SELECT COUNT(*) FROM memories \
3592 WHERE embedding IS NOT NULL \
3593 AND length(embedding) >= 4 \
3594 AND ( \
3595 embedding_dim IS NULL \
3596 OR ( \
3597 (length(embedding) % 4 = 0 AND embedding_dim != length(embedding)/4) \
3598 OR (length(embedding) % 4 = 1 AND embedding_dim != (length(embedding)-1)/4) \
3599 OR (length(embedding) % 4 NOT IN (0,1)) \
3600 ) \
3601 )",
3602 [],
3603 |r| r.get(0),
3604 )
3605 .unwrap_or(0);
3606 Ok(u64::try_from(n).unwrap_or(0))
3607}
3608
3609pub fn set_embedding(conn: &Connection, id: &str, embedding: &[f32]) -> Result<()> {
3623 let namespace: Option<String> = conn
3625 .query_row(
3626 "SELECT namespace FROM memories WHERE id = ?1",
3627 params![id],
3628 |r| r.get(0),
3629 )
3630 .ok();
3631 let attempted = embedding.len();
3632 if attempted == 0 {
3633 let bytes = crate::embeddings::encode_embedding_blob(embedding);
3637 conn.execute(
3638 "UPDATE memories SET embedding = ?1, embedding_dim = NULL WHERE id = ?2",
3639 params![bytes, id],
3640 )?;
3641 return Ok(());
3642 }
3643 if let Some(ref ns) = namespace
3644 && let Some(established) = namespace_embedding_dim(conn, ns)?
3645 && established != attempted
3646 {
3647 return Err(EmbeddingDimMismatch {
3648 namespace: ns.clone(),
3649 established,
3650 attempted,
3651 }
3652 .into());
3653 }
3654 let bytes = crate::embeddings::encode_embedding_blob(embedding);
3655 let dim_i64 = i64::try_from(attempted).unwrap_or(i64::MAX);
3656 conn.execute(
3657 "UPDATE memories SET embedding = ?1, embedding_dim = ?2 WHERE id = ?3",
3658 params![bytes, dim_i64, id],
3659 )?;
3660 Ok(())
3661}
3662
3663pub fn get_embedding(conn: &Connection, id: &str) -> Result<Option<Vec<f32>>> {
3674 let result: Option<Vec<u8>> = conn
3675 .query_row(
3676 "SELECT embedding FROM memories WHERE id = ?1",
3677 params![id],
3678 |row| row.get(0),
3679 )
3680 .ok();
3681 match result {
3682 Some(bytes) if !bytes.is_empty() => {
3683 let floats = crate::embeddings::decode_embedding_blob(&bytes)?;
3684 Ok(Some(floats))
3685 }
3686 _ => Ok(None),
3687 }
3688}
3689
3690pub fn get_unembedded_ids(conn: &Connection) -> Result<Vec<(String, String, String)>> {
3692 let mut stmt =
3693 conn.prepare("SELECT id, title, content FROM memories WHERE embedding IS NULL")?;
3694 let rows = stmt.query_map([], |row| {
3695 Ok((
3696 row.get::<_, String>(0)?,
3697 row.get::<_, String>(1)?,
3698 row.get::<_, String>(2)?,
3699 ))
3700 })?;
3701 rows.collect::<rusqlite::Result<Vec<_>>>()
3702 .map_err(Into::into)
3703}
3704
3705pub fn get_all_embeddings(conn: &Connection) -> Result<Vec<(String, Vec<f32>)>> {
3712 let mut stmt =
3713 conn.prepare("SELECT id, embedding FROM memories WHERE embedding IS NOT NULL")?;
3714 let rows = stmt.query_map([], |row| {
3715 let id: String = row.get(0)?;
3716 let bytes: Vec<u8> = row.get(1)?;
3717 Ok((id, bytes))
3718 })?;
3719 let mut entries = Vec::new();
3720 for row in rows {
3721 let (id, bytes) = row?;
3722 if bytes.is_empty() {
3723 continue;
3724 }
3725 match crate::embeddings::decode_embedding_blob(&bytes) {
3726 Ok(floats) => entries.push((id, floats)),
3727 Err(e) => {
3728 tracing::warn!(
3729 memory_id = %id,
3730 error = %e,
3731 "skipping memory with malformed embedding BLOB during HNSW build"
3732 );
3733 }
3734 }
3735 }
3736 Ok(entries)
3737}
3738
3739#[allow(clippy::too_many_arguments)]
3744#[allow(clippy::too_many_arguments)]
3750pub fn recall_hybrid(
3751 conn: &Connection,
3752 context: &str,
3753 query_embedding: &[f32],
3754 namespace: Option<&str>,
3755 limit: usize,
3756 tags_filter: Option<&str>,
3757 since: Option<&str>,
3758 until: Option<&str>,
3759 vector_index: Option<&crate::hnsw::VectorIndex>,
3760 short_extend: i64,
3761 mid_extend: i64,
3762 as_agent: Option<&str>,
3763 budget_tokens: Option<usize>,
3764 scoring: &crate::config::ResolvedScoring,
3765) -> Result<(Vec<(Memory, f64)>, BudgetOutcome)> {
3766 let (results, outcome, _telemetry) = recall_hybrid_with_telemetry(
3767 conn,
3768 context,
3769 query_embedding,
3770 namespace,
3771 limit,
3772 tags_filter,
3773 since,
3774 until,
3775 vector_index,
3776 short_extend,
3777 mid_extend,
3778 as_agent,
3779 budget_tokens,
3780 scoring,
3781 )?;
3782 Ok((results, outcome))
3783}
3784
3785#[allow(clippy::too_many_arguments)]
3798#[allow(clippy::too_many_lines)]
3799pub fn recall_hybrid_with_telemetry(
3800 conn: &Connection,
3801 context: &str,
3802 query_embedding: &[f32],
3803 namespace: Option<&str>,
3804 limit: usize,
3805 tags_filter: Option<&str>,
3806 since: Option<&str>,
3807 until: Option<&str>,
3808 vector_index: Option<&crate::hnsw::VectorIndex>,
3809 short_extend: i64,
3810 mid_extend: i64,
3811 as_agent: Option<&str>,
3812 budget_tokens: Option<usize>,
3813 scoring: &crate::config::ResolvedScoring,
3814) -> Result<(
3815 Vec<(Memory, f64)>,
3816 BudgetOutcome,
3817 crate::models::RecallTelemetry,
3818)> {
3819 let now = Utc::now().to_rfc3339();
3820 let fts_query = sanitize_fts_query(context, true);
3821 let prefixes = compute_visibility_prefixes(as_agent);
3822 let (vis_p, vis_t, vis_u, vis_o) = prefixes.clone();
3823
3824 let (fts_hierarchy_in, hierarchy_active) = hierarchy_in_clause(namespace);
3828 let fts_hierarchy_fragment = fts_hierarchy_in.unwrap_or_default();
3829 let sem_hierarchy_fragment = if hierarchy_active {
3831 if let Some(ns) = namespace {
3832 let ancestors = crate::models::namespace_ancestors(ns);
3833 let quoted: Vec<String> = ancestors
3834 .iter()
3835 .map(|a| format!("'{}'", a.replace('\'', "''")))
3836 .collect();
3837 format!("AND memories.namespace IN ({})", quoted.join(","))
3838 } else {
3839 String::new()
3840 }
3841 } else {
3842 String::new()
3843 };
3844 let effective_namespace = if hierarchy_active { None } else { namespace };
3845
3846 let fts_limit = (limit * 3).max(30);
3848 let fts_sql = format!(
3849 "SELECT m.id, m.tier, m.namespace, m.title, m.content, m.tags, m.priority,
3850 m.confidence, m.source, m.access_count, m.created_at, m.updated_at,
3851 m.last_accessed_at, m.expires_at, m.metadata, m.embedding,
3852 (fts.rank * -1) + (m.priority * 0.5) + (MIN(m.access_count, 50) * 0.1)
3853 + (m.confidence * 2.0)
3854 + (CASE m.tier WHEN 'long' THEN 3.0 WHEN 'mid' THEN 1.0 ELSE 0.0 END)
3855 + (1.0 / (1.0 + (julianday('now') - julianday(m.updated_at)) * 0.1))
3856 AS fts_score
3857 FROM memories_fts fts
3858 JOIN memories m ON m.rowid = fts.rowid
3859 WHERE memories_fts MATCH ?1
3860 AND (?2 IS NULL OR m.namespace = ?2)
3861 {fts_hierarchy_fragment}
3862 AND (m.expires_at IS NULL OR m.expires_at > ?3)
3863 AND (?4 IS NULL OR EXISTS (SELECT 1 FROM json_each(m.tags) WHERE json_each.value = ?4))
3864 AND (?5 IS NULL OR m.created_at >= ?5)
3865 AND (?6 IS NULL OR m.created_at <= ?6)
3866 {vis}
3867 ORDER BY fts_score DESC
3868 LIMIT ?7",
3869 vis = visibility_clause(8, "m"),
3870 );
3871 let mut fts_stmt = conn.prepare(&fts_sql)?;
3872
3873 let sem_sql = format!(
3875 "SELECT id, tier, namespace, title, content, tags, priority,
3876 confidence, source, access_count, created_at, updated_at,
3877 last_accessed_at, expires_at, metadata, embedding
3878 FROM memories
3879 WHERE embedding IS NOT NULL
3880 AND (?1 IS NULL OR namespace = ?1)
3881 {sem_hierarchy_fragment}
3882 AND (expires_at IS NULL OR expires_at > ?2)
3883 AND (?3 IS NULL OR EXISTS (SELECT 1 FROM json_each(memories.tags) WHERE json_each.value = ?3))
3884 AND (?4 IS NULL OR created_at >= ?4)
3885 AND (?5 IS NULL OR created_at <= ?5)
3886 {vis}",
3887 vis = visibility_clause(6, "memories"),
3888 );
3889 let mut sem_stmt = conn.prepare(&sem_sql)?;
3890
3891 let mut scored: HashMap<String, (Memory, f64, f64)> = HashMap::new(); let fts_rows = fts_stmt.query_map(
3895 params![
3896 fts_query,
3897 effective_namespace,
3898 now,
3899 tags_filter,
3900 since,
3901 until,
3902 fts_limit,
3903 vis_p,
3904 vis_t,
3905 vis_u,
3906 vis_o,
3907 ],
3908 |row| {
3909 let mem = row_to_memory(row)?;
3910 let fts_score: f64 = row.get(16)?;
3911 Ok((mem, fts_score))
3912 },
3913 )?;
3914
3915 let mut fts_candidates_count: usize = 0;
3920 let mut hnsw_candidates_count: usize = 0;
3921
3922 let mut max_fts_score: f64 = 1.0;
3923 for row in fts_rows {
3924 let (mem, fts_score) = row?;
3925 if fts_score > max_fts_score {
3926 max_fts_score = fts_score;
3927 }
3928 let cosine = get_embedding(conn, &mem.id)?.map_or(0.0, |emb| {
3930 f64::from(crate::embeddings::Embedder::cosine_similarity(
3931 query_embedding,
3932 &emb,
3933 ))
3934 });
3935 scored.insert(mem.id.clone(), (mem, fts_score, cosine));
3936 fts_candidates_count += 1;
3937 }
3938
3939 if let Some(idx) = vector_index {
3942 let ann_limit = (limit * 5).max(50);
3944 let hits = idx.search(query_embedding, ann_limit);
3945 for hit in hits {
3946 if scored.contains_key(&hit.id) {
3947 continue;
3948 }
3949 let cosine = f64::from(1.0 - hit.distance);
3950 if cosine > 0.2
3961 && let Some(mem) = get(conn, &hit.id)?
3962 {
3963 if let Some(ns) = namespace {
3967 if hierarchy_active {
3968 let ancestors = crate::models::namespace_ancestors(ns);
3969 if !ancestors.iter().any(|a| a == &mem.namespace) {
3970 continue;
3971 }
3972 } else if mem.namespace != ns {
3973 continue;
3974 }
3975 }
3976 if let Some(exp) = &mem.expires_at
3977 && exp.as_str() <= now.as_str()
3978 {
3979 continue;
3980 }
3981 if let Some(tf) = tags_filter
3982 && !mem.tags.iter().any(|t| t == tf)
3983 {
3984 continue;
3985 }
3986 if let Some(s) = since
3987 && mem.created_at.as_str() < s
3988 {
3989 continue;
3990 }
3991 if let Some(u) = until
3992 && mem.created_at.as_str() > u
3993 {
3994 continue;
3995 }
3996 if !is_visible(&mem, &prefixes) {
3998 continue;
3999 }
4000 scored.insert(mem.id.clone(), (mem, 0.0, cosine));
4001 hnsw_candidates_count += 1;
4002 }
4003 }
4004 } else {
4005 let sem_rows = sem_stmt.query_map(
4007 params![
4008 effective_namespace,
4009 now,
4010 tags_filter,
4011 since,
4012 until,
4013 vis_p,
4014 vis_t,
4015 vis_u,
4016 vis_o
4017 ],
4018 |row| {
4019 let mem = row_to_memory(row)?;
4020 let emb_bytes: Option<Vec<u8>> = row.get(15)?;
4021 Ok((mem, emb_bytes))
4022 },
4023 )?;
4024
4025 for row in sem_rows {
4026 let (mem, emb_bytes) = row?;
4027 if scored.contains_key(&mem.id) {
4028 continue;
4029 }
4030 if let Some(bytes) = emb_bytes
4031 && !bytes.is_empty()
4032 {
4033 let Ok(emb) = crate::embeddings::decode_embedding_blob(&bytes) else {
4037 tracing::warn!(
4038 memory_id = %mem.id,
4039 "skipping malformed embedding BLOB during semantic recall"
4040 );
4041 continue;
4042 };
4043 let cosine = f64::from(crate::embeddings::Embedder::cosine_similarity(
4044 query_embedding,
4045 &emb,
4046 ));
4047 if cosine > 0.2 {
4049 scored.insert(mem.id.clone(), (mem, 0.0, cosine));
4050 hnsw_candidates_count += 1;
4051 }
4052 }
4053 }
4054 }
4055
4056 let now_utc = Utc::now();
4065 let blend_weights: std::cell::RefCell<Vec<f64>> = std::cell::RefCell::new(Vec::new());
4070 let mut results: Vec<(Memory, f64)> = scored
4071 .into_values()
4072 .map(|(mem, fts_score, cosine)| {
4073 let norm_fts = if max_fts_score > 0.0 {
4074 fts_score / max_fts_score
4075 } else {
4076 0.0
4077 };
4078 let content_len = f64::from(i32::try_from(mem.content.len()).expect("usize as i64"));
4079 let semantic_weight = if content_len <= 500.0 {
4081 0.50
4082 } else if content_len >= 5000.0 {
4083 0.15
4084 } else {
4085 0.50 - 0.35 * ((content_len - 500.0) / 4500.0)
4086 };
4087 blend_weights.borrow_mut().push(semantic_weight);
4088 let blended = semantic_weight * cosine + (1.0 - semantic_weight) * norm_fts;
4089 let age_days = chrono::DateTime::parse_from_rfc3339(&mem.created_at)
4090 .ok()
4091 .map_or(0.0, |ts| {
4092 let secs = (now_utc - ts.with_timezone(&Utc)).num_seconds();
4093 #[allow(clippy::cast_precision_loss)]
4098 {
4099 secs as f64 / 86_400.0
4100 }
4101 });
4102 let decay = scoring.decay_multiplier(&mem.tier, age_days);
4103 (mem, blended * decay)
4104 })
4105 .collect();
4106
4107 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
4108 results.truncate(limit);
4109
4110 let boosted = if let (true, Some(anchor)) = (hierarchy_active, namespace) {
4112 apply_proximity_boost(results, anchor)
4113 } else {
4114 results
4115 };
4116
4117 let (budgeted, outcome) = apply_token_budget(boosted, budget_tokens);
4120
4121 for (mem, _) in &budgeted {
4123 if let Err(e) = touch(conn, &mem.id, short_extend, mid_extend) {
4124 tracing::warn!("touch failed for memory {}: {}", &mem.id, e);
4125 }
4126 }
4127
4128 let weights = blend_weights.into_inner();
4136 let blend_weight_avg = if weights.is_empty() {
4137 0.0
4138 } else {
4139 #[allow(clippy::cast_precision_loss)]
4140 let n = weights.len() as f64;
4141 weights.iter().sum::<f64>() / n
4142 };
4143 let telemetry = crate::models::RecallTelemetry {
4144 fts_candidates: fts_candidates_count,
4145 hnsw_candidates: hnsw_candidates_count,
4146 blend_weight_avg,
4147 };
4148
4149 Ok((budgeted, outcome, telemetry))
4150}
4151
4152pub fn checkpoint(conn: &Connection) -> Result<()> {
4154 conn.pragma_update(None, "wal_checkpoint", "TRUNCATE")?;
4155 Ok(())
4156}
4157
4158pub fn sync_state_observe(
4171 conn: &Connection,
4172 agent_id: &str,
4173 peer_id: &str,
4174 seen_at: &str,
4175) -> Result<()> {
4176 let now = Utc::now().to_rfc3339();
4177 conn.execute(
4178 "INSERT INTO sync_state (agent_id, peer_id, last_seen_at, last_pulled_at) \
4179 VALUES (?1, ?2, ?3, ?4) \
4180 ON CONFLICT(agent_id, peer_id) DO UPDATE SET \
4181 last_seen_at = CASE WHEN excluded.last_seen_at > last_seen_at \
4182 THEN excluded.last_seen_at \
4183 ELSE last_seen_at END, \
4184 last_pulled_at = excluded.last_pulled_at",
4185 params![agent_id, peer_id, seen_at, now],
4186 )?;
4187 Ok(())
4188}
4189
4190pub fn sync_state_load(conn: &Connection, agent_id: &str) -> Result<crate::models::VectorClock> {
4193 let mut stmt =
4194 conn.prepare("SELECT peer_id, last_seen_at FROM sync_state WHERE agent_id = ?1")?;
4195 let rows = stmt.query_map(params![agent_id], |row| {
4196 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
4197 })?;
4198 let mut clock = crate::models::VectorClock::default();
4199 for row in rows {
4200 let (peer, at) = row?;
4201 clock.entries.insert(peer, at);
4202 }
4203 Ok(clock)
4204}
4205
4206#[must_use]
4210#[allow(dead_code)] pub fn sync_state_last_pushed(conn: &Connection, agent_id: &str, peer_id: &str) -> Option<String> {
4212 conn.query_row(
4213 "SELECT last_pushed_at FROM sync_state WHERE agent_id = ?1 AND peer_id = ?2",
4214 params![agent_id, peer_id],
4215 |r| r.get::<_, Option<String>>(0),
4216 )
4217 .ok()
4218 .flatten()
4219}
4220
4221#[allow(dead_code)] pub fn sync_state_record_push(
4225 conn: &Connection,
4226 agent_id: &str,
4227 peer_id: &str,
4228 pushed_at: &str,
4229) -> Result<()> {
4230 let now = Utc::now().to_rfc3339();
4231 conn.execute(
4232 "INSERT INTO sync_state (agent_id, peer_id, last_seen_at, last_pulled_at, last_pushed_at) \
4233 VALUES (?1, ?2, ?3, ?3, ?4) \
4234 ON CONFLICT(agent_id, peer_id) DO UPDATE SET \
4235 last_pushed_at = CASE \
4236 WHEN excluded.last_pushed_at IS NULL THEN last_pushed_at \
4237 WHEN last_pushed_at IS NULL THEN excluded.last_pushed_at \
4238 WHEN excluded.last_pushed_at > last_pushed_at THEN excluded.last_pushed_at \
4239 ELSE last_pushed_at END",
4240 params![agent_id, peer_id, now, pushed_at],
4241 )?;
4242 Ok(())
4243}
4244
4245pub fn memories_updated_since(
4249 conn: &Connection,
4250 since: Option<&str>,
4251 limit: usize,
4252) -> Result<Vec<Memory>> {
4253 let mut stmt = conn.prepare(
4254 "SELECT id, tier, namespace, title, content, tags, priority, confidence, \
4255 source, access_count, created_at, updated_at, last_accessed_at, \
4256 expires_at, metadata \
4257 FROM memories \
4258 WHERE (?1 IS NULL OR updated_at > ?1) \
4259 ORDER BY updated_at ASC \
4260 LIMIT ?2",
4261 )?;
4262 let rows = stmt.query_map(params![since, limit], row_to_memory)?;
4263 rows.collect::<rusqlite::Result<Vec<_>>>()
4264 .map_err(Into::into)
4265}
4266
4267pub fn health_check(conn: &Connection) -> Result<bool> {
4269 let _: i64 = conn.query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))?;
4270 conn.execute(
4271 "INSERT INTO memories_fts(memories_fts) VALUES('integrity-check')",
4272 [],
4273 )?;
4274 Ok(true)
4275}
4276
4277pub fn set_namespace_standard(
4283 conn: &Connection,
4284 namespace: &str,
4285 standard_id: &str,
4286 parent: Option<&str>,
4287) -> Result<()> {
4288 let _mem = get(conn, standard_id)?
4290 .ok_or_else(|| anyhow::anyhow!("memory not found: {standard_id}"))?;
4291 let resolved_parent = match parent {
4293 Some(p) => {
4294 if p == namespace {
4295 anyhow::bail!("namespace cannot be its own parent");
4296 }
4297 Some(p.to_string())
4298 }
4299 None => auto_detect_parent(conn, namespace),
4300 };
4301 let now = chrono::Utc::now().to_rfc3339();
4302 conn.execute(
4303 "INSERT INTO namespace_meta (namespace, standard_id, updated_at, parent_namespace)
4304 VALUES (?1, ?2, ?3, ?4)
4305 ON CONFLICT(namespace) DO UPDATE SET standard_id = ?2, updated_at = ?3, parent_namespace = ?4",
4306 params![namespace, standard_id, now, resolved_parent],
4307 )?;
4308 Ok(())
4309}
4310
4311fn auto_detect_parent(conn: &Connection, namespace: &str) -> Option<String> {
4314 let mut candidate = namespace.to_string();
4315 while let Some(pos) = candidate.rfind('-') {
4316 candidate.truncate(pos);
4317 if candidate.is_empty() {
4318 break;
4319 }
4320 if get_namespace_standard(conn, &candidate)
4322 .ok()
4323 .flatten()
4324 .is_some()
4325 {
4326 return Some(candidate);
4327 }
4328 }
4329 None
4330}
4331
4332#[allow(clippy::unnecessary_wraps)]
4334pub fn get_namespace_standard(conn: &Connection, namespace: &str) -> Result<Option<String>> {
4335 let result = conn
4336 .query_row(
4337 "SELECT standard_id FROM namespace_meta WHERE namespace = ?1",
4338 params![namespace],
4339 |r| r.get(0),
4340 )
4341 .ok();
4342 Ok(result)
4343}
4344
4345pub fn get_namespace_parent(conn: &Connection, namespace: &str) -> Option<String> {
4347 conn.query_row(
4348 "SELECT parent_namespace FROM namespace_meta WHERE namespace = ?1 AND parent_namespace IS NOT NULL",
4349 params![namespace],
4350 |r| r.get(0),
4351 )
4352 .ok()
4353}
4354
4355#[allow(clippy::unnecessary_wraps)]
4360pub fn get_namespace_meta_entry(
4361 conn: &Connection,
4362 namespace: &str,
4363) -> Result<Option<crate::models::NamespaceMetaEntry>> {
4364 let row = conn
4365 .query_row(
4366 "SELECT namespace, standard_id, parent_namespace, updated_at
4367 FROM namespace_meta WHERE namespace = ?1",
4368 params![namespace],
4369 |r| {
4370 Ok(crate::models::NamespaceMetaEntry {
4371 namespace: r.get(0)?,
4372 standard_id: r.get(1)?,
4373 parent_namespace: r.get(2)?,
4374 updated_at: r.get::<_, Option<String>>(3)?.unwrap_or_default(),
4375 })
4376 },
4377 )
4378 .ok();
4379 Ok(row)
4380}
4381
4382pub fn clear_namespace_standard(conn: &Connection, namespace: &str) -> Result<bool> {
4384 let changed = conn.execute(
4385 "DELETE FROM namespace_meta WHERE namespace = ?1",
4386 params![namespace],
4387 )?;
4388 Ok(changed > 0)
4389}
4390
4391#[must_use]
4412pub fn build_namespace_chain(conn: &Connection, namespace: &str) -> Vec<String> {
4413 const MAX_EXPLICIT_DEPTH: usize = 8;
4414 let mut chain: Vec<String> = Vec::new();
4415
4416 if namespace == "*" {
4417 chain.push("*".to_string());
4418 return chain;
4419 }
4420
4421 chain.push("*".to_string());
4423
4424 let mut hierarchy_chain: Vec<String> = crate::models::namespace_ancestors(namespace)
4427 .into_iter()
4428 .rev()
4429 .collect();
4430
4431 if let Some(root) = hierarchy_chain.first().cloned() {
4435 let mut explicit_above: Vec<String> = Vec::new();
4436 let mut current = root;
4437 for _ in 0..MAX_EXPLICIT_DEPTH {
4438 match get_namespace_parent(conn, ¤t) {
4439 Some(p)
4440 if p != "*"
4441 && !explicit_above.contains(&p)
4442 && !hierarchy_chain.contains(&p) =>
4443 {
4444 explicit_above.push(p.clone());
4445 current = p;
4446 }
4447 _ => break,
4448 }
4449 }
4450 for p in explicit_above.into_iter().rev() {
4453 chain.push(p);
4454 }
4455 }
4456
4457 for entry in hierarchy_chain.drain(..) {
4459 if !chain.contains(&entry) {
4460 chain.push(entry);
4461 }
4462 }
4463
4464 chain
4465}
4466
4467fn read_namespace_policy(conn: &Connection, namespace: &str) -> Option<GovernancePolicy> {
4472 let standard_id = get_namespace_standard(conn, namespace).ok()??;
4473 let mem = get(conn, &standard_id).ok()??;
4474 match GovernancePolicy::from_metadata(&mem.metadata) {
4475 Some(Ok(p)) => Some(p),
4476 _ => None,
4477 }
4478}
4479
4480pub fn resolve_governance_policy(conn: &Connection, namespace: &str) -> Option<GovernancePolicy> {
4525 let chain = build_namespace_chain(conn, namespace);
4529 for level in chain.into_iter().rev() {
4530 if let Some(policy) = read_namespace_policy(conn, &level) {
4538 return Some(policy);
4539 }
4540 }
4544 None
4545}
4546
4547fn is_registered_agent(conn: &Connection, agent_id: &str) -> bool {
4549 let title = format!("agent:{agent_id}");
4550 conn.query_row(
4551 "SELECT 1 FROM memories WHERE namespace = ?1 AND title = ?2",
4552 params![AGENTS_NAMESPACE, &title],
4553 |r| r.get::<_, i64>(0),
4554 )
4555 .is_ok()
4556}
4557
4558fn evaluate_level(
4564 conn: &Connection,
4565 level: &GovernanceLevel,
4566 agent_id: &str,
4567 memory_owner: Option<&str>,
4568 namespace_owner: Option<&str>,
4569) -> GovernanceDecision {
4570 match level {
4571 GovernanceLevel::Any => GovernanceDecision::Allow,
4572 GovernanceLevel::Registered => {
4573 if is_registered_agent(conn, agent_id) {
4574 GovernanceDecision::Allow
4575 } else {
4576 GovernanceDecision::Deny(format!(
4577 "governance: caller '{agent_id}' is not a registered agent"
4578 ))
4579 }
4580 }
4581 GovernanceLevel::Owner => {
4582 let owner = memory_owner.or(namespace_owner);
4583 match owner {
4584 Some(o) if o == agent_id => GovernanceDecision::Allow,
4585 Some(o) => GovernanceDecision::Deny(format!(
4586 "governance: caller '{agent_id}' is not the owner ('{o}')"
4587 )),
4588 None => GovernanceDecision::Deny(
4589 "governance: owner-level action has no resolvable owner".into(),
4590 ),
4591 }
4592 }
4593 GovernanceLevel::Approve => {
4594 GovernanceDecision::Pending(String::new())
4598 }
4599 }
4600}
4601
4602fn namespace_owner(conn: &Connection, namespace: &str) -> Option<String> {
4605 let standard_id = get_namespace_standard(conn, namespace).ok().flatten()?;
4606 let mem = get(conn, &standard_id).ok().flatten()?;
4607 mem.metadata
4608 .get("agent_id")
4609 .and_then(|v| v.as_str())
4610 .map(str::to_string)
4611}
4612
4613pub fn enforce_governance(
4617 conn: &Connection,
4618 action: GovernedAction,
4619 namespace: &str,
4620 agent_id: &str,
4621 memory_id: Option<&str>,
4622 memory_owner: Option<&str>,
4623 payload: &serde_json::Value,
4624) -> Result<GovernanceDecision> {
4625 let Some(policy) = resolve_governance_policy(conn, namespace) else {
4627 return Ok(GovernanceDecision::Allow);
4628 };
4629 let level = match action {
4630 GovernedAction::Store => &policy.write,
4631 GovernedAction::Delete => &policy.delete,
4632 GovernedAction::Promote => &policy.promote,
4633 };
4634 let ns_owner = if matches!(action, GovernedAction::Store) {
4635 namespace_owner(conn, namespace)
4636 } else {
4637 None
4638 };
4639
4640 let decision = evaluate_level(conn, level, agent_id, memory_owner, ns_owner.as_deref());
4641 if let GovernanceDecision::Pending(_) = decision {
4642 let pending_id =
4643 queue_pending_action(conn, action, namespace, memory_id, agent_id, payload)?;
4644 return Ok(GovernanceDecision::Pending(pending_id));
4645 }
4646 Ok(decision)
4647}
4648
4649pub fn queue_pending_action(
4651 conn: &Connection,
4652 action: GovernedAction,
4653 namespace: &str,
4654 memory_id: Option<&str>,
4655 requested_by: &str,
4656 payload: &serde_json::Value,
4657) -> Result<String> {
4658 let id = uuid::Uuid::new_v4().to_string();
4659 let now = Utc::now().to_rfc3339();
4660 let payload_json = serde_json::to_string(payload)?;
4661 conn.execute(
4662 "INSERT INTO pending_actions (id, action_type, memory_id, namespace, payload, requested_by, requested_at, status)
4663 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 'pending')",
4664 params![
4665 id,
4666 action.as_str(),
4667 memory_id,
4668 namespace,
4669 payload_json,
4670 requested_by,
4671 now,
4672 ],
4673 )?;
4674 Ok(id)
4675}
4676
4677pub fn upsert_pending_action(conn: &Connection, pa: &PendingAction) -> Result<()> {
4685 let payload_json = serde_json::to_string(&pa.payload)?;
4686 let approvals_json = serde_json::to_string(&pa.approvals)?;
4687 conn.execute(
4688 "INSERT INTO pending_actions
4689 (id, action_type, memory_id, namespace, payload, requested_by,
4690 requested_at, status, decided_by, decided_at, approvals)
4691 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
4692 ON CONFLICT(id) DO UPDATE SET
4693 action_type = excluded.action_type,
4694 memory_id = excluded.memory_id,
4695 namespace = excluded.namespace,
4696 payload = excluded.payload,
4697 requested_by = excluded.requested_by,
4698 requested_at = excluded.requested_at,
4699 status = excluded.status,
4700 decided_by = excluded.decided_by,
4701 decided_at = excluded.decided_at,
4702 approvals = excluded.approvals",
4703 params![
4704 pa.id,
4705 pa.action_type,
4706 pa.memory_id,
4707 pa.namespace,
4708 payload_json,
4709 pa.requested_by,
4710 pa.requested_at,
4711 pa.status,
4712 pa.decided_by,
4713 pa.decided_at,
4714 approvals_json,
4715 ],
4716 )?;
4717 Ok(())
4718}
4719
4720pub fn list_pending_actions(
4721 conn: &Connection,
4722 status: Option<&str>,
4723 limit: usize,
4724) -> Result<Vec<PendingAction>> {
4725 let mut stmt = conn.prepare(
4726 "SELECT id, action_type, memory_id, namespace, payload, requested_by,
4727 requested_at, status, decided_by, decided_at, approvals
4728 FROM pending_actions
4729 WHERE (?1 IS NULL OR status = ?1)
4730 ORDER BY requested_at DESC
4731 LIMIT ?2",
4732 )?;
4733 let rows = stmt.query_map(params![status, limit], |row| {
4734 let payload_str: String = row.get(4)?;
4735 let payload: serde_json::Value =
4736 serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null);
4737 let approvals_str: String = row.get(10)?;
4738 let approvals: Vec<Approval> = serde_json::from_str(&approvals_str).unwrap_or_default();
4739 Ok(PendingAction {
4740 id: row.get(0)?,
4741 action_type: row.get(1)?,
4742 memory_id: row.get(2)?,
4743 namespace: row.get(3)?,
4744 payload,
4745 requested_by: row.get(5)?,
4746 requested_at: row.get(6)?,
4747 status: row.get(7)?,
4748 decided_by: row.get(8)?,
4749 decided_at: row.get(9)?,
4750 approvals,
4751 })
4752 })?;
4753 rows.collect::<rusqlite::Result<Vec<_>>>()
4754 .map_err(Into::into)
4755}
4756
4757pub fn get_pending_action(conn: &Connection, id: &str) -> Result<Option<PendingAction>> {
4758 let row = conn.query_row(
4759 "SELECT id, action_type, memory_id, namespace, payload, requested_by,
4760 requested_at, status, decided_by, decided_at, approvals
4761 FROM pending_actions WHERE id = ?1",
4762 params![id],
4763 |row| {
4764 let payload_str: String = row.get(4)?;
4765 let payload: serde_json::Value =
4766 serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null);
4767 let approvals_str: String = row.get(10)?;
4768 let approvals: Vec<Approval> = serde_json::from_str(&approvals_str).unwrap_or_default();
4769 Ok(PendingAction {
4770 id: row.get(0)?,
4771 action_type: row.get(1)?,
4772 memory_id: row.get(2)?,
4773 namespace: row.get(3)?,
4774 payload,
4775 requested_by: row.get(5)?,
4776 requested_at: row.get(6)?,
4777 status: row.get(7)?,
4778 decided_by: row.get(8)?,
4779 decided_at: row.get(9)?,
4780 approvals,
4781 })
4782 },
4783 );
4784 match row {
4785 Ok(p) => Ok(Some(p)),
4786 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
4787 Err(e) => Err(e.into()),
4788 }
4789}
4790
4791pub fn decide_pending_action(
4796 conn: &Connection,
4797 id: &str,
4798 approve: bool,
4799 decided_by: &str,
4800) -> Result<bool> {
4801 let new_status = if approve { "approved" } else { "rejected" };
4802 let now = Utc::now().to_rfc3339();
4803 let updated = conn.execute(
4804 "UPDATE pending_actions SET status = ?1, decided_by = ?2, decided_at = ?3
4805 WHERE id = ?4 AND status = 'pending'",
4806 params![new_status, decided_by, now, id],
4807 )?;
4808 Ok(updated > 0)
4809}
4810
/// Result of casting an approval through `approve_with_approver_type`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApproveOutcome {
    /// The approval was not applied. The message gives the reason: unknown
    /// id, already decided, wrong designated approver, unregistered voter,
    /// or a failed status write.
    Rejected(String),
    /// Consensus voting is still open. `votes` approvals are recorded so far,
    /// and `quorum` are required.
    Pending { votes: usize, quorum: u32 },
    /// The pending action's status was set to `approved`.
    Approved,
}
4823
4824pub fn approve_with_approver_type(
4827 conn: &Connection,
4828 pending_id: &str,
4829 approver_agent_id: &str,
4830) -> Result<ApproveOutcome> {
4831 let Some(pa) = get_pending_action(conn, pending_id)? else {
4832 return Ok(ApproveOutcome::Rejected(format!(
4833 "pending action not found: {pending_id}"
4834 )));
4835 };
4836 if pa.status != "pending" {
4837 return Ok(ApproveOutcome::Rejected(format!(
4838 "already decided: status={}",
4839 pa.status
4840 )));
4841 }
4842 let approver =
4845 resolve_governance_policy(conn, &pa.namespace).map_or(ApproverType::Human, |p| p.approver);
4846
4847 match approver {
4848 ApproverType::Human => {
4849 let ok = decide_pending_action(conn, pending_id, true, approver_agent_id)?;
4850 if ok {
4851 Ok(ApproveOutcome::Approved)
4852 } else {
4853 Ok(ApproveOutcome::Rejected("decision write failed".into()))
4854 }
4855 }
4856 ApproverType::Agent(required) => {
4857 if approver_agent_id != required {
4858 return Ok(ApproveOutcome::Rejected(format!(
4859 "designated approver is '{required}'; got '{approver_agent_id}'"
4860 )));
4861 }
4862 let ok = decide_pending_action(conn, pending_id, true, approver_agent_id)?;
4863 if ok {
4864 Ok(ApproveOutcome::Approved)
4865 } else {
4866 Ok(ApproveOutcome::Rejected("decision write failed".into()))
4867 }
4868 }
4869 ApproverType::Consensus(quorum) => {
4870 if !is_registered_agent(conn, approver_agent_id) {
4882 return Ok(ApproveOutcome::Rejected(format!(
4883 "consensus voter '{approver_agent_id}' is not a registered agent"
4884 )));
4885 }
4886 let canonical_id = approver_agent_id.to_ascii_lowercase();
4887 let mut approvals = pa.approvals.clone();
4888 if approvals
4889 .iter()
4890 .any(|a| a.agent_id.eq_ignore_ascii_case(&canonical_id))
4891 {
4892 return Ok(ApproveOutcome::Pending {
4893 votes: approvals.len(),
4894 quorum,
4895 });
4896 }
4897 approvals.push(Approval {
4898 agent_id: canonical_id.clone(),
4899 approved_at: Utc::now().to_rfc3339(),
4900 });
4901 let approvals_json = serde_json::to_string(&approvals)?;
4902 conn.execute(
4903 "UPDATE pending_actions SET approvals = ?1 WHERE id = ?2 AND status = 'pending'",
4904 params![approvals_json, pending_id],
4905 )?;
4906 let votes = approvals.len();
4907 if u32::try_from(votes).unwrap_or(u32::MAX) >= quorum {
4908 let ok = decide_pending_action(conn, pending_id, true, &canonical_id)?;
4910 if ok {
4911 return Ok(ApproveOutcome::Approved);
4912 }
4913 return Ok(ApproveOutcome::Rejected(
4914 "decision write failed at consensus threshold".into(),
4915 ));
4916 }
4917 Ok(ApproveOutcome::Pending { votes, quorum })
4918 }
4919 }
4920}
4921
4922pub fn execute_pending_action(conn: &Connection, pending_id: &str) -> Result<Option<String>> {
4926 let Some(pa) = get_pending_action(conn, pending_id)? else {
4927 anyhow::bail!("pending action not found: {pending_id}");
4928 };
4929 if pa.status != "approved" {
4930 anyhow::bail!("cannot execute non-approved action (status={})", pa.status);
4931 }
4932 match pa.action_type.as_str() {
4933 "store" => {
4934 let mut mem: Memory = serde_json::from_value(pa.payload.clone())
4935 .map_err(|e| anyhow::anyhow!("invalid store payload: {e}"))?;
4936 mem.id = uuid::Uuid::new_v4().to_string();
4938 let now = Utc::now().to_rfc3339();
4939 mem.created_at.clone_from(&now);
4940 mem.updated_at = now;
4941 mem.access_count = 0;
4942 let actual_id = insert(conn, &mem)?;
4943 Ok(Some(actual_id))
4944 }
4945 "delete" => {
4946 if let Some(mid) = pa.memory_id.clone() {
4947 delete(conn, &mid)?;
4948 Ok(Some(mid))
4949 } else {
4950 Ok(None)
4951 }
4952 }
4953 "promote" => {
4954 if let Some(mid) = pa.memory_id.clone() {
4955 if let Some(to_ns) = pa.payload.get("to_namespace").and_then(|v| v.as_str()) {
4956 let clone_id = promote_to_namespace(conn, &mid, to_ns)?;
4958 return Ok(Some(clone_id));
4959 }
4960 let (_found, _changed) = update(
4962 conn,
4963 &mid,
4964 None,
4965 None,
4966 Some(&Tier::Long),
4967 None,
4968 None,
4969 None,
4970 None,
4971 Some(""),
4972 None,
4973 )?;
4974 Ok(Some(mid))
4975 } else {
4976 Ok(None)
4977 }
4978 }
4979 other => anyhow::bail!("unknown action_type: {other}"),
4980 }
4981}
4982
4983pub fn is_namespace_standard(conn: &Connection, id: &str) -> bool {
4985 conn.query_row(
4986 "SELECT COUNT(*) FROM namespace_meta WHERE standard_id = ?1",
4987 params![id],
4988 |r| r.get::<_, i64>(0),
4989 )
4990 .unwrap_or(0)
4991 > 0
4992}
4993
4994pub fn count_active_governance_rules(conn: &Connection) -> Result<usize> {
5000 let count: i64 = conn
5001 .query_row(
5002 "SELECT COUNT(*) FROM memories m
5003 INNER JOIN namespace_meta nm ON nm.standard_id = m.id
5004 WHERE json_extract(m.metadata, '$.governance') IS NOT NULL",
5005 [],
5006 |r| r.get(0),
5007 )
5008 .unwrap_or(0);
5009 Ok(usize::try_from(count.max(0)).unwrap_or(0))
5010}
5011
5012pub fn count_subscriptions(conn: &Connection) -> Result<usize> {
5016 let count: i64 = conn
5017 .query_row("SELECT COUNT(*) FROM subscriptions", [], |r| r.get(0))
5018 .unwrap_or(0);
5019 Ok(usize::try_from(count.max(0)).unwrap_or(0))
5020}
5021
5022pub fn count_pending_actions_by_status(conn: &Connection, status: &str) -> Result<usize> {
5026 let count: i64 = conn
5027 .query_row(
5028 "SELECT COUNT(*) FROM pending_actions WHERE status = ?1",
5029 params![status],
5030 |r| r.get(0),
5031 )
5032 .unwrap_or(0);
5033 Ok(usize::try_from(count.max(0)).unwrap_or(0))
5034}
5035
/// Counts embedded memories whose embedding dimension is suspect.
///
/// A memory (with a non-NULL `embedding`) is counted when either:
/// - its `embedding_dim` is NULL, or
/// - its `embedding_dim` differs from the modal (most common) dimension of
///   its namespace.
///
/// Returns `Ok(None)` when the `memories` table has no `embedding_dim`
/// column, so the check is not applicable. Query failures are reported as
/// `Some(0)`.
///
/// When two dimensions tie for the mode, `ROW_NUMBER` picks one arbitrarily.
/// The resulting count is the same either way.
///
/// # Errors
///
/// Never returns `Err` in the current implementation.
pub fn doctor_dim_violations(conn: &Connection) -> Result<Option<usize>> {
    // Probe for the column. Preparing a statement that references it fails
    // on older schemas without it.
    let has_dim = conn
        .prepare("SELECT embedding_dim FROM memories LIMIT 0")
        .is_ok();
    if !has_dim {
        return Ok(None);
    }
    let n: i64 = conn
        .query_row(
            "WITH per_ns_modes AS (
                 SELECT namespace, embedding_dim, COUNT(*) AS c
                 FROM memories
                 WHERE embedding IS NOT NULL AND embedding_dim IS NOT NULL
                 GROUP BY namespace, embedding_dim
             ),
             ranked AS (
                 SELECT namespace, embedding_dim,
                        ROW_NUMBER() OVER (PARTITION BY namespace ORDER BY c DESC) AS rn
                 FROM per_ns_modes
             ),
             modes AS (
                 SELECT namespace, embedding_dim AS modal_dim
                 FROM ranked WHERE rn = 1
             )
             SELECT COUNT(*)
             FROM memories m
             LEFT JOIN modes mo ON mo.namespace = m.namespace
             WHERE m.embedding IS NOT NULL
               AND (m.embedding_dim IS NULL
                    OR (mo.modal_dim IS NOT NULL AND m.embedding_dim != mo.modal_dim))",
            [],
            |r| r.get(0),
        )
        .unwrap_or(0);
    Ok(Some(usize::try_from(n.max(0)).unwrap_or(0)))
}
5106
5107pub fn doctor_oldest_pending_age_secs(conn: &Connection) -> Result<Option<i64>> {
5115 let row: Option<String> = conn
5116 .query_row(
5117 "SELECT requested_at FROM pending_actions WHERE status = 'pending'
5118 ORDER BY requested_at ASC LIMIT 1",
5119 [],
5120 |r| r.get(0),
5121 )
5122 .ok();
5123 let Some(ts) = row else {
5124 return Ok(None);
5125 };
5126 let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) else {
5127 return Ok(None);
5128 };
5129 let age = (Utc::now() - parsed.with_timezone(&Utc)).num_seconds();
5130 Ok(Some(age))
5131}
5132
5133pub fn doctor_governance_coverage(conn: &Connection) -> Result<(usize, usize)> {
5141 let with_policy: i64 = conn
5142 .query_row(
5143 "SELECT COUNT(*) FROM memories m
5144 INNER JOIN namespace_meta nm ON nm.standard_id = m.id
5145 WHERE json_extract(m.metadata, '$.governance') IS NOT NULL",
5146 [],
5147 |r| r.get(0),
5148 )
5149 .unwrap_or(0);
5150 let total_meta: i64 = conn
5151 .query_row("SELECT COUNT(*) FROM namespace_meta", [], |r| r.get(0))
5152 .unwrap_or(0);
5153 let with = usize::try_from(with_policy.max(0)).unwrap_or(0);
5154 let total = usize::try_from(total_meta.max(0)).unwrap_or(0);
5155 Ok((with, total.saturating_sub(with)))
5156}
5157
5158pub fn doctor_governance_depth_distribution(conn: &Connection) -> Result<Vec<usize>> {
5170 const MAX_DEPTH: usize = 16;
5171 let mut stmt = conn.prepare("SELECT namespace, parent_namespace FROM namespace_meta")?;
5172 let rows = stmt.query_map([], |r| {
5173 Ok((r.get::<_, String>(0)?, r.get::<_, Option<String>>(1)?))
5174 })?;
5175 let parent_map: HashMap<String, Option<String>> = rows
5176 .filter_map(rusqlite::Result::ok)
5177 .collect::<HashMap<_, _>>();
5178 let mut hist = vec![0_usize; MAX_DEPTH + 1];
5179 for ns in parent_map.keys() {
5180 let mut depth = 0_usize;
5181 let mut cur = parent_map.get(ns).cloned().flatten();
5182 while let Some(p) = cur {
5183 depth += 1;
5184 if depth >= MAX_DEPTH {
5185 break;
5186 }
5187 cur = parent_map.get(&p).cloned().flatten();
5188 }
5189 let bucket = depth.min(MAX_DEPTH);
5190 hist[bucket] += 1;
5191 }
5192 Ok(hist)
5193}
5194
5195pub fn doctor_webhook_delivery_totals(conn: &Connection) -> Result<(u64, u64)> {
5203 let dispatched: i64 = conn
5204 .query_row(
5205 "SELECT COALESCE(SUM(dispatch_count), 0) FROM subscriptions",
5206 [],
5207 |r| r.get(0),
5208 )
5209 .unwrap_or(0);
5210 let failed: i64 = conn
5211 .query_row(
5212 "SELECT COALESCE(SUM(failure_count), 0) FROM subscriptions",
5213 [],
5214 |r| r.get(0),
5215 )
5216 .unwrap_or(0);
5217 Ok((
5218 u64::try_from(dispatched.max(0)).unwrap_or(0),
5219 u64::try_from(failed.max(0)).unwrap_or(0),
5220 ))
5221}
5222
/// One `capability_expansion` event read back from `audit_log`.
#[derive(Debug, Clone)]
pub struct CapabilityExpansionRow {
    /// Row id. `record_capability_expansion` writes a UUID v4 string.
    pub id: String,
    /// Requesting agent, if one was known.
    pub agent_id: Option<String>,
    /// Event discriminator. Always `"capability_expansion"` for rows written
    /// by `record_capability_expansion`.
    pub event_type: String,
    /// Capability family that was requested.
    pub requested_family: Option<String>,
    /// Whether the expansion was granted. Stored as an integer, where any
    /// non-zero value means `true`.
    pub granted: bool,
    /// Attestation tier attached to the request, if any.
    pub attestation_tier: Option<String>,
    /// Event time. `record_capability_expansion` writes RFC 3339 UTC.
    pub timestamp: String,
}
5247
5248pub fn record_capability_expansion(
5260 conn: &Connection,
5261 agent_id: Option<&str>,
5262 family: &str,
5263 granted: bool,
5264 attestation_tier: Option<&str>,
5265) {
5266 let id = uuid::Uuid::new_v4().to_string();
5267 let now = Utc::now().to_rfc3339();
5268 let result = conn.execute(
5269 "INSERT INTO audit_log (id, agent_id, event_type, requested_family, \
5270 granted, attestation_tier, timestamp) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
5271 rusqlite::params![
5272 id,
5273 agent_id,
5274 "capability_expansion",
5275 family,
5276 i32::from(granted),
5277 attestation_tier,
5278 now,
5279 ],
5280 );
5281 if let Err(e) = result {
5282 tracing::warn!(
5283 "audit_log insert failed (capability_expansion / agent={:?} / family={}): {e}",
5284 agent_id,
5285 family,
5286 );
5287 }
5288}
5289
5290pub fn list_capability_expansions(
5293 conn: &Connection,
5294 limit: usize,
5295 agent_filter: Option<&str>,
5296) -> Result<Vec<CapabilityExpansionRow>> {
5297 let n = (limit.min(10_000)) as i64;
5298 let map_row = |r: &rusqlite::Row<'_>| -> rusqlite::Result<CapabilityExpansionRow> {
5299 Ok(CapabilityExpansionRow {
5300 id: r.get(0)?,
5301 agent_id: r.get(1)?,
5302 event_type: r.get(2)?,
5303 requested_family: r.get(3)?,
5304 granted: r.get::<_, i64>(4)? != 0,
5305 attestation_tier: r.get(5)?,
5306 timestamp: r.get(6)?,
5307 })
5308 };
5309 if let Some(a) = agent_filter {
5310 let mut stmt = conn.prepare(
5311 "SELECT id, agent_id, event_type, requested_family, granted, \
5312 attestation_tier, timestamp FROM audit_log \
5313 WHERE event_type = 'capability_expansion' AND agent_id = ?1 \
5314 ORDER BY timestamp DESC LIMIT ?2",
5315 )?;
5316 let rows = stmt.query_map(rusqlite::params![a, n], map_row)?;
5317 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
5318 } else {
5319 let mut stmt = conn.prepare(
5320 "SELECT id, agent_id, event_type, requested_family, granted, \
5321 attestation_tier, timestamp FROM audit_log \
5322 WHERE event_type = 'capability_expansion' \
5323 ORDER BY timestamp DESC LIMIT ?1",
5324 )?;
5325 let rows = stmt.query_map(rusqlite::params![n], map_row)?;
5326 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
5327 }
5328}
5329
5330pub fn doctor_max_sync_skew_secs(conn: &Connection) -> Result<Option<i64>> {
5331 let mut stmt = match conn.prepare(
5332 "SELECT last_seen_at, last_pulled_at FROM sync_state WHERE last_pulled_at IS NOT NULL",
5333 ) {
5334 Ok(s) => s,
5335 Err(_) => return Ok(None),
5336 };
5337 let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)))?;
5338 let mut max_skew: Option<i64> = None;
5339 for row in rows {
5340 let Ok((seen, pulled)) = row else { continue };
5341 let Ok(s) = chrono::DateTime::parse_from_rfc3339(&seen) else {
5342 continue;
5343 };
5344 let Ok(p) = chrono::DateTime::parse_from_rfc3339(&pulled) else {
5345 continue;
5346 };
5347 let skew = (s.with_timezone(&Utc) - p.with_timezone(&Utc))
5348 .num_seconds()
5349 .abs();
5350 max_skew = Some(max_skew.map_or(skew, |m| m.max(skew)));
5351 }
5352 Ok(max_skew)
5353}
5354
5355#[cfg(test)]
5356mod tests {
5357 use super::*;
5358 use crate::models::{MID_TTL_EXTEND_SECS, Memory, SHORT_TTL_EXTEND_SECS, Tier};
5359
5360 fn test_db() -> Connection {
5361 open(std::path::Path::new(":memory:")).unwrap()
5362 }
5363
5364 fn make_memory(title: &str, ns: &str, tier: Tier, priority: i32) -> Memory {
5365 let now = chrono::Utc::now().to_rfc3339();
5366 Memory {
5367 id: uuid::Uuid::new_v4().to_string(),
5368 tier: tier.clone(),
5369 namespace: ns.to_string(),
5370 title: title.to_string(),
5371 content: format!("Content for {title}"),
5372 tags: vec![],
5373 priority,
5374 confidence: 1.0,
5375 source: "test".to_string(),
5376 access_count: 0,
5377 created_at: now.clone(),
5378 updated_at: now,
5379 last_accessed_at: None,
5380 expires_at: tier
5381 .default_ttl_secs()
5382 .map(|s| (chrono::Utc::now() + chrono::Duration::seconds(s)).to_rfc3339()),
5383 metadata: serde_json::json!({}),
5384 }
5385 }
5386
5387 #[test]
5388 fn open_creates_schema() {
5389 let conn = test_db();
5390 let count: i64 = conn
5391 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
5392 .unwrap();
5393 assert_eq!(count, 0);
5394 }
5395
5396 #[test]
5397 fn insert_and_get() {
5398 let conn = test_db();
5399 let mem = make_memory("Test insert", "test", Tier::Long, 5);
5400 let id = insert(&conn, &mem).unwrap();
5401 let got = get(&conn, &id).unwrap().unwrap();
5402 assert_eq!(got.title, "Test insert");
5403 assert_eq!(got.namespace, "test");
5404 assert_eq!(got.priority, 5);
5405 }
5406
5407 #[test]
5408 fn get_nonexistent() {
5409 let conn = test_db();
5410 let got = get(&conn, "nonexistent-id").unwrap();
5411 assert!(got.is_none());
5412 }
5413
5414 #[test]
5415 fn update_partial_fields() {
5416 let conn = test_db();
5417 let mem = make_memory("Original", "test", Tier::Mid, 5);
5418 let id = insert(&conn, &mem).unwrap();
5419
5420 let (found, content_changed) = update(
5421 &conn,
5422 &id,
5423 Some("Updated Title"),
5424 None,
5425 None,
5426 None,
5427 None,
5428 Some(9),
5429 None,
5430 None,
5431 None,
5432 )
5433 .unwrap();
5434 assert!(found);
5435 assert!(content_changed); let got = get(&conn, &id).unwrap().unwrap();
5438 assert_eq!(got.title, "Updated Title");
5439 assert_eq!(got.priority, 9);
5440 assert_eq!(got.content, mem.content); }
5442
5443 #[test]
5444 fn update_content_changed_flag() {
5445 let conn = test_db();
5446 let mem = make_memory("Stable", "test", Tier::Mid, 5);
5447 let id = insert(&conn, &mem).unwrap();
5448
5449 let (found, content_changed) = update(
5451 &conn,
5452 &id,
5453 None,
5454 None,
5455 None,
5456 None,
5457 None,
5458 Some(8),
5459 None,
5460 None,
5461 None,
5462 )
5463 .unwrap();
5464 assert!(found);
5465 assert!(!content_changed);
5466
5467 let (found, content_changed) = update(
5469 &conn,
5470 &id,
5471 None,
5472 Some("New content"),
5473 None,
5474 None,
5475 None,
5476 None,
5477 None,
5478 None,
5479 None,
5480 )
5481 .unwrap();
5482 assert!(found);
5483 assert!(content_changed);
5484 }
5485
5486 #[test]
5487 fn update_nonexistent_returns_false() {
5488 let conn = test_db();
5489 let (found, _) = update(
5490 &conn,
5491 "bad-id",
5492 Some("New"),
5493 None,
5494 None,
5495 None,
5496 None,
5497 None,
5498 None,
5499 None,
5500 None,
5501 )
5502 .unwrap();
5503 assert!(!found);
5504 }
5505
5506 #[test]
5507 fn update_tier_downgrade_protection() {
5508 let conn = test_db();
5509 let mem = make_memory("Permanent", "test", Tier::Long, 9);
5511 let id = insert(&conn, &mem).unwrap();
5512
5513 let (found, _) = update(
5514 &conn,
5515 &id,
5516 None,
5517 None,
5518 Some(&Tier::Short),
5519 None,
5520 None,
5521 None,
5522 None,
5523 None,
5524 None,
5525 )
5526 .unwrap();
5527 assert!(found);
5528 let got = get(&conn, &id).unwrap().unwrap();
5529 assert_eq!(got.tier, Tier::Long); let mem2 = make_memory("Working", "test", Tier::Mid, 5);
5533 let id2 = insert(&conn, &mem2).unwrap();
5534
5535 let (found, _) = update(
5536 &conn,
5537 &id2,
5538 None,
5539 None,
5540 Some(&Tier::Short),
5541 None,
5542 None,
5543 None,
5544 None,
5545 None,
5546 None,
5547 )
5548 .unwrap();
5549 assert!(found);
5550 let got2 = get(&conn, &id2).unwrap().unwrap();
5551 assert_eq!(got2.tier, Tier::Mid); let (found, _) = update(
5555 &conn,
5556 &id2,
5557 None,
5558 None,
5559 Some(&Tier::Long),
5560 None,
5561 None,
5562 None,
5563 None,
5564 None,
5565 None,
5566 )
5567 .unwrap();
5568 assert!(found);
5569 let got3 = get(&conn, &id2).unwrap().unwrap();
5570 assert_eq!(got3.tier, Tier::Long); }
5572
5573 #[test]
5574 fn update_title_collision_returns_error() {
5575 let conn = test_db();
5576 let mem_a = make_memory("Alpha", "test", Tier::Mid, 5);
5577 let mem_b = make_memory("Beta", "test", Tier::Mid, 5);
5578 let id_a = insert(&conn, &mem_a).unwrap();
5579 let _id_b = insert(&conn, &mem_b).unwrap();
5580
5581 let result = update(
5583 &conn,
5584 &id_a,
5585 Some("Beta"),
5586 None,
5587 None,
5588 None,
5589 None,
5590 None,
5591 None,
5592 None,
5593 None,
5594 );
5595 assert!(result.is_err());
5596 let err = result.unwrap_err().to_string();
5597 assert!(err.contains("already exists in namespace"));
5598 }
5599
5600 #[test]
5601 fn delete_existing() {
5602 let conn = test_db();
5603 let mem = make_memory("To delete", "test", Tier::Short, 3);
5604 let id = insert(&conn, &mem).unwrap();
5605 assert!(delete(&conn, &id).unwrap());
5606 assert!(get(&conn, &id).unwrap().is_none());
5607 }
5608
5609 #[test]
5610 fn delete_nonexistent() {
5611 let conn = test_db();
5612 assert!(!delete(&conn, "bad-id").unwrap());
5613 }
5614
5615 #[test]
5616 fn list_with_namespace_filter() {
5617 let conn = test_db();
5618 insert(&conn, &make_memory("A", "ns1", Tier::Long, 5)).unwrap();
5619 insert(&conn, &make_memory("B", "ns2", Tier::Long, 5)).unwrap();
5620 insert(&conn, &make_memory("C", "ns1", Tier::Long, 5)).unwrap();
5621
5622 let results = list(
5623 &conn,
5624 Some("ns1"),
5625 None,
5626 100,
5627 0,
5628 None,
5629 None,
5630 None,
5631 None,
5632 None,
5633 )
5634 .unwrap();
5635 assert_eq!(results.len(), 2);
5636 }
5637
5638 #[test]
5639 fn list_with_tier_filter() {
5640 let conn = test_db();
5641 insert(&conn, &make_memory("Long", "test", Tier::Long, 5)).unwrap();
5642 insert(&conn, &make_memory("Mid", "test", Tier::Mid, 5)).unwrap();
5643
5644 let results = list(
5645 &conn,
5646 None,
5647 Some(&Tier::Long),
5648 100,
5649 0,
5650 None,
5651 None,
5652 None,
5653 None,
5654 None,
5655 )
5656 .unwrap();
5657 assert_eq!(results.len(), 1);
5658 assert_eq!(results[0].title, "Long");
5659 }
5660
5661 #[test]
5662 fn list_with_limit() {
5663 let conn = test_db();
5664 for i in 0..5 {
5665 insert(
5666 &conn,
5667 &make_memory(&format!("Mem {i}"), "test", Tier::Long, 5),
5668 )
5669 .unwrap();
5670 }
5671 let results = list(&conn, None, None, 3, 0, None, None, None, None, None).unwrap();
5672 assert_eq!(results.len(), 3);
5673 }
5674
5675 #[test]
5676 fn search_keyword_match() {
5677 let conn = test_db();
5678 insert(
5679 &conn,
5680 &make_memory("PostgreSQL config", "test", Tier::Long, 5),
5681 )
5682 .unwrap();
5683 insert(&conn, &make_memory("Redis cache", "test", Tier::Long, 5)).unwrap();
5684
5685 let results = search(
5686 &conn,
5687 "PostgreSQL",
5688 None,
5689 None,
5690 10,
5691 None,
5692 None,
5693 None,
5694 None,
5695 None,
5696 None,
5697 )
5698 .unwrap();
5699 assert_eq!(results.len(), 1);
5700 assert!(results[0].title.contains("PostgreSQL"));
5701 }
5702
5703 #[test]
5704 fn search_no_match() {
5705 let conn = test_db();
5706 insert(&conn, &make_memory("PostgreSQL", "test", Tier::Long, 5)).unwrap();
5707 let results = search(
5708 &conn,
5709 "nonexistent_term_xyz",
5710 None,
5711 None,
5712 10,
5713 None,
5714 None,
5715 None,
5716 None,
5717 None,
5718 None,
5719 )
5720 .unwrap();
5721 assert_eq!(results.len(), 0);
5722 }
5723
5724 #[test]
5725 fn recall_returns_scored() {
5726 let conn = test_db();
5727 insert(
5728 &conn,
5729 &make_memory("Rust programming language", "test", Tier::Long, 8),
5730 )
5731 .unwrap();
5732 insert(
5733 &conn,
5734 &make_memory("Python scripting", "test", Tier::Long, 5),
5735 )
5736 .unwrap();
5737
5738 let (results, _tokens) = recall(
5739 &conn,
5740 "Rust programming",
5741 None,
5742 10,
5743 None,
5744 None,
5745 None,
5746 SHORT_TTL_EXTEND_SECS,
5747 MID_TTL_EXTEND_SECS,
5748 None,
5749 None,
5750 )
5751 .unwrap();
5752 assert!(!results.is_empty());
5753 let (mem, score) = &results[0];
5755 assert!(mem.title.contains("Rust"));
5756 assert!(*score > 0.0);
5757 }
5758
5759 #[test]
5760 fn recall_empty_context() {
5761 let conn = test_db();
5762 insert(&conn, &make_memory("Test", "test", Tier::Long, 5)).unwrap();
5763 let results = recall(
5765 &conn,
5766 "",
5767 None,
5768 10,
5769 None,
5770 None,
5771 None,
5772 SHORT_TTL_EXTEND_SECS,
5773 MID_TTL_EXTEND_SECS,
5774 None,
5775 None,
5776 );
5777 assert!(results.is_ok() || results.is_err());
5779 }
5780
5781 #[test]
5782 fn touch_increments_access_count() {
5783 let conn = test_db();
5784 let mem = make_memory("Touchable", "test", Tier::Mid, 5);
5785 let id = insert(&conn, &mem).unwrap();
5786 assert_eq!(get(&conn, &id).unwrap().unwrap().access_count, 0);
5787
5788 touch(&conn, &id, SHORT_TTL_EXTEND_SECS, MID_TTL_EXTEND_SECS).unwrap();
5789 assert_eq!(get(&conn, &id).unwrap().unwrap().access_count, 1);
5790
5791 touch(&conn, &id, SHORT_TTL_EXTEND_SECS, MID_TTL_EXTEND_SECS).unwrap();
5792 assert_eq!(get(&conn, &id).unwrap().unwrap().access_count, 2);
5793 }
5794
5795 #[test]
5796 fn find_contradictions_similar_titles() {
5797 let conn = test_db();
5798 insert(
5799 &conn,
5800 &make_memory("Database is PostgreSQL", "infra", Tier::Long, 8),
5801 )
5802 .unwrap();
5803 insert(
5804 &conn,
5805 &make_memory("Database is MySQL", "infra", Tier::Long, 5),
5806 )
5807 .unwrap();
5808
5809 let contradictions = find_contradictions(&conn, "Database is PostgreSQL", "infra").unwrap();
5810 assert!(!contradictions.is_empty());
5811 }
5812
5813 #[test]
5814 fn create_and_get_links() {
5815 let conn = test_db();
5816 let id1 = insert(&conn, &make_memory("Memory A", "test", Tier::Long, 5)).unwrap();
5817 let id2 = insert(&conn, &make_memory("Memory B", "test", Tier::Long, 5)).unwrap();
5818
5819 create_link(&conn, &id1, &id2, "related_to").unwrap();
5820 let links = get_links(&conn, &id1).unwrap();
5821 assert_eq!(links.len(), 1);
5822 assert_eq!(links[0].relation, "related_to");
5823 }
5824
5825 #[test]
5826 fn consolidate_merges_memories() {
5827 let conn = test_db();
5828 let id1 = insert(&conn, &make_memory("Part 1", "test", Tier::Mid, 5)).unwrap();
5829 let id2 = insert(&conn, &make_memory("Part 2", "test", Tier::Mid, 5)).unwrap();
5830
5831 let new_id = consolidate(
5832 &conn,
5833 &[id1.clone(), id2.clone()],
5834 "Combined",
5835 "Part 1 + Part 2",
5836 "test",
5837 &Tier::Long,
5838 "test",
5839 "test-consolidator",
5840 )
5841 .unwrap();
5842 assert!(get(&conn, &id1).unwrap().is_none());
5844 assert!(get(&conn, &id2).unwrap().is_none());
5845 let combined = get(&conn, &new_id).unwrap().unwrap();
5847 assert_eq!(combined.title, "Combined");
5848 assert_eq!(combined.tier, Tier::Long);
5849 }
5850
5851 #[test]
5852 fn stats_counts() {
5853 let conn = test_db();
5854 let path = std::path::Path::new(":memory:");
5855 insert(&conn, &make_memory("A", "ns1", Tier::Long, 5)).unwrap();
5856 insert(&conn, &make_memory("B", "ns1", Tier::Mid, 5)).unwrap();
5857 insert(&conn, &make_memory("C", "ns2", Tier::Short, 5)).unwrap();
5858
5859 let s = stats(&conn, path).unwrap();
5860 assert_eq!(s.total, 3);
5861 }
5862
5863 #[test]
5864 fn gc_removes_expired() {
5865 let conn = test_db();
5866 let mut mem = make_memory("Expired", "test", Tier::Short, 5);
5867 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string()); insert(&conn, &mem).unwrap();
5869
5870 let removed = gc(&conn, false).unwrap();
5871 assert_eq!(removed, 1);
5872 }
5873
5874 #[test]
5875 fn gc_preserves_long_term() {
5876 let conn = test_db();
5877 insert(&conn, &make_memory("Permanent", "test", Tier::Long, 5)).unwrap();
5878 let removed = gc(&conn, false).unwrap();
5879 assert_eq!(removed, 0);
5880 }
5881
5882 #[test]
5883 fn gc_archives_before_delete() {
5884 let conn = test_db();
5885 let mut mem = make_memory("Archivable", "test", Tier::Short, 5);
5886 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5887 insert(&conn, &mem).unwrap();
5888
5889 let removed = gc(&conn, true).unwrap();
5890 assert_eq!(removed, 1);
5891
5892 let archived = list_archived(&conn, None, 10, 0).unwrap();
5894 assert_eq!(archived.len(), 1);
5895 assert_eq!(archived[0]["title"], "Archivable");
5896 assert_eq!(archived[0]["archive_reason"], "ttl_expired");
5897 }
5898
5899 #[test]
5900 fn restore_archived_memory() {
5901 let conn = test_db();
5906 let mut mem = make_memory("Restorable", "test", Tier::Short, 5);
5907 let original_expiry = "2020-01-01T00:00:00+00:00".to_string();
5908 mem.expires_at = Some(original_expiry.clone());
5909 let id = insert(&conn, &mem).unwrap();
5910
5911 gc(&conn, true).unwrap();
5912 assert!(get(&conn, &id).unwrap().is_none()); let restored = restore_archived(&conn, &id).unwrap();
5915 assert!(restored);
5916
5917 let got = get(&conn, &id).unwrap().unwrap();
5918 assert_eq!(got.title, "Restorable");
5919 assert_eq!(
5920 got.tier.as_str(),
5921 "short",
5922 "G5: restore must preserve the original tier"
5923 );
5924 assert_eq!(
5925 got.expires_at,
5926 Some(original_expiry),
5927 "G5: restore must preserve the original expires_at"
5928 );
5929 }
5930
5931 #[test]
5932 fn purge_archive_removes_all() {
5933 let conn = test_db();
5934 let mut mem = make_memory("Purgeable", "test", Tier::Short, 5);
5935 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5936 insert(&conn, &mem).unwrap();
5937 gc(&conn, true).unwrap();
5938
5939 let purged = purge_archive(&conn, None).unwrap();
5940 assert_eq!(purged, 1);
5941 assert_eq!(list_archived(&conn, None, 10, 0).unwrap().len(), 0);
5942 }
5943
5944 #[test]
5945 fn purge_archive_rejects_negative_days() {
5946 let conn = test_db();
5947 let result = purge_archive(&conn, Some(-1));
5948 assert!(result.is_err());
5949 assert!(result.unwrap_err().to_string().contains("non-negative"));
5950 }
5951
5952 #[test]
5953 fn restore_rejects_active_id_collision() {
5954 let conn = test_db();
5955 let mut mem = make_memory("Collision Test", "test", Tier::Short, 5);
5956 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5957 let id = insert(&conn, &mem).unwrap();
5958
5959 gc(&conn, true).unwrap();
5961 assert!(get(&conn, &id).unwrap().is_none());
5962
5963 conn.execute(
5965 "INSERT INTO memories (id, tier, namespace, title, content, tags, priority, confidence, source, access_count, created_at, updated_at)
5966 VALUES (?1, 'long', 'test', 'Blocker Title', 'blocks restore', '[]', 5, 1.0, 'test', 0, datetime('now'), datetime('now'))",
5967 rusqlite::params![id],
5968 ).unwrap();
5969
5970 let result = restore_archived(&conn, &id);
5972 assert!(result.is_err());
5973 assert!(
5974 result
5975 .unwrap_err()
5976 .to_string()
5977 .contains("already exists in active table")
5978 );
5979 }
5980
5981 #[test]
5982 fn archive_stats_counts() {
5983 let conn = test_db();
5984 let mut m1 = make_memory("Stats A", "ns1", Tier::Short, 5);
5985 m1.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5986 let mut m2 = make_memory("Stats B", "ns1", Tier::Short, 5);
5987 m2.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
5988 insert(&conn, &m1).unwrap();
5989 insert(&conn, &m2).unwrap();
5990 gc(&conn, true).unwrap();
5991
5992 let stats = archive_stats(&conn).unwrap();
5993 assert_eq!(stats["archived_total"], 2);
5994 }
5995
5996 #[test]
5997 fn archive_memory_moves_live_row_to_archive() {
5998 let conn = test_db();
6003 let mem = make_memory("Archive me", "s29", Tier::Long, 5);
6004 let id = insert(&conn, &mem).unwrap();
6005
6006 let moved = archive_memory(&conn, &id, Some("explicit")).unwrap();
6007 assert!(moved, "live row must be archived on first call");
6008 assert!(
6009 get(&conn, &id).unwrap().is_none(),
6010 "row must be removed from active table"
6011 );
6012
6013 let archived = list_archived(&conn, None, 10, 0).unwrap();
6014 assert_eq!(archived.len(), 1);
6015 assert_eq!(archived[0]["id"], id);
6016 assert_eq!(archived[0]["archive_reason"], "explicit");
6017
6018 let second = archive_memory(&conn, &id, Some("explicit")).unwrap();
6020 assert!(
6021 !second,
6022 "second archive call must report no-op (no live row)"
6023 );
6024 }
6025
6026 #[test]
6027 fn archive_memory_missing_id_returns_false() {
6028 let conn = test_db();
6031 let moved = archive_memory(&conn, "nonexistent-id", None).unwrap();
6032 assert!(!moved);
6033 }
6034
6035 #[test]
6036 fn archive_memory_default_reason_is_archive() {
6037 let conn = test_db();
6038 let mem = make_memory("Default reason", "s29", Tier::Long, 5);
6039 let id = insert(&conn, &mem).unwrap();
6040 assert!(archive_memory(&conn, &id, None).unwrap());
6041 let archived = list_archived(&conn, None, 10, 0).unwrap();
6042 assert_eq!(archived[0]["archive_reason"], "archive");
6043 }
6044
6045 #[test]
6046 fn export_all_and_links() {
6047 let conn = test_db();
6048 let id1 = insert(&conn, &make_memory("Export A", "test", Tier::Long, 5)).unwrap();
6049 let id2 = insert(&conn, &make_memory("Export B", "test", Tier::Long, 5)).unwrap();
6050 create_link(&conn, &id1, &id2, "supersedes").unwrap();
6051
6052 let mems = export_all(&conn).unwrap();
6053 assert_eq!(mems.len(), 2);
6054 let links = export_links(&conn).unwrap();
6055 assert_eq!(links.len(), 1);
6056 }
6057
6058 #[test]
6059 fn list_namespaces_counts() {
6060 let conn = test_db();
6061 insert(&conn, &make_memory("A", "alpha", Tier::Long, 5)).unwrap();
6062 insert(&conn, &make_memory("B", "alpha", Tier::Long, 5)).unwrap();
6063 insert(&conn, &make_memory("C", "beta", Tier::Long, 5)).unwrap();
6064
6065 let ns = list_namespaces(&conn).unwrap();
6066 assert_eq!(ns.len(), 2);
6067 }
6068
6069 #[test]
6070 fn taxonomy_flat_namespaces_only() {
6071 let conn = test_db();
6073 insert(&conn, &make_memory("A", "alpha", Tier::Long, 5)).unwrap();
6074 insert(&conn, &make_memory("B", "alpha", Tier::Long, 5)).unwrap();
6075 insert(&conn, &make_memory("C", "beta", Tier::Long, 5)).unwrap();
6076
6077 let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
6078 assert_eq!(tax.total_count, 3);
6079 assert!(!tax.truncated);
6080 assert_eq!(tax.tree.namespace, "");
6081 assert_eq!(tax.tree.subtree_count, 3);
6082 assert_eq!(tax.tree.count, 0); assert_eq!(tax.tree.children.len(), 2);
6084 let alpha = tax
6085 .tree
6086 .children
6087 .iter()
6088 .find(|c| c.name == "alpha")
6089 .unwrap();
6090 assert_eq!(alpha.count, 2);
6091 assert_eq!(alpha.subtree_count, 2);
6092 assert!(alpha.children.is_empty());
6093 let beta = tax.tree.children.iter().find(|c| c.name == "beta").unwrap();
6094 assert_eq!(beta.count, 1);
6095 }
6096
6097 #[test]
6098 fn taxonomy_hierarchical_tree() {
6099 let conn = test_db();
6101 insert(&conn, &make_memory("a", "alphaone", Tier::Long, 5)).unwrap();
6102 insert(&conn, &make_memory("b", "alphaone/eng", Tier::Long, 5)).unwrap();
6103 insert(
6104 &conn,
6105 &make_memory("c", "alphaone/eng/platform", Tier::Long, 5),
6106 )
6107 .unwrap();
6108 insert(
6109 &conn,
6110 &make_memory("d", "alphaone/eng/platform", Tier::Long, 5),
6111 )
6112 .unwrap();
6113 insert(&conn, &make_memory("e", "alphaone/sales", Tier::Long, 5)).unwrap();
6114
6115 let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
6116 assert_eq!(tax.total_count, 5);
6117 assert_eq!(tax.tree.subtree_count, 5);
6118 assert_eq!(tax.tree.children.len(), 1);
6119
6120 let alphaone = &tax.tree.children[0];
6121 assert_eq!(alphaone.name, "alphaone");
6122 assert_eq!(alphaone.namespace, "alphaone");
6123 assert_eq!(alphaone.count, 1); assert_eq!(alphaone.subtree_count, 5);
6125 assert_eq!(alphaone.children.len(), 2);
6126
6127 let eng = alphaone.children.iter().find(|c| c.name == "eng").unwrap();
6128 assert_eq!(eng.namespace, "alphaone/eng");
6129 assert_eq!(eng.count, 1);
6130 assert_eq!(eng.subtree_count, 3);
6131 let platform = &eng.children[0];
6132 assert_eq!(platform.name, "platform");
6133 assert_eq!(platform.namespace, "alphaone/eng/platform");
6134 assert_eq!(platform.count, 2);
6135 assert_eq!(platform.subtree_count, 2);
6136 assert!(platform.children.is_empty());
6137 }
6138
6139 #[test]
6140 fn taxonomy_prefix_scopes_subtree() {
6141 let conn = test_db();
6142 insert(&conn, &make_memory("a", "alphaone/eng", Tier::Long, 5)).unwrap();
6143 insert(
6144 &conn,
6145 &make_memory("b", "alphaone/eng/platform", Tier::Long, 5),
6146 )
6147 .unwrap();
6148 insert(&conn, &make_memory("c", "alphaone/sales", Tier::Long, 5)).unwrap();
6149 insert(&conn, &make_memory("d", "alphaone-sibling", Tier::Long, 5)).unwrap();
6151 insert(&conn, &make_memory("e", "other", Tier::Long, 5)).unwrap();
6152
6153 let tax = get_taxonomy(&conn, Some("alphaone/eng"), 8, 1000).unwrap();
6154 assert_eq!(tax.total_count, 2);
6155 assert_eq!(tax.tree.namespace, "alphaone/eng");
6156 assert_eq!(tax.tree.name, "eng");
6157 assert_eq!(tax.tree.count, 1);
6158 assert_eq!(tax.tree.subtree_count, 2);
6159 assert_eq!(tax.tree.children.len(), 1);
6160 assert_eq!(tax.tree.children[0].name, "platform");
6161 assert_eq!(tax.tree.children[0].count, 1);
6162 }
6163
6164 #[test]
6165 fn taxonomy_depth_clamps_but_preserves_subtree_counts() {
6166 let conn = test_db();
6167 insert(
6168 &conn,
6169 &make_memory("a", "alphaone/eng/platform/db", Tier::Long, 5),
6170 )
6171 .unwrap();
6172 insert(
6173 &conn,
6174 &make_memory("b", "alphaone/eng/platform/api", Tier::Long, 5),
6175 )
6176 .unwrap();
6177
6178 let tax = get_taxonomy(&conn, None, 2, 1000).unwrap();
6179 assert_eq!(tax.total_count, 2);
6180 let alphaone = &tax.tree.children[0];
6181 let eng = &alphaone.children[0];
6182 assert!(eng.children.is_empty());
6186 assert_eq!(eng.subtree_count, 2);
6187 assert_eq!(eng.count, 0); }
6189
6190 #[test]
6191 fn taxonomy_excludes_expired_memories() {
6192 let conn = test_db();
6195 let mut alive = make_memory("alive", "alpha", Tier::Long, 5);
6196 let mut dead = make_memory("dead", "alpha", Tier::Short, 5);
6197 dead.expires_at = Some("2000-01-01T00:00:00Z".to_string());
6199 alive.expires_at = None;
6200 insert(&conn, &alive).unwrap();
6201 insert(&conn, &dead).unwrap();
6202
6203 let tax = get_taxonomy(&conn, None, 8, 1000).unwrap();
6204 assert_eq!(tax.total_count, 1);
6205 assert_eq!(tax.tree.children.len(), 1);
6206 assert_eq!(tax.tree.children[0].count, 1);
6207 }
6208
6209 #[test]
6210 fn taxonomy_truncates_at_limit_but_total_stays_honest() {
6211 let conn = test_db();
6212 for ns in ["aa", "bb", "cc", "dd", "ee"] {
6213 insert(&conn, &make_memory("m", ns, Tier::Long, 5)).unwrap();
6214 }
6215 let tax = get_taxonomy(&conn, None, 8, 2).unwrap();
6216 assert_eq!(tax.total_count, 5);
6219 assert!(tax.truncated);
6220 assert_eq!(tax.tree.children.len(), 2);
6221 }
6222
6223 #[test]
6224 fn forget_by_namespace() {
6225 let conn = test_db();
6226 insert(&conn, &make_memory("A", "delete-me", Tier::Long, 5)).unwrap();
6227 insert(&conn, &make_memory("B", "delete-me", Tier::Long, 5)).unwrap();
6228 insert(&conn, &make_memory("C", "keep", Tier::Long, 5)).unwrap();
6229
6230 let deleted = forget(&conn, Some("delete-me"), None, None, false).unwrap();
6231 assert_eq!(deleted, 2);
6232 let remaining = list(&conn, None, None, 100, 0, None, None, None, None, None).unwrap();
6233 assert_eq!(remaining.len(), 1);
6234 }
6235
6236 #[test]
6237 fn set_and_get_embedding() {
6238 let conn = test_db();
6239 let mem = make_memory("Embed test", "test", Tier::Long, 5);
6240 let id = insert(&conn, &mem).unwrap();
6241
6242 let emb = vec![0.1f32, 0.2, 0.3, 0.4];
6243 set_embedding(&conn, &id, &emb).unwrap();
6244
6245 let got = get_embedding(&conn, &id).unwrap().unwrap();
6246 assert_eq!(got.len(), 4);
6247 assert!((got[0] - 0.1).abs() < 1e-6);
6248 }
6249
6250 fn insert_with_embedding(
6253 conn: &Connection,
6254 title: &str,
6255 ns: &str,
6256 embedding: &[f32],
6257 ) -> String {
6258 let mem = make_memory(title, ns, Tier::Long, 5);
6259 let id = insert(conn, &mem).unwrap();
6260 set_embedding(conn, &id, embedding).unwrap();
6261 id
6262 }
6263
6264 #[test]
6265 fn check_duplicate_empty_db_returns_no_match() {
6266 let conn = test_db();
6267 let q = vec![1.0_f32, 0.0, 0.0];
6268 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6269 assert!(!r.is_duplicate);
6270 assert!(r.nearest.is_none());
6271 assert_eq!(r.candidates_scanned, 0);
6272 }
6273
6274 #[test]
6275 fn check_duplicate_finds_highest_cosine_match() {
6276 let conn = test_db();
6277 let id_a = insert_with_embedding(&conn, "alpha", "ns", &[1.0, 0.0, 0.0]);
6282 let _id_b = insert_with_embedding(&conn, "beta", "ns", &[0.7, 0.7, 0.0]);
6283 let _id_c = insert_with_embedding(&conn, "gamma", "ns", &[0.0, 1.0, 0.0]);
6284
6285 let q = vec![1.0_f32, 0.0, 0.0];
6286 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6287 let nearest = r.nearest.expect("expected a nearest match");
6288 assert_eq!(nearest.id, id_a);
6289 assert!(nearest.similarity > 0.99);
6290 assert_eq!(r.candidates_scanned, 3);
6291 assert!(r.is_duplicate);
6292 assert!((r.threshold - 0.85).abs() < 1e-6);
6293 }
6294
6295 #[test]
6296 fn check_duplicate_below_threshold_not_flagged_but_returns_nearest() {
6297 let conn = test_db();
6298 let id_b = insert_with_embedding(&conn, "beta", "ns", &[0.7, 0.7, 0.0]);
6299
6300 let q = vec![1.0_f32, 0.0, 0.0];
6302 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6303 let nearest = r
6304 .nearest
6305 .expect("nearest must surface even when below threshold");
6306 assert_eq!(nearest.id, id_b);
6307 assert!(!r.is_duplicate);
6308 }
6309
6310 #[test]
6311 fn check_duplicate_threshold_clamped_to_floor() {
6312 let conn = test_db();
6313 let _ = insert_with_embedding(&conn, "x", "ns", &[1.0, 0.0, 0.0]);
6317 let q = vec![0.0_f32, 1.0, 0.0]; let r = check_duplicate(&conn, &q, None, 0.0).unwrap();
6319 assert!((r.threshold - DUPLICATE_THRESHOLD_MIN).abs() < 1e-6);
6320 assert!(!r.is_duplicate);
6321 }
6322
6323 #[test]
6324 fn check_duplicate_namespace_filter_isolates_scan() {
6325 let conn = test_db();
6326 let _hit_in_other_ns = insert_with_embedding(&conn, "x", "other", &[1.0, 0.0, 0.0]);
6327 let id_target = insert_with_embedding(&conn, "y", "ns", &[0.6, 0.8, 0.0]);
6328
6329 let q = vec![1.0_f32, 0.0, 0.0];
6330 let r = check_duplicate(&conn, &q, Some("ns"), 0.85).unwrap();
6331 assert_eq!(r.candidates_scanned, 1);
6332 assert_eq!(r.nearest.expect("namespace filter ignored").id, id_target);
6333 }
6334
6335 #[test]
6336 fn check_duplicate_skips_expired_rows() {
6337 let conn = test_db();
6338 let mut mem = make_memory("expired", "ns", Tier::Short, 5);
6341 mem.expires_at = Some((chrono::Utc::now() - chrono::Duration::seconds(60)).to_rfc3339());
6342 let id = insert(&conn, &mem).unwrap();
6343 set_embedding(&conn, &id, &[1.0, 0.0, 0.0]).unwrap();
6344
6345 let q = vec![1.0_f32, 0.0, 0.0];
6346 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6347 assert_eq!(r.candidates_scanned, 0);
6348 assert!(r.nearest.is_none());
6349 }
6350
6351 #[test]
6352 fn check_duplicate_skips_unembedded_rows() {
6353 let conn = test_db();
6354 let id_embedded = insert_with_embedding(&conn, "with-emb", "ns", &[1.0, 0.0, 0.0]);
6357 let mem = make_memory("no-emb", "ns", Tier::Long, 5);
6358 let _ = insert(&conn, &mem).unwrap();
6359
6360 let q = vec![1.0_f32, 0.0, 0.0];
6361 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6362 assert_eq!(r.candidates_scanned, 1);
6363 assert_eq!(r.nearest.expect("embedded match").id, id_embedded);
6364 }
6365
6366 #[test]
6367 fn check_duplicate_skips_blob_with_non_multiple_of_4_length() {
6368 let conn = test_db();
6374 let mem = make_memory("malformed-blob", "ns", Tier::Long, 5);
6375 let id = insert(&conn, &mem).unwrap();
6376 conn.execute(
6379 "UPDATE memories SET embedding = ?1 WHERE id = ?2",
6380 params![&[0u8; 7][..], &id],
6381 )
6382 .unwrap();
6383
6384 let q = vec![1.0_f32, 0.0];
6385 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6386 assert_eq!(
6387 r.candidates_scanned, 0,
6388 "malformed blob must be skipped, not silently truncated"
6389 );
6390 assert!(r.nearest.is_none());
6391 }
6392
6393 #[test]
6394 fn check_duplicate_skips_blob_with_dimension_mismatch() {
6395 let conn = test_db();
6400 let _id = insert_with_embedding(&conn, "different-dim", "ns", &[1.0, 0.0, 0.0]);
6402
6403 let q = vec![1.0_f32, 0.0, 0.0, 0.0];
6405 let r = check_duplicate(&conn, &q, None, 0.85).unwrap();
6406 assert_eq!(
6407 r.candidates_scanned, 0,
6408 "dimension-mismatched candidate must be skipped"
6409 );
6410 assert!(r.nearest.is_none());
6411 }
6412
6413 #[test]
6414 fn get_unembedded_returns_memoryless() {
6415 let conn = test_db();
6416 let mem = make_memory("No embed", "test", Tier::Long, 5);
6417 insert(&conn, &mem).unwrap();
6418
6419 let unembedded = get_unembedded_ids(&conn).unwrap();
6420 assert_eq!(unembedded.len(), 1);
6421 }
6422
6423 #[test]
6424 fn health_check_passes() {
6425 let conn = test_db();
6426 assert!(health_check(&conn).unwrap());
6427 }
6428
6429 #[test]
6430 fn sanitize_fts_strips_operators_and_quotes() {
6431 let sanitized = sanitize_fts_query("test* \"injection\" (drop)", true);
6433 assert!(!sanitized.contains('*'));
6434 assert!(!sanitized.contains('('));
6435 assert!(!sanitized.contains(')'));
6436 let sanitized2 = sanitize_fts_query("hello AND world OR NOT NEAR test", true);
6438 assert!(sanitized2.contains("hello"));
6439 assert!(sanitized2.contains("world"));
6440 assert!(sanitized2.contains("test"));
6441 let sanitized3 = sanitize_fts_query("", true);
6443 assert_eq!(sanitized3, "\"_empty_\"");
6444 let sanitized4 = sanitize_fts_query("-secret +required", true);
6450 assert!(!sanitized4.contains('+'));
6451 assert!(sanitized4.contains("secret"));
6452 assert!(sanitized4.contains("required"));
6453 let sanitized5 = sanitize_fts_query("well-known", true);
6455 assert!(sanitized5.contains("well-known"));
6456 }
6457
6458 #[test]
6459 fn get_by_prefix_8char() {
6460 let conn = test_db();
6461 let mem = make_memory("Prefix test", "test", Tier::Long, 5);
6462 let id = insert(&conn, &mem).unwrap();
6463 let prefix = &id[..8];
6464 let got = get_by_prefix(&conn, prefix).unwrap().unwrap();
6465 assert_eq!(got.id, id);
6466 assert_eq!(got.title, "Prefix test");
6467 }
6468
6469 #[test]
6470 fn get_by_prefix_full_uuid() {
6471 let conn = test_db();
6472 let mem = make_memory("Full UUID prefix", "test", Tier::Long, 5);
6473 let id = insert(&conn, &mem).unwrap();
6474 let got = get_by_prefix(&conn, &id).unwrap().unwrap();
6476 assert_eq!(got.id, id);
6477 }
6478
6479 #[test]
6480 fn get_by_prefix_nonexistent() {
6481 let conn = test_db();
6482 let got = get_by_prefix(&conn, "ffffffff").unwrap();
6483 assert!(got.is_none());
6484 }
6485
6486 #[test]
6487 fn get_by_prefix_ambiguous() {
6488 let conn = test_db();
6489 let mut mem1 = make_memory("Ambig A", "test", Tier::Long, 5);
6491 mem1.id = "aaaa1111-0000-0000-0000-000000000001".to_string();
6492 insert(&conn, &mem1).unwrap();
6493 let mut mem2 = make_memory("Ambig B", "test2", Tier::Long, 5);
6494 mem2.id = "aaaa2222-0000-0000-0000-000000000002".to_string();
6495 insert(&conn, &mem2).unwrap();
6496 let result = get_by_prefix(&conn, "aaaa");
6497 assert!(result.is_err());
6498 let err_msg = result.unwrap_err().to_string();
6499 assert!(err_msg.contains("ambiguous"));
6500 assert!(err_msg.contains("2 matches"));
6501 assert!(
6503 err_msg.contains("aaaa1111-0000-0000-0000-000000000001"),
6504 "error should list matching IDs, got: {err_msg}"
6505 );
6506 assert!(err_msg.contains("aaaa2222-0000-0000-0000-000000000002"));
6507 }
6508
6509 #[test]
6510 fn resolve_id_exact_then_prefix() {
6511 let conn = test_db();
6512 let mem = make_memory("Resolve test", "test", Tier::Long, 5);
6513 let id = insert(&conn, &mem).unwrap();
6514 let got = resolve_id(&conn, &id).unwrap().unwrap();
6516 assert_eq!(got.id, id);
6517 let got2 = resolve_id(&conn, &id[..8]).unwrap().unwrap();
6519 assert_eq!(got2.id, id);
6520 let got3 = resolve_id(&conn, "zzzzzzzz").unwrap();
6522 assert!(got3.is_none());
6523 }
6524
6525 #[test]
6526 fn insert_if_newer_updates() {
6527 let conn = test_db();
6528 let mut mem = make_memory("Sync test", "test", Tier::Long, 5);
6529 let id = insert(&conn, &mem).unwrap();
6530
6531 mem.id = id.clone();
6532 mem.content = "Updated via sync".to_string();
6533 mem.updated_at = (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
6534 let result_id = insert_if_newer(&conn, &mem).unwrap();
6535 assert_eq!(result_id, id);
6536
6537 let got = get(&conn, &id).unwrap().unwrap();
6538 assert_eq!(got.content, "Updated via sync");
6539 }
6540
6541 #[test]
6544 fn metadata_default_empty_object() {
6545 let conn = test_db();
6546 let mem = make_memory("Default metadata", "test", Tier::Long, 5);
6547 let id = insert(&conn, &mem).unwrap();
6548 let got = get(&conn, &id).unwrap().unwrap();
6549 assert_eq!(got.metadata, serde_json::json!({}));
6550 }
6551
6552 #[test]
6553 fn metadata_store_and_retrieve() {
6554 let conn = test_db();
6555 let mut mem = make_memory("With metadata", "test", Tier::Long, 5);
6556 mem.metadata = serde_json::json!({"agent_id": "claude-1", "session": 42});
6557 let id = insert(&conn, &mem).unwrap();
6558 let got = get(&conn, &id).unwrap().unwrap();
6559 assert_eq!(got.metadata["agent_id"], "claude-1");
6560 assert_eq!(got.metadata["session"], 42);
6561 }
6562
6563 #[test]
6564 fn metadata_roundtrip_nested_json() {
6565 let conn = test_db();
6566 let mut mem = make_memory("Nested metadata", "test", Tier::Long, 5);
6567 mem.metadata = serde_json::json!({
6568 "agent": {"type": "ai:claude", "version": "4.6"},
6569 "tags_extra": ["experimental"],
6570 "score": 0.95
6571 });
6572 let id = insert(&conn, &mem).unwrap();
6573 let got = get(&conn, &id).unwrap().unwrap();
6574 assert_eq!(got.metadata["agent"]["type"], "ai:claude");
6575 assert_eq!(got.metadata["tags_extra"][0], "experimental");
6576 assert!((got.metadata["score"].as_f64().unwrap() - 0.95).abs() < f64::EPSILON);
6577 }
6578
6579 #[test]
6580 fn metadata_preserved_on_update() {
6581 let conn = test_db();
6582 let mut mem = make_memory("Update metadata", "test", Tier::Long, 5);
6583 mem.metadata = serde_json::json!({"key": "original"});
6584 let id = insert(&conn, &mem).unwrap();
6585
6586 let (found, _) = update(
6588 &conn,
6589 &id,
6590 None,
6591 Some("new content"),
6592 None,
6593 None,
6594 None,
6595 None,
6596 None,
6597 None,
6598 None,
6599 )
6600 .unwrap();
6601 assert!(found);
6602 let got = get(&conn, &id).unwrap().unwrap();
6603 assert_eq!(got.metadata["key"], "original");
6604 assert_eq!(got.content, "new content");
6605
6606 let new_meta = serde_json::json!({"key": "updated", "extra": true});
6608 let (found, _) = update(
6609 &conn,
6610 &id,
6611 None,
6612 None,
6613 None,
6614 None,
6615 None,
6616 None,
6617 None,
6618 None,
6619 Some(&new_meta),
6620 )
6621 .unwrap();
6622 assert!(found);
6623 let got = get(&conn, &id).unwrap().unwrap();
6624 assert_eq!(got.metadata["key"], "updated");
6625 assert_eq!(got.metadata["extra"], true);
6626 }
6627
6628 #[test]
6629 fn metadata_preserved_on_upsert() {
6630 let conn = test_db();
6631 let mut mem = make_memory("Upsert meta", "test", Tier::Long, 5);
6632 mem.metadata = serde_json::json!({"version": 1});
6633 insert(&conn, &mem).unwrap();
6634
6635 let mut mem2 = make_memory("Upsert meta", "test", Tier::Long, 5);
6637 mem2.metadata = serde_json::json!({"version": 2});
6638 let id = insert(&conn, &mem2).unwrap();
6639 let got = get(&conn, &id).unwrap().unwrap();
6640 assert_eq!(got.metadata["version"], 2);
6641 }
6642
6643 #[test]
6644 fn metadata_in_list_and_search() {
6645 let conn = test_db();
6646 let mut mem = make_memory("Searchable metadata", "test", Tier::Long, 8);
6647 mem.metadata = serde_json::json!({"source_model": "opus"});
6648 insert(&conn, &mem).unwrap();
6649
6650 let results = list(
6651 &conn,
6652 Some("test"),
6653 None,
6654 10,
6655 0,
6656 None,
6657 None,
6658 None,
6659 None,
6660 None,
6661 )
6662 .unwrap();
6663 assert_eq!(results.len(), 1);
6664 assert_eq!(results[0].metadata["source_model"], "opus");
6665
6666 let results = search(
6667 &conn,
6668 "Searchable",
6669 Some("test"),
6670 None,
6671 10,
6672 None,
6673 None,
6674 None,
6675 None,
6676 None,
6677 None,
6678 )
6679 .unwrap();
6680 assert_eq!(results.len(), 1);
6681 assert_eq!(results[0].metadata["source_model"], "opus");
6682 }
6683
6684 #[test]
6685 fn metadata_in_recall() {
6686 let conn = test_db();
6687 let mut mem = make_memory("Recallable metadata", "test", Tier::Long, 8);
6688 mem.metadata = serde_json::json!({"context": "test-recall"});
6689 insert(&conn, &mem).unwrap();
6690
6691 let (results, _tokens) = recall(
6692 &conn,
6693 "Recallable",
6694 Some("test"),
6695 10,
6696 None,
6697 None,
6698 None,
6699 3600,
6700 86400,
6701 None,
6702 None,
6703 )
6704 .unwrap();
6705 assert!(!results.is_empty());
6706 assert_eq!(results[0].0.metadata["context"], "test-recall");
6707 }
6708
6709 #[test]
6710 fn metadata_in_export_import() {
6711 let conn = test_db();
6712 let mut mem = make_memory("Export metadata", "test", Tier::Long, 5);
6713 mem.metadata = serde_json::json!({"exported": true});
6714 insert(&conn, &mem).unwrap();
6715
6716 let exported = export_all(&conn).unwrap();
6717 assert_eq!(exported.len(), 1);
6718 assert_eq!(exported[0].metadata["exported"], true);
6719
6720 let conn2 = test_db();
6722 insert(&conn2, &exported[0]).unwrap();
6723 let got = get(&conn2, &exported[0].id).unwrap().unwrap();
6724 assert_eq!(got.metadata["exported"], true);
6725 }
6726
6727 #[test]
6728 fn metadata_schema_migration() {
6729 let conn = test_db();
6732 let mem = make_memory("Migration test", "test", Tier::Long, 5);
6733 let id = insert(&conn, &mem).unwrap();
6734
6735 let metadata_str: String = conn
6737 .query_row(
6738 "SELECT metadata FROM memories WHERE id = ?1",
6739 params![id],
6740 |r| r.get(0),
6741 )
6742 .unwrap();
6743 assert_eq!(metadata_str, "{}");
6744 }
6745
6746 #[test]
6747 fn metadata_survives_archive_restore_cycle() {
6748 let conn = test_db();
6749 let mut mem = make_memory("Archivable", "test", Tier::Short, 5);
6750 mem.metadata = serde_json::json!({"origin": "archive-test"});
6751 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
6753 let id = insert(&conn, &mem).unwrap();
6754
6755 let deleted = gc(&conn, true).unwrap();
6757 assert_eq!(deleted, 1);
6758
6759 let archived = list_archived(&conn, None, 10, 0).unwrap();
6761 assert_eq!(archived.len(), 1);
6762 assert_eq!(archived[0]["metadata"]["origin"], "archive-test");
6763
6764 let restored = restore_archived(&conn, &id).unwrap();
6766 assert!(restored);
6767 let got = get(&conn, &id).unwrap().unwrap();
6768 assert_eq!(got.metadata["origin"], "archive-test");
6769 }
6770
6771 #[test]
6772 fn metadata_in_insert_if_newer() {
6773 let conn = test_db();
6774 let mut mem = make_memory("Sync metadata", "test", Tier::Long, 5);
6775 mem.metadata = serde_json::json!({"version": 1});
6776 let id = insert(&conn, &mem).unwrap();
6777
6778 mem.id = id.clone();
6780 mem.metadata = serde_json::json!({"version": 2, "synced": true});
6781 mem.updated_at = (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339();
6782 insert_if_newer(&conn, &mem).unwrap();
6783
6784 let got = get(&conn, &id).unwrap().unwrap();
6785 assert_eq!(got.metadata["version"], 2);
6786 assert_eq!(got.metadata["synced"], true);
6787
6788 mem.metadata = serde_json::json!({"version": 0, "stale": true});
6790 mem.updated_at = "2020-01-01T00:00:00+00:00".to_string();
6791 insert_if_newer(&conn, &mem).unwrap();
6792
6793 let got = get(&conn, &id).unwrap().unwrap();
6794 assert_eq!(got.metadata["version"], 2); assert!(got.metadata.get("stale").is_none());
6796 }
6797
6798 #[test]
6799 fn metadata_merged_in_consolidate() {
6800 let conn = test_db();
6801 let mut mem_a = make_memory("Consolidate A", "test", Tier::Long, 5);
6802 mem_a.metadata = serde_json::json!({"agent": "claude", "shared": "from_a"});
6803 let id_a = insert(&conn, &mem_a).unwrap();
6804
6805 let mut mem_b = make_memory("Consolidate B", "test", Tier::Long, 7);
6806 mem_b.metadata = serde_json::json!({"model": "opus", "shared": "from_b"});
6807 let id_b = insert(&conn, &mem_b).unwrap();
6808
6809 let new_id = consolidate(
6810 &conn,
6811 &[id_a, id_b],
6812 "Merged",
6813 "Combined content",
6814 "test",
6815 &Tier::Long,
6816 "consolidation",
6817 "test-consolidator",
6818 )
6819 .unwrap();
6820
6821 let got = get(&conn, &new_id).unwrap().unwrap();
6822 assert_eq!(got.metadata["agent"], "claude");
6824 assert_eq!(got.metadata["model"], "opus");
6825 assert_eq!(got.metadata["shared"], "from_b");
6826 }
6827
6828 #[test]
6829 fn metadata_consolidate_rejects_oversized_merge() {
6830 let conn = test_db();
6831 let mut mem_a = make_memory("Big meta A", "test", Tier::Long, 5);
6833 let big_val_a: serde_json::Map<String, serde_json::Value> = (0..500)
6834 .map(|i| {
6835 (
6836 format!("key_a_{i}"),
6837 serde_json::Value::String("x".repeat(60)),
6838 )
6839 })
6840 .collect();
6841 mem_a.metadata = serde_json::Value::Object(big_val_a);
6842 let id_a = insert(&conn, &mem_a).unwrap();
6843
6844 let mut mem_b = make_memory("Big meta B", "test", Tier::Long, 5);
6845 let big_val_b: serde_json::Map<String, serde_json::Value> = (0..500)
6846 .map(|i| {
6847 (
6848 format!("key_b_{i}"),
6849 serde_json::Value::String("x".repeat(60)),
6850 )
6851 })
6852 .collect();
6853 mem_b.metadata = serde_json::Value::Object(big_val_b);
6854 let id_b = insert(&conn, &mem_b).unwrap();
6855
6856 let result = consolidate(
6858 &conn,
6859 &[id_a, id_b],
6860 "Oversized merge",
6861 "Should fail",
6862 "test",
6863 &Tier::Long,
6864 "consolidation",
6865 "test-consolidator",
6866 );
6867 let err = result.expect_err("consolidate should fail for oversized merged metadata");
6868 let msg = err.to_string();
6869 assert!(
6870 msg.contains("merged metadata exceeds size limit"),
6871 "expected metadata size error, got: {msg}"
6872 );
6873 }
6874
6875 #[test]
6876 fn metadata_special_characters_roundtrip() {
6877 let conn = test_db();
6878 let mut mem = make_memory("Special chars metadata", "test", Tier::Long, 5);
6879 mem.metadata = serde_json::json!({
6880 "pipe": "a|b|c",
6881 "newline": "line1\nline2",
6882 "tab": "col1\tcol2",
6883 "backslash": "path\\to\\file",
6884 "unicode": "\u{1F600}\u{1F4A9}",
6885 "cjk": "\u{4e16}\u{754c}",
6886 "empty": "",
6887 "nested_special": {"inner|key": "val\nue"}
6888 });
6889 let id = insert(&conn, &mem).unwrap();
6890 let got = get(&conn, &id).unwrap().unwrap();
6891 assert_eq!(got.metadata["pipe"], "a|b|c");
6892 assert_eq!(got.metadata["newline"], "line1\nline2");
6893 assert_eq!(got.metadata["unicode"], "\u{1F600}\u{1F4A9}");
6894 assert_eq!(got.metadata["cjk"], "\u{4e16}\u{754c}");
6895 assert_eq!(got.metadata["nested_special"]["inner|key"], "val\nue");
6896 }
6897
6898 #[test]
6899 fn metadata_corrupt_column_falls_back_to_empty() {
6900 let conn = test_db();
6901 let mem = make_memory("Corrupt test", "test", Tier::Long, 5);
6902 let id = insert(&conn, &mem).unwrap();
6903
6904 conn.execute(
6906 "UPDATE memories SET metadata = 'NOT VALID JSON {{{{' WHERE id = ?1",
6907 params![id],
6908 )
6909 .unwrap();
6910
6911 let got = get(&conn, &id).unwrap().unwrap();
6913 assert_eq!(got.metadata, serde_json::json!({}));
6914 }
6915
6916 #[test]
6917 fn metadata_restore_resets_corrupt_archived_metadata() {
6918 let conn = test_db();
6919 let mut mem = make_memory("Corrupt archive", "test", Tier::Short, 5);
6920 mem.metadata = serde_json::json!({"valid": true});
6921 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
6922 let id = insert(&conn, &mem).unwrap();
6923
6924 gc(&conn, true).unwrap();
6926
6927 conn.execute(
6929 "UPDATE archived_memories SET metadata = 'CORRUPT JSON' WHERE id = ?1",
6930 params![id],
6931 )
6932 .unwrap();
6933
6934 let restored = restore_archived(&conn, &id).unwrap();
6936 assert!(restored);
6937 let got = get(&conn, &id).unwrap().unwrap();
6938 assert_eq!(got.metadata, serde_json::json!({}));
6939 }
6940
6941 #[test]
6942 fn scope_index_exists_after_migration() {
6943 let conn = test_db();
6946 let has_col: bool = conn
6947 .prepare("SELECT scope_idx FROM memories LIMIT 0")
6948 .is_ok();
6949 assert!(has_col, "scope_idx generated column missing");
6950 let idx_exists: i64 = conn
6951 .query_row(
6952 "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name='idx_memories_scope_idx'",
6953 [],
6954 |row| row.get(0),
6955 )
6956 .unwrap();
6957 assert_eq!(idx_exists, 1, "idx_memories_scope_idx missing");
6958 }
6959
6960 #[test]
6961 fn scope_index_used_for_direct_scope_filter() {
6962 let conn = test_db();
6975 for i in 0..200 {
6977 let scope = if i % 3 == 0 { "collective" } else { "private" };
6978 let mut mem = make_memory(&format!("row-{i}"), "test", Tier::Long, 5);
6979 mem.metadata = serde_json::json!({"scope": scope});
6980 insert(&conn, &mem).unwrap();
6981 }
6982 conn.execute("ANALYZE", []).unwrap();
6983 let plan: Vec<String> = conn
6984 .prepare("EXPLAIN QUERY PLAN SELECT id FROM memories WHERE scope_idx = ?1")
6985 .unwrap()
6986 .query_map(params!["collective"], |row| row.get::<_, String>(3))
6987 .unwrap()
6988 .collect::<rusqlite::Result<_>>()
6989 .unwrap();
6990 let joined = plan.join("\n");
6991 assert!(
6992 joined.contains("idx_memories_scope_idx"),
6993 "direct scope filter must use idx_memories_scope_idx; got:\n{joined}"
6994 );
6995 }
6996
6997 #[test]
6998 fn scope_idx_reflects_metadata_on_insert_and_update() {
6999 let conn = test_db();
7002 let mut mem = make_memory("scope-tracking", "test", Tier::Long, 5);
7003 mem.metadata = serde_json::json!({"scope": "team"});
7004 let id = insert(&conn, &mem).unwrap();
7005 let scope: String = conn
7006 .query_row(
7007 "SELECT scope_idx FROM memories WHERE id = ?1",
7008 params![id],
7009 |r| r.get(0),
7010 )
7011 .unwrap();
7012 assert_eq!(scope, "team");
7013
7014 let new_meta = serde_json::json!({"scope": "unit"});
7016 update(
7017 &conn,
7018 &id,
7019 None,
7020 None,
7021 None,
7022 None,
7023 None,
7024 None,
7025 None,
7026 None,
7027 Some(&new_meta),
7028 )
7029 .unwrap();
7030 let scope2: String = conn
7031 .query_row(
7032 "SELECT scope_idx FROM memories WHERE id = ?1",
7033 params![id],
7034 |r| r.get(0),
7035 )
7036 .unwrap();
7037 assert_eq!(scope2, "unit");
7038
7039 let mut bare = make_memory("no-scope-key", "test", Tier::Long, 5);
7041 bare.metadata = serde_json::json!({});
7042 let id2 = insert(&conn, &bare).unwrap();
7043 let scope3: String = conn
7044 .query_row(
7045 "SELECT scope_idx FROM memories WHERE id = ?1",
7046 params![id2],
7047 |r| r.get(0),
7048 )
7049 .unwrap();
7050 assert_eq!(scope3, "private");
7051 }
7052
7053 #[test]
7054 fn auto_purge_archive_respects_max_days() {
7055 let conn = test_db();
7056 let mut mem = make_memory("Purge test", "test", Tier::Short, 5);
7057 mem.expires_at = Some("2020-01-01T00:00:00+00:00".to_string());
7058 insert(&conn, &mem).unwrap();
7059 gc(&conn, true).unwrap();
7060
7061 let archived = list_archived(&conn, None, 10, 0).unwrap();
7063 assert_eq!(archived.len(), 1);
7064
7065 conn.execute(
7067 "UPDATE archived_memories SET archived_at = ?1",
7068 params![(chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339()],
7069 )
7070 .unwrap();
7071
7072 let purged = auto_purge_archive(&conn, None).unwrap();
7074 assert_eq!(purged, 0);
7075 assert_eq!(list_archived(&conn, None, 10, 0).unwrap().len(), 1);
7076
7077 let purged = auto_purge_archive(&conn, Some(0)).unwrap();
7079 assert_eq!(purged, 0);
7080
7081 let purged = auto_purge_archive(&conn, Some(90)).unwrap();
7083 assert_eq!(purged, 0);
7084
7085 let purged = auto_purge_archive(&conn, Some(7)).unwrap();
7087 assert_eq!(purged, 1);
7088 assert!(list_archived(&conn, None, 10, 0).unwrap().is_empty());
7089 }
7090
7091 fn column_exists(conn: &Connection, table: &str, column: &str) -> bool {
7096 let mut stmt = conn
7097 .prepare(&format!("PRAGMA table_info({table})"))
7098 .unwrap();
7099 let cols: Vec<String> = stmt
7100 .query_map([], |row| row.get::<_, String>(1))
7101 .unwrap()
7102 .filter_map(Result::ok)
7103 .collect();
7104 cols.iter().any(|c| c == column)
7105 }
7106
7107 fn index_exists(conn: &Connection, name: &str) -> bool {
7108 conn.query_row(
7109 "SELECT 1 FROM sqlite_master WHERE type='index' AND name=?1",
7110 params![name],
7111 |r| r.get::<_, i64>(0),
7112 )
7113 .is_ok()
7114 }
7115
7116 #[test]
7117 fn schema_v15_memory_links_has_temporal_columns() {
7118 let conn = test_db();
7119 assert!(column_exists(&conn, "memory_links", "valid_from"));
7120 assert!(column_exists(&conn, "memory_links", "valid_until"));
7121 assert!(column_exists(&conn, "memory_links", "observed_by"));
7122 assert!(column_exists(&conn, "memory_links", "signature"));
7123 }
7124
7125 #[test]
7126 fn schema_v15_memory_links_temporal_indexes_exist() {
7127 let conn = test_db();
7128 assert!(index_exists(&conn, "idx_links_temporal_src"));
7129 assert!(index_exists(&conn, "idx_links_temporal_tgt"));
7130 assert!(index_exists(&conn, "idx_links_relation"));
7131 }
7132
7133 #[test]
7134 fn schema_v15_entity_aliases_table_exists() {
7135 let conn = test_db();
7136 let count: i64 = conn
7137 .query_row("SELECT COUNT(*) FROM entity_aliases", [], |r| r.get(0))
7138 .unwrap();
7139 assert_eq!(count, 0);
7140 assert!(index_exists(&conn, "idx_entity_aliases_alias"));
7141 }
7142
7143 #[test]
7144 fn schema_v15_entity_aliases_primary_key_unique() {
7145 let conn = test_db();
7146 let now = chrono::Utc::now().to_rfc3339();
7147 conn.execute(
7148 "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
7149 params!["e1", "Alpha", &now],
7150 )
7151 .unwrap();
7152 let dup = conn.execute(
7153 "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
7154 params!["e1", "Alpha", &now],
7155 );
7156 assert!(dup.is_err(), "expected PK uniqueness violation");
7157 }
7158
7159 #[test]
7162 fn entity_register_creates_new_entity_with_aliases() {
7163 let conn = test_db();
7164 let aliases = vec!["pa".to_string(), "Project A".to_string()];
7165 let reg = entity_register(
7166 &conn,
7167 "Project Alpha",
7168 "projects/alpha",
7169 &aliases,
7170 &serde_json::json!({}),
7171 Some("test-agent"),
7172 )
7173 .unwrap();
7174 assert!(reg.created, "first registration must be created=true");
7175 assert_eq!(reg.canonical_name, "Project Alpha");
7176 assert_eq!(reg.namespace, "projects/alpha");
7177 assert_eq!(reg.aliases, vec!["Project A".to_string(), "pa".to_string()]);
7180
7181 let m = get(&conn, ®.entity_id).unwrap().unwrap();
7182 assert_eq!(m.title, "Project Alpha");
7183 assert_eq!(m.tier.rank(), Tier::Long.rank());
7184 assert!(m.tags.contains(&"entity".to_string()));
7185 assert_eq!(m.metadata["kind"], "entity");
7186 assert_eq!(m.metadata["agent_id"], "test-agent");
7187 }
7188
7189 #[test]
7190 fn entity_register_reuses_existing_and_merges_aliases() {
7191 let conn = test_db();
7192 let first = entity_register(
7193 &conn,
7194 "Project Alpha",
7195 "projects/alpha",
7196 &["pa".to_string()],
7197 &serde_json::json!({}),
7198 Some("a1"),
7199 )
7200 .unwrap();
7201 let second = entity_register(
7202 &conn,
7203 "Project Alpha",
7204 "projects/alpha",
7205 &["pa".to_string(), "alpha".to_string()],
7206 &serde_json::json!({}),
7207 Some("a2"),
7208 )
7209 .unwrap();
7210 assert!(first.created);
7211 assert!(!second.created, "second call must reuse the entity");
7212 assert_eq!(first.entity_id, second.entity_id);
7213 assert_eq!(second.aliases, vec!["pa".to_string(), "alpha".to_string()]);
7214 }
7215
7216 #[test]
7217 fn entity_register_errors_on_collision_with_non_entity_memory() {
7218 let conn = test_db();
7219 let mem = make_memory("Conflict", "projects/alpha", Tier::Long, 5);
7220 insert(&conn, &mem).unwrap();
7221 let err = entity_register(
7222 &conn,
7223 "Conflict",
7224 "projects/alpha",
7225 &[],
7226 &serde_json::json!({}),
7227 None,
7228 )
7229 .unwrap_err();
7230 let msg = format!("{err}");
7231 assert!(
7232 msg.contains("non-entity memory"),
7233 "expected collision error, got: {msg}"
7234 );
7235 }
7236
7237 #[test]
7238 fn entity_register_skips_blank_aliases() {
7239 let conn = test_db();
7240 let reg = entity_register(
7241 &conn,
7242 "Trim Test",
7243 "test",
7244 &[String::new(), " ".to_string(), "ok".to_string()],
7245 &serde_json::json!({}),
7246 None,
7247 )
7248 .unwrap();
7249 assert_eq!(reg.aliases, vec!["ok".to_string()]);
7250 }
7251
7252 #[test]
7253 fn entity_register_preserves_caller_metadata_keys() {
7254 let conn = test_db();
7255 let extra = serde_json::json!({"team": "platform", "kind": "ignored"});
7256 let reg = entity_register(&conn, "Service X", "svc", &[], &extra, None).unwrap();
7257 let m = get(&conn, ®.entity_id).unwrap().unwrap();
7258 assert_eq!(m.metadata["team"], "platform");
7259 assert_eq!(m.metadata["kind"], "entity");
7262 }
7263
7264 #[test]
7265 fn entity_get_by_alias_returns_record_with_full_alias_set() {
7266 let conn = test_db();
7267 let reg = entity_register(
7268 &conn,
7269 "Project Alpha",
7270 "projects/alpha",
7271 &["pa".to_string(), "alpha".to_string()],
7272 &serde_json::json!({}),
7273 None,
7274 )
7275 .unwrap();
7276 let got = entity_get_by_alias(&conn, "pa", None).unwrap().unwrap();
7277 assert_eq!(got.entity_id, reg.entity_id);
7278 assert_eq!(got.canonical_name, "Project Alpha");
7279 assert_eq!(got.namespace, "projects/alpha");
7280 assert_eq!(got.aliases, vec!["alpha".to_string(), "pa".to_string()]);
7283 }
7284
7285 #[test]
7286 fn entity_get_by_alias_returns_none_for_unknown_alias() {
7287 let conn = test_db();
7288 let got = entity_get_by_alias(&conn, "missing", None).unwrap();
7289 assert!(got.is_none());
7290 }
7291
7292 #[test]
7293 fn entity_get_by_alias_filters_by_namespace() {
7294 let conn = test_db();
7295 entity_register(
7296 &conn,
7297 "Acme",
7298 "ns_a",
7299 &["a".to_string()],
7300 &serde_json::json!({}),
7301 None,
7302 )
7303 .unwrap();
7304 entity_register(
7305 &conn,
7306 "Acme Corp",
7307 "ns_b",
7308 &["a".to_string()],
7309 &serde_json::json!({}),
7310 None,
7311 )
7312 .unwrap();
7313 let in_a = entity_get_by_alias(&conn, "a", Some("ns_a"))
7314 .unwrap()
7315 .unwrap();
7316 assert_eq!(in_a.namespace, "ns_a");
7317 assert_eq!(in_a.canonical_name, "Acme");
7318 let in_b = entity_get_by_alias(&conn, "a", Some("ns_b"))
7319 .unwrap()
7320 .unwrap();
7321 assert_eq!(in_b.namespace, "ns_b");
7322 assert_eq!(in_b.canonical_name, "Acme Corp");
7323 }
7324
7325 #[test]
7326 fn entity_get_by_alias_without_namespace_picks_most_recent() {
7327 let conn = test_db();
7328 entity_register(
7330 &conn,
7331 "Older",
7332 "ns_old",
7333 &["dup".to_string()],
7334 &serde_json::json!({}),
7335 None,
7336 )
7337 .unwrap();
7338 std::thread::sleep(std::time::Duration::from_millis(5));
7340 entity_register(
7341 &conn,
7342 "Newer",
7343 "ns_new",
7344 &["dup".to_string()],
7345 &serde_json::json!({}),
7346 None,
7347 )
7348 .unwrap();
7349 let got = entity_get_by_alias(&conn, "dup", None).unwrap().unwrap();
7350 assert_eq!(got.canonical_name, "Newer");
7351 assert_eq!(got.namespace, "ns_new");
7352 }
7353
7354 #[test]
7355 fn entity_get_by_alias_ignores_non_entity_memory_with_matching_alias() {
7356 let conn = test_db();
7357 let mut mem = make_memory("Decoy", "test", Tier::Long, 5);
7361 mem.metadata = serde_json::json!({});
7362 let mid = insert(&conn, &mem).unwrap();
7363 let now = chrono::Utc::now().to_rfc3339();
7364 conn.execute(
7365 "INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
7366 params![&mid, "decoy", &now],
7367 )
7368 .unwrap();
7369 let got = entity_get_by_alias(&conn, "decoy", None).unwrap();
7370 assert!(got.is_none(), "non-entity memories must not resolve");
7371 }
7372
7373 #[test]
7374 fn entity_register_idempotent_aliases_are_deduped() {
7375 let conn = test_db();
7376 let reg = entity_register(
7377 &conn,
7378 "Dedup",
7379 "test",
7380 &["x".to_string(), "x".to_string(), "y".to_string()],
7381 &serde_json::json!({}),
7382 None,
7383 )
7384 .unwrap();
7385 assert_eq!(reg.aliases.len(), 2);
7387 assert!(reg.aliases.contains(&"x".to_string()));
7388 assert!(reg.aliases.contains(&"y".to_string()));
7389 }
7390
7391 fn insert_link_at(
7396 conn: &Connection,
7397 source_id: &str,
7398 target_id: &str,
7399 relation: &str,
7400 valid_from: &str,
7401 ) {
7402 let now = chrono::Utc::now().to_rfc3339();
7403 conn.execute(
7404 "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
7405 VALUES (?1, ?2, ?3, ?4, ?5)",
7406 params![source_id, target_id, relation, now, valid_from],
7407 )
7408 .unwrap();
7409 }
7410
7411 #[test]
7412 fn create_link_populates_valid_from_for_new_rows() {
7413 let conn = test_db();
7414 let src = make_memory("kg-src", "test", Tier::Long, 5);
7415 let tgt = make_memory("kg-tgt", "test", Tier::Long, 5);
7416 insert(&conn, &src).unwrap();
7417 insert(&conn, &tgt).unwrap();
7418 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7419 let valid_from: Option<String> = conn
7420 .query_row(
7421 "SELECT valid_from FROM memory_links WHERE source_id = ?1",
7422 params![&src.id],
7423 |r| r.get(0),
7424 )
7425 .unwrap();
7426 assert!(
7427 valid_from.is_some(),
7428 "create_link must populate valid_from so kg_timeline can see new links"
7429 );
7430 }
7431
7432 #[test]
7433 fn kg_timeline_returns_events_ordered_by_valid_from_ascending() {
7434 let conn = test_db();
7435 let src = make_memory("alpha", "kg/projects/alpha", Tier::Long, 5);
7436 let s1 = make_memory("kickoff", "kg/projects/alpha", Tier::Long, 5);
7437 let s2 = make_memory("design phase", "kg/projects/alpha", Tier::Long, 5);
7438 let s3 = make_memory("implementation", "kg/projects/alpha", Tier::Long, 5);
7439 insert(&conn, &src).unwrap();
7440 insert(&conn, &s1).unwrap();
7441 insert(&conn, &s2).unwrap();
7442 insert(&conn, &s3).unwrap();
7443
7444 insert_link_at(
7447 &conn,
7448 &src.id,
7449 &s2.id,
7450 "supersedes",
7451 "2026-02-03T00:00:00+00:00",
7452 );
7453 insert_link_at(
7454 &conn,
7455 &src.id,
7456 &s1.id,
7457 "related_to",
7458 "2026-01-15T00:00:00+00:00",
7459 );
7460 insert_link_at(
7461 &conn,
7462 &src.id,
7463 &s3.id,
7464 "supersedes",
7465 "2026-03-22T00:00:00+00:00",
7466 );
7467
7468 let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
7469 assert_eq!(events.len(), 3);
7470 assert_eq!(events[0].target_id, s1.id);
7471 assert_eq!(events[1].target_id, s2.id);
7472 assert_eq!(events[2].target_id, s3.id);
7473 assert_eq!(events[0].title, "kickoff");
7474 assert_eq!(events[1].relation, "supersedes");
7475 assert_eq!(events[0].target_namespace, "kg/projects/alpha");
7476 }
7477
7478 #[test]
7479 fn kg_timeline_filters_by_since_inclusive() {
7480 let conn = test_db();
7481 let src = make_memory("e", "ns", Tier::Long, 5);
7482 let t1 = make_memory("e1", "ns", Tier::Long, 5);
7483 let t2 = make_memory("e2", "ns", Tier::Long, 5);
7484 insert(&conn, &src).unwrap();
7485 insert(&conn, &t1).unwrap();
7486 insert(&conn, &t2).unwrap();
7487 insert_link_at(&conn, &src.id, &t1.id, "rel", "2026-01-01T00:00:00+00:00");
7488 insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-03-01T00:00:00+00:00");
7489
7490 let events = kg_timeline(
7491 &conn,
7492 &src.id,
7493 Some("2026-02-01T00:00:00+00:00"),
7494 None,
7495 None,
7496 )
7497 .unwrap();
7498 assert_eq!(events.len(), 1);
7499 assert_eq!(events[0].target_id, t2.id);
7500
7501 let on_boundary = kg_timeline(
7503 &conn,
7504 &src.id,
7505 Some("2026-03-01T00:00:00+00:00"),
7506 None,
7507 None,
7508 )
7509 .unwrap();
7510 assert_eq!(on_boundary.len(), 1);
7511 }
7512
7513 #[test]
7514 fn kg_timeline_filters_by_until_inclusive() {
7515 let conn = test_db();
7516 let src = make_memory("e", "ns", Tier::Long, 5);
7517 let t1 = make_memory("e1", "ns", Tier::Long, 5);
7518 let t2 = make_memory("e2", "ns", Tier::Long, 5);
7519 insert(&conn, &src).unwrap();
7520 insert(&conn, &t1).unwrap();
7521 insert(&conn, &t2).unwrap();
7522 insert_link_at(&conn, &src.id, &t1.id, "rel", "2026-01-01T00:00:00+00:00");
7523 insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-03-01T00:00:00+00:00");
7524
7525 let events = kg_timeline(
7526 &conn,
7527 &src.id,
7528 None,
7529 Some("2026-02-01T00:00:00+00:00"),
7530 None,
7531 )
7532 .unwrap();
7533 assert_eq!(events.len(), 1);
7534 assert_eq!(events[0].target_id, t1.id);
7535 }
7536
7537 #[test]
7538 fn kg_timeline_skips_links_with_null_valid_from() {
7539 let conn = test_db();
7540 let src = make_memory("s", "ns", Tier::Long, 5);
7541 let t1 = make_memory("t1", "ns", Tier::Long, 5);
7542 let t2 = make_memory("t2", "ns", Tier::Long, 5);
7543 insert(&conn, &src).unwrap();
7544 insert(&conn, &t1).unwrap();
7545 insert(&conn, &t2).unwrap();
7546 let now = chrono::Utc::now().to_rfc3339();
7549 conn.execute(
7550 "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
7551 VALUES (?1, ?2, 'rel', ?3, NULL)",
7552 params![&src.id, &t1.id, &now],
7553 )
7554 .unwrap();
7555 insert_link_at(&conn, &src.id, &t2.id, "rel", "2026-01-01T00:00:00+00:00");
7556
7557 let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
7558 assert_eq!(events.len(), 1);
7559 assert_eq!(events[0].target_id, t2.id);
7560 }
7561
7562 #[test]
7563 fn kg_timeline_excludes_links_where_source_is_target() {
7564 let conn = test_db();
7569 let entity = make_memory("entity", "ns", Tier::Long, 5);
7570 let other = make_memory("other", "ns", Tier::Long, 5);
7571 insert(&conn, &entity).unwrap();
7572 insert(&conn, &other).unwrap();
7573 insert_link_at(
7574 &conn,
7575 &other.id,
7576 &entity.id,
7577 "rel",
7578 "2026-01-01T00:00:00+00:00",
7579 );
7580 let events = kg_timeline(&conn, &entity.id, None, None, None).unwrap();
7581 assert!(events.is_empty());
7582 }
7583
7584 #[test]
7585 fn kg_timeline_limit_clamped_to_max() {
7586 let conn = test_db();
7587 let src = make_memory("s", "ns", Tier::Long, 5);
7588 insert(&conn, &src).unwrap();
7589 for i in 0..5 {
7590 let t = make_memory(&format!("t{i}"), "ns", Tier::Long, 5);
7591 insert(&conn, &t).unwrap();
7592 insert_link_at(
7593 &conn,
7594 &src.id,
7595 &t.id,
7596 "rel",
7597 &format!("2026-01-0{}T00:00:00+00:00", i + 1),
7598 );
7599 }
7600 let events = kg_timeline(&conn, &src.id, None, None, Some(usize::MAX)).unwrap();
7604 assert_eq!(events.len(), 5);
7605
7606 let one = kg_timeline(&conn, &src.id, None, None, Some(0)).unwrap();
7608 assert_eq!(one.len(), 1);
7609 }
7610
7611 #[test]
7612 fn kg_timeline_carries_observed_by_and_valid_until() {
7613 let conn = test_db();
7614 let src = make_memory("s", "ns", Tier::Long, 5);
7615 let t = make_memory("t", "ns", Tier::Long, 5);
7616 insert(&conn, &src).unwrap();
7617 insert(&conn, &t).unwrap();
7618 let now = chrono::Utc::now().to_rfc3339();
7619 conn.execute(
7620 "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from, valid_until, observed_by) \
7621 VALUES (?1, ?2, 'supersedes', ?3, '2026-01-01T00:00:00+00:00', '2026-12-31T23:59:59+00:00', 'agent-pm-1')",
7622 params![&src.id, &t.id, &now],
7623 )
7624 .unwrap();
7625 let events = kg_timeline(&conn, &src.id, None, None, None).unwrap();
7626 assert_eq!(events.len(), 1);
7627 assert_eq!(events[0].observed_by.as_deref(), Some("agent-pm-1"));
7628 assert_eq!(
7629 events[0].valid_until.as_deref(),
7630 Some("2026-12-31T23:59:59+00:00")
7631 );
7632 }
7633
7634 #[test]
7635 fn kg_timeline_empty_for_unknown_source() {
7636 let conn = test_db();
7637 let events = kg_timeline(&conn, "nonexistent-id", None, None, None).unwrap();
7638 assert!(events.is_empty());
7639 }
7640
7641 #[test]
7644 fn invalidate_link_sets_valid_until_to_provided_timestamp() {
7645 let conn = test_db();
7646 let src = make_memory("inv-s", "test", Tier::Long, 5);
7647 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7648 insert(&conn, &src).unwrap();
7649 insert(&conn, &tgt).unwrap();
7650 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7651 let stamp = "2026-12-31T23:59:59+00:00";
7652 let res = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(stamp))
7653 .unwrap()
7654 .expect("link must exist");
7655 assert_eq!(res.valid_until, stamp);
7656 assert!(res.previous_valid_until.is_none());
7657 let stored: Option<String> = conn
7658 .query_row(
7659 "SELECT valid_until FROM memory_links \
7660 WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3",
7661 params![&src.id, &tgt.id, "related_to"],
7662 |r| r.get(0),
7663 )
7664 .unwrap();
7665 assert_eq!(stored.as_deref(), Some(stamp));
7666 }
7667
7668 #[test]
7669 fn invalidate_link_defaults_to_now_when_no_timestamp_provided() {
7670 let conn = test_db();
7671 let src = make_memory("inv-s", "test", Tier::Long, 5);
7672 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7673 insert(&conn, &src).unwrap();
7674 insert(&conn, &tgt).unwrap();
7675 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7676 let res = invalidate_link(&conn, &src.id, &tgt.id, "related_to", None)
7677 .unwrap()
7678 .expect("link must exist");
7679 let parsed = chrono::DateTime::parse_from_rfc3339(&res.valid_until)
7683 .expect("default valid_until must be RFC3339");
7684 let now = chrono::Utc::now();
7685 let drift = now.signed_duration_since(parsed.with_timezone(&chrono::Utc));
7686 assert!(
7687 drift.num_seconds().abs() < 60,
7688 "default valid_until {} should be near now {now}",
7689 res.valid_until
7690 );
7691 }
7692
7693 #[test]
7694 fn invalidate_link_returns_none_for_unknown_triple() {
7695 let conn = test_db();
7696 let res = invalidate_link(&conn, "missing-src", "missing-tgt", "related_to", None).unwrap();
7698 assert!(res.is_none());
7699 }
7700
7701 #[test]
7702 fn invalidate_link_returns_none_when_relation_does_not_match() {
7703 let conn = test_db();
7705 let src = make_memory("inv-s", "test", Tier::Long, 5);
7706 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7707 insert(&conn, &src).unwrap();
7708 insert(&conn, &tgt).unwrap();
7709 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7710 let res = invalidate_link(&conn, &src.id, &tgt.id, "supersedes", None).unwrap();
7711 assert!(res.is_none(), "must not match across relation values");
7712 }
7713
7714 #[test]
7715 fn invalidate_link_overwrites_existing_valid_until_and_reports_prior() {
7716 let conn = test_db();
7717 let src = make_memory("inv-s", "test", Tier::Long, 5);
7718 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7719 insert(&conn, &src).unwrap();
7720 insert(&conn, &tgt).unwrap();
7721 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7722 let first = "2026-06-01T00:00:00+00:00";
7723 let second = "2026-12-01T00:00:00+00:00";
7724 let r1 = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(first))
7725 .unwrap()
7726 .unwrap();
7727 assert!(r1.previous_valid_until.is_none());
7728 let r2 = invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(second))
7729 .unwrap()
7730 .unwrap();
7731 assert_eq!(r2.previous_valid_until.as_deref(), Some(first));
7732 assert_eq!(r2.valid_until, second);
7733 }
7734
7735 #[test]
7736 fn invalidate_link_distinguishes_relation_when_multiple_links_share_endpoints() {
7737 let conn = test_db();
7740 let src = make_memory("inv-s", "test", Tier::Long, 5);
7741 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7742 insert(&conn, &src).unwrap();
7743 insert(&conn, &tgt).unwrap();
7744 create_link(&conn, &src.id, &tgt.id, "related_to").unwrap();
7745 create_link(&conn, &src.id, &tgt.id, "supersedes").unwrap();
7746 let stamp = "2026-07-15T12:00:00+00:00";
7747 invalidate_link(&conn, &src.id, &tgt.id, "related_to", Some(stamp))
7748 .unwrap()
7749 .unwrap();
7750 let related: Option<String> = conn
7751 .query_row(
7752 "SELECT valid_until FROM memory_links \
7753 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'related_to'",
7754 params![&src.id, &tgt.id],
7755 |r| r.get(0),
7756 )
7757 .unwrap();
7758 let supers: Option<String> = conn
7759 .query_row(
7760 "SELECT valid_until FROM memory_links \
7761 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'supersedes'",
7762 params![&src.id, &tgt.id],
7763 |r| r.get(0),
7764 )
7765 .unwrap();
7766 assert_eq!(related.as_deref(), Some(stamp));
7767 assert!(
7768 supers.is_none(),
7769 "the sibling 'supersedes' link must remain valid"
7770 );
7771 }
7772
7773 #[test]
7774 fn invalidate_link_preserves_other_columns() {
7775 let conn = test_db();
7778 let src = make_memory("inv-s", "test", Tier::Long, 5);
7779 let tgt = make_memory("inv-t", "test", Tier::Long, 5);
7780 insert(&conn, &src).unwrap();
7781 insert(&conn, &tgt).unwrap();
7782 let now = chrono::Utc::now().to_rfc3339();
7783 conn.execute(
7784 "INSERT INTO memory_links \
7785 (source_id, target_id, relation, created_at, valid_from, observed_by) \
7786 VALUES (?1, ?2, 'related_to', ?3, '2026-01-01T00:00:00+00:00', 'agent-x')",
7787 params![&src.id, &tgt.id, &now],
7788 )
7789 .unwrap();
7790 invalidate_link(
7791 &conn,
7792 &src.id,
7793 &tgt.id,
7794 "related_to",
7795 Some("2026-12-31T23:59:59+00:00"),
7796 )
7797 .unwrap()
7798 .unwrap();
7799 let (vf, ob, ca): (Option<String>, Option<String>, String) = conn
7800 .query_row(
7801 "SELECT valid_from, observed_by, created_at FROM memory_links \
7802 WHERE source_id = ?1 AND target_id = ?2 AND relation = 'related_to'",
7803 params![&src.id, &tgt.id],
7804 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
7805 )
7806 .unwrap();
7807 assert_eq!(vf.as_deref(), Some("2026-01-01T00:00:00+00:00"));
7808 assert_eq!(ob.as_deref(), Some("agent-x"));
7809 assert_eq!(ca, now);
7810 }
7811
7812 fn insert_link_full(
7818 conn: &Connection,
7819 source_id: &str,
7820 target_id: &str,
7821 relation: &str,
7822 valid_from: Option<&str>,
7823 valid_until: Option<&str>,
7824 observed_by: Option<&str>,
7825 ) {
7826 let now = chrono::Utc::now().to_rfc3339();
7827 conn.execute(
7828 "INSERT INTO memory_links \
7829 (source_id, target_id, relation, created_at, valid_from, valid_until, observed_by) \
7830 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
7831 params![
7832 source_id,
7833 target_id,
7834 relation,
7835 now,
7836 valid_from,
7837 valid_until,
7838 observed_by
7839 ],
7840 )
7841 .unwrap();
7842 }
7843
7844 #[test]
7845 fn kg_query_returns_outbound_neighbors_at_depth_1() {
7846 let conn = test_db();
7847 let src = make_memory("alpha", "kg/projects/alpha", Tier::Long, 5);
7848 let n1 = make_memory("kickoff", "kg/projects/alpha", Tier::Long, 5);
7849 let n2 = make_memory("design", "kg/projects/alpha", Tier::Long, 5);
7850 insert(&conn, &src).unwrap();
7851 insert(&conn, &n1).unwrap();
7852 insert(&conn, &n2).unwrap();
7853 insert_link_full(
7854 &conn,
7855 &src.id,
7856 &n1.id,
7857 "related_to",
7858 Some("2026-01-15T00:00:00+00:00"),
7859 None,
7860 Some("agent-1"),
7861 );
7862 insert_link_full(
7863 &conn,
7864 &src.id,
7865 &n2.id,
7866 "supersedes",
7867 Some("2026-02-03T00:00:00+00:00"),
7868 None,
7869 Some("agent-2"),
7870 );
7871
7872 let nodes = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
7873 assert_eq!(nodes.len(), 2);
7874 assert_eq!(nodes[0].target_id, n1.id);
7876 assert_eq!(nodes[1].target_id, n2.id);
7877 assert_eq!(nodes[0].title, "kickoff");
7878 assert_eq!(nodes[0].relation, "related_to");
7879 assert_eq!(nodes[0].observed_by.as_deref(), Some("agent-1"));
7880 assert_eq!(nodes[0].depth, 1);
7881 assert_eq!(nodes[0].path, format!("{}->{}", src.id, n1.id));
7882 assert_eq!(nodes[0].target_namespace, "kg/projects/alpha");
7883 }
7884
7885 #[test]
7886 fn kg_query_filters_by_valid_at_window() {
7887 let conn = test_db();
7888 let src = make_memory("e", "ns", Tier::Long, 5);
7889 let t1 = make_memory("e1", "ns", Tier::Long, 5);
7890 let t2 = make_memory("e2", "ns", Tier::Long, 5);
7891 insert(&conn, &src).unwrap();
7892 insert(&conn, &t1).unwrap();
7893 insert(&conn, &t2).unwrap();
7894 insert_link_full(
7896 &conn,
7897 &src.id,
7898 &t1.id,
7899 "related_to",
7900 Some("2026-01-01T00:00:00+00:00"),
7901 Some("2026-02-01T00:00:00+00:00"),
7902 None,
7903 );
7904 insert_link_full(
7905 &conn,
7906 &src.id,
7907 &t2.id,
7908 "related_to",
7909 Some("2026-03-01T00:00:00+00:00"),
7910 None,
7911 None,
7912 );
7913
7914 let n_jan = kg_query(
7916 &conn,
7917 &src.id,
7918 1,
7919 Some("2026-01-15T00:00:00+00:00"),
7920 None,
7921 None,
7922 )
7923 .unwrap();
7924 assert_eq!(n_jan.len(), 1);
7925 assert_eq!(n_jan[0].target_id, t1.id);
7926
7927 let n_feb = kg_query(
7930 &conn,
7931 &src.id,
7932 1,
7933 Some("2026-02-15T00:00:00+00:00"),
7934 None,
7935 None,
7936 )
7937 .unwrap();
7938 assert!(n_feb.is_empty());
7939
7940 let n_apr = kg_query(
7942 &conn,
7943 &src.id,
7944 1,
7945 Some("2026-04-01T00:00:00+00:00"),
7946 None,
7947 None,
7948 )
7949 .unwrap();
7950 assert_eq!(n_apr.len(), 1);
7951 assert_eq!(n_apr[0].target_id, t2.id);
7952 }
7953
7954 #[test]
7955 fn kg_query_skips_null_valid_from_when_valid_at_filter_active() {
7956 let conn = test_db();
7957 let src = make_memory("s", "ns", Tier::Long, 5);
7958 let t = make_memory("t", "ns", Tier::Long, 5);
7959 insert(&conn, &src).unwrap();
7960 insert(&conn, &t).unwrap();
7961 insert_link_full(&conn, &src.id, &t.id, "related_to", None, None, None);
7964
7965 let with_filter = kg_query(
7966 &conn,
7967 &src.id,
7968 1,
7969 Some("2026-01-15T00:00:00+00:00"),
7970 None,
7971 None,
7972 )
7973 .unwrap();
7974 assert!(with_filter.is_empty());
7975
7976 let without = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
7978 assert_eq!(without.len(), 1);
7979 assert_eq!(without[0].target_id, t.id);
7980 }
7981
7982 #[test]
7983 fn kg_query_filters_by_allowed_agents() {
7984 let conn = test_db();
7985 let src = make_memory("s", "ns", Tier::Long, 5);
7986 let t1 = make_memory("t1", "ns", Tier::Long, 5);
7987 let t2 = make_memory("t2", "ns", Tier::Long, 5);
7988 let t3 = make_memory("t3", "ns", Tier::Long, 5);
7989 insert(&conn, &src).unwrap();
7990 insert(&conn, &t1).unwrap();
7991 insert(&conn, &t2).unwrap();
7992 insert(&conn, &t3).unwrap();
7993 insert_link_full(
7994 &conn,
7995 &src.id,
7996 &t1.id,
7997 "related_to",
7998 Some("2026-01-01T00:00:00+00:00"),
7999 None,
8000 Some("agent-a"),
8001 );
8002 insert_link_full(
8003 &conn,
8004 &src.id,
8005 &t2.id,
8006 "related_to",
8007 Some("2026-01-02T00:00:00+00:00"),
8008 None,
8009 Some("agent-b"),
8010 );
8011 insert_link_full(
8014 &conn,
8015 &src.id,
8016 &t3.id,
8017 "related_to",
8018 Some("2026-01-03T00:00:00+00:00"),
8019 None,
8020 None,
8021 );
8022
8023 let allow_a = vec!["agent-a".to_string()];
8024 let only_a = kg_query(&conn, &src.id, 1, None, Some(&allow_a), None).unwrap();
8025 assert_eq!(only_a.len(), 1);
8026 assert_eq!(only_a[0].target_id, t1.id);
8027
8028 let allow_both = vec!["agent-a".to_string(), "agent-b".to_string()];
8029 let both = kg_query(&conn, &src.id, 1, None, Some(&allow_both), None).unwrap();
8030 assert_eq!(both.len(), 2);
8031 }
8032
8033 #[test]
8034 fn kg_query_empty_allowed_agents_returns_zero_rows() {
8035 let conn = test_db();
8036 let src = make_memory("s", "ns", Tier::Long, 5);
8037 let t = make_memory("t", "ns", Tier::Long, 5);
8038 insert(&conn, &src).unwrap();
8039 insert(&conn, &t).unwrap();
8040 insert_link_full(
8041 &conn,
8042 &src.id,
8043 &t.id,
8044 "related_to",
8045 Some("2026-01-01T00:00:00+00:00"),
8046 None,
8047 Some("agent-a"),
8048 );
8049
8050 let unfiltered = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
8052 assert_eq!(unfiltered.len(), 1);
8053
8054 let empty: Vec<String> = Vec::new();
8057 let none = kg_query(&conn, &src.id, 1, None, Some(&empty), None).unwrap();
8058 assert!(none.is_empty());
8059 }
8060
8061 #[test]
8062 fn kg_query_rejects_max_depth_zero() {
8063 let conn = test_db();
8064 let src = make_memory("s", "ns", Tier::Long, 5);
8065 insert(&conn, &src).unwrap();
8066 let err = kg_query(&conn, &src.id, 0, None, None, None).unwrap_err();
8067 assert!(err.to_string().contains("max_depth"));
8068 }
8069
8070 #[test]
8071 fn kg_query_rejects_unsupported_max_depth() {
8072 let conn = test_db();
8076 let src = make_memory("s", "ns", Tier::Long, 5);
8077 insert(&conn, &src).unwrap();
8078 let err = kg_query(
8079 &conn,
8080 &src.id,
8081 KG_QUERY_MAX_SUPPORTED_DEPTH + 1,
8082 None,
8083 None,
8084 None,
8085 )
8086 .unwrap_err();
8087 let msg = err.to_string();
8088 assert!(msg.contains(&format!("max_depth={}", KG_QUERY_MAX_SUPPORTED_DEPTH + 1)));
8089 assert!(msg.contains(&format!("supported depth={KG_QUERY_MAX_SUPPORTED_DEPTH}")));
8090 }
8091
8092 #[test]
8093 fn kg_query_traverses_multiple_hops() {
8094 let conn = test_db();
8097 let src = make_memory("src", "ns", Tier::Long, 5);
8098 let mid = make_memory("mid", "ns", Tier::Long, 5);
8099 let leaf = make_memory("leaf", "ns", Tier::Long, 5);
8100 insert(&conn, &src).unwrap();
8101 insert(&conn, &mid).unwrap();
8102 insert(&conn, &leaf).unwrap();
8103 insert_link_full(
8104 &conn,
8105 &src.id,
8106 &mid.id,
8107 "related_to",
8108 Some("2026-01-01T00:00:00+00:00"),
8109 None,
8110 Some("agent-x"),
8111 );
8112 insert_link_full(
8113 &conn,
8114 &mid.id,
8115 &leaf.id,
8116 "supersedes",
8117 Some("2026-01-02T00:00:00+00:00"),
8118 None,
8119 Some("agent-x"),
8120 );
8121
8122 let d1 = kg_query(&conn, &src.id, 1, None, None, None).unwrap();
8124 assert_eq!(d1.len(), 1);
8125 assert_eq!(d1[0].target_id, mid.id);
8126 assert_eq!(d1[0].depth, 1);
8127
8128 let d2 = kg_query(&conn, &src.id, 2, None, None, None).unwrap();
8130 assert_eq!(d2.len(), 2);
8131 assert_eq!(d2[0].target_id, mid.id);
8132 assert_eq!(d2[0].depth, 1);
8133 assert_eq!(d2[0].path, format!("{}->{}", src.id, mid.id));
8134 assert_eq!(d2[1].target_id, leaf.id);
8135 assert_eq!(d2[1].depth, 2);
8136 assert_eq!(d2[1].relation, "supersedes");
8137 assert_eq!(d2[1].path, format!("{}->{}->{}", src.id, mid.id, leaf.id));
8138 }
8139
8140 #[test]
8141 fn kg_query_multi_hop_respects_valid_at_per_hop() {
8142 let conn = test_db();
8147 let src = make_memory("s", "ns", Tier::Long, 5);
8148 let mid = make_memory("m", "ns", Tier::Long, 5);
8149 let leaf = make_memory("l", "ns", Tier::Long, 5);
8150 insert(&conn, &src).unwrap();
8151 insert(&conn, &mid).unwrap();
8152 insert(&conn, &leaf).unwrap();
8153 insert_link_full(
8154 &conn,
8155 &src.id,
8156 &mid.id,
8157 "related_to",
8158 Some("2026-01-01T00:00:00+00:00"),
8159 Some("2026-02-01T00:00:00+00:00"),
8160 None,
8161 );
8162 insert_link_full(
8163 &conn,
8164 &mid.id,
8165 &leaf.id,
8166 "related_to",
8167 Some("2026-04-01T00:00:00+00:00"),
8168 None,
8169 None,
8170 );
8171
8172 let mid_only = kg_query(
8173 &conn,
8174 &src.id,
8175 3,
8176 Some("2026-01-15T00:00:00+00:00"),
8177 None,
8178 None,
8179 )
8180 .unwrap();
8181 assert_eq!(mid_only.len(), 1);
8182 assert_eq!(mid_only[0].target_id, mid.id);
8183
8184 let neither = kg_query(
8185 &conn,
8186 &src.id,
8187 3,
8188 Some("2026-04-15T00:00:00+00:00"),
8189 None,
8190 None,
8191 )
8192 .unwrap();
8193 assert!(neither.is_empty());
8194 }
8195
8196 #[test]
8197 fn kg_query_detects_cycles() {
8198 let conn = test_db();
8202 let a = make_memory("a", "ns", Tier::Long, 5);
8203 let b = make_memory("b", "ns", Tier::Long, 5);
8204 let c = make_memory("c", "ns", Tier::Long, 5);
8205 insert(&conn, &a).unwrap();
8206 insert(&conn, &b).unwrap();
8207 insert(&conn, &c).unwrap();
8208 insert_link_full(
8209 &conn,
8210 &a.id,
8211 &b.id,
8212 "related_to",
8213 Some("2026-01-01T00:00:00+00:00"),
8214 None,
8215 None,
8216 );
8217 insert_link_full(
8218 &conn,
8219 &b.id,
8220 &c.id,
8221 "related_to",
8222 Some("2026-01-02T00:00:00+00:00"),
8223 None,
8224 None,
8225 );
8226 insert_link_full(
8227 &conn,
8228 &c.id,
8229 &a.id,
8230 "related_to",
8231 Some("2026-01-03T00:00:00+00:00"),
8232 None,
8233 None,
8234 );
8235
8236 let nodes = kg_query(&conn, &a.id, 5, None, None, None).unwrap();
8237 assert_eq!(nodes.len(), 2);
8243 assert_eq!(nodes[0].target_id, b.id);
8244 assert_eq!(nodes[0].depth, 1);
8245 assert_eq!(nodes[1].target_id, c.id);
8246 assert_eq!(nodes[1].depth, 2);
8247 }
8248
8249 #[test]
8250 fn kg_query_multi_hop_filters_by_allowed_agents_per_hop() {
8251 let conn = test_db();
8254 let src = make_memory("s", "ns", Tier::Long, 5);
8255 let mid = make_memory("m", "ns", Tier::Long, 5);
8256 let leaf = make_memory("l", "ns", Tier::Long, 5);
8257 insert(&conn, &src).unwrap();
8258 insert(&conn, &mid).unwrap();
8259 insert(&conn, &leaf).unwrap();
8260 insert_link_full(
8261 &conn,
8262 &src.id,
8263 &mid.id,
8264 "related_to",
8265 Some("2026-01-01T00:00:00+00:00"),
8266 None,
8267 Some("agent-a"),
8268 );
8269 insert_link_full(
8270 &conn,
8271 &mid.id,
8272 &leaf.id,
8273 "related_to",
8274 Some("2026-01-02T00:00:00+00:00"),
8275 None,
8276 Some("agent-b"),
8277 );
8278
8279 let allow_a = vec!["agent-a".to_string()];
8280 let only_first = kg_query(&conn, &src.id, 3, None, Some(&allow_a), None).unwrap();
8281 assert_eq!(only_first.len(), 1);
8282 assert_eq!(only_first[0].target_id, mid.id);
8283
8284 let allow_both = vec!["agent-a".to_string(), "agent-b".to_string()];
8285 let both = kg_query(&conn, &src.id, 3, None, Some(&allow_both), None).unwrap();
8286 assert_eq!(both.len(), 2);
8287 assert_eq!(both[1].target_id, leaf.id);
8288 assert_eq!(both[1].depth, 2);
8289 }
8290
8291 #[test]
8292 fn kg_query_limit_clamped_to_max() {
8293 let conn = test_db();
8294 let src = make_memory("s", "ns", Tier::Long, 5);
8295 insert(&conn, &src).unwrap();
8296 for i in 0..3 {
8297 let t = make_memory(&format!("t{i}"), "ns", Tier::Long, 5);
8298 insert(&conn, &t).unwrap();
8299 insert_link_full(
8300 &conn,
8301 &src.id,
8302 &t.id,
8303 "related_to",
8304 Some(&format!("2026-01-{:02}T00:00:00+00:00", i + 1)),
8305 None,
8306 None,
8307 );
8308 }
8309
8310 let all = kg_query(&conn, &src.id, 1, None, None, Some(usize::MAX)).unwrap();
8313 assert_eq!(all.len(), 3);
8314
8315 let one = kg_query(&conn, &src.id, 1, None, None, Some(0)).unwrap();
8317 assert_eq!(one.len(), 1);
8318 }
8319
8320 #[test]
8321 fn kg_query_empty_for_unknown_source() {
8322 let conn = test_db();
8323 let nodes = kg_query(&conn, "no-such-id", 1, None, None, None).unwrap();
8324 assert!(nodes.is_empty());
8325 }
8326
8327 #[test]
8328 fn schema_v15_existing_links_get_valid_from_backfilled() {
8329 let path = std::env::temp_dir().join(format!(
8336 "ai_memory_v15_backfill_{}.db",
8337 uuid::Uuid::new_v4()
8338 ));
8339 {
8340 let conn = open(&path).unwrap();
8341 let src = make_memory("src", "test", Tier::Long, 5);
8342 let tgt = make_memory("tgt", "test", Tier::Long, 5);
8343 insert(&conn, &src).unwrap();
8344 insert(&conn, &tgt).unwrap();
8345 conn.execute(
8348 "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from) \
8349 VALUES (?1, ?2, 'related_to', ?3, NULL)",
8350 params![&src.id, &tgt.id, &chrono::Utc::now().to_rfc3339()],
8351 )
8352 .unwrap();
8353 conn.execute("DELETE FROM schema_version", []).unwrap();
8355 conn.execute("INSERT INTO schema_version (version) VALUES (14)", [])
8356 .unwrap();
8357 }
8358
8359 let conn2 = open(&path).unwrap();
8360 let backfilled: Option<String> = conn2
8361 .query_row("SELECT valid_from FROM memory_links LIMIT 1", [], |r| {
8362 r.get(0)
8363 })
8364 .unwrap();
8365 assert!(
8366 backfilled.is_some(),
8367 "expected valid_from to be backfilled, got NULL"
8368 );
8369 let _ = std::fs::remove_file(&path);
8370 }
8371
8372 #[test]
8373 fn namespace_prefix_query_index_available() {
8374 let conn = test_db();
8375 let result: Option<String> = conn
8381 .query_row(
8382 "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_memories_namespace'",
8383 [],
8384 |r| r.get(0),
8385 )
8386 .unwrap();
8387 assert_eq!(
8388 result,
8389 Some("idx_memories_namespace".to_string()),
8390 "idx_memories_namespace index should exist"
8391 );
8392
8393 let count: i64 = conn
8395 .query_row(
8396 "SELECT COUNT(*) FROM memories WHERE namespace LIKE 'test/%'",
8397 [],
8398 |r| r.get(0),
8399 )
8400 .unwrap();
8401 assert_eq!(count, 0);
8402 }
8403
8404 #[test]
8409 fn doctor_dim_violations_post_p2_returns_zero_on_fresh_db() {
8410 let conn = test_db();
8415 let result = doctor_dim_violations(&conn).unwrap();
8416 assert_eq!(result, Some(0));
8417 }
8418
8419 #[test]
8420 fn doctor_oldest_pending_age_secs_empty_queue() {
8421 let conn = test_db();
8422 let age = doctor_oldest_pending_age_secs(&conn).unwrap();
8423 assert_eq!(age, None);
8424 }
8425
8426 #[test]
8427 fn doctor_oldest_pending_age_secs_reports_age() {
8428 let conn = test_db();
8429 let one_hour_ago = (Utc::now() - chrono::Duration::hours(1)).to_rfc3339();
8430 conn.execute(
8431 "INSERT INTO pending_actions (id, action_type, namespace, payload, requested_by, requested_at, status)
8432 VALUES ('p1', 'store', 'ns', '{}', 'agent', ?1, 'pending')",
8433 params![one_hour_ago],
8434 )
8435 .unwrap();
8436 let age = doctor_oldest_pending_age_secs(&conn).unwrap().unwrap();
8437 assert!((3500..=3700).contains(&age), "expected ~3600s, got {age}");
8439 }
8440
8441 #[test]
8442 fn doctor_governance_coverage_with_namespace_meta() {
8443 let conn = test_db();
8444 let (with, without) = doctor_governance_coverage(&conn).unwrap();
8446 assert_eq!((with, without), (0, 0));
8447 }
8448
8449 #[test]
8450 fn doctor_governance_depth_distribution_chains() {
8451 let conn = test_db();
8452 let now = Utc::now().to_rfc3339();
8454 conn.execute(
8455 "INSERT INTO namespace_meta (namespace, parent_namespace, updated_at) VALUES ('root', NULL, ?1)",
8456 params![now],
8457 ).unwrap();
8458 conn.execute(
8459 "INSERT INTO namespace_meta (namespace, parent_namespace, updated_at) VALUES ('a', 'root', ?1)",
8460 params![now],
8461 ).unwrap();
8462 conn.execute(
8463 "INSERT INTO namespace_meta (namespace, parent_namespace, updated_at) VALUES ('a/b', 'a', ?1)",
8464 params![now],
8465 ).unwrap();
8466 conn.execute(
8467 "INSERT INTO namespace_meta (namespace, parent_namespace, updated_at) VALUES ('a/b/c', 'a/b', ?1)",
8468 params![now],
8469 ).unwrap();
8470 let dist = doctor_governance_depth_distribution(&conn).unwrap();
8471 assert_eq!(dist[0], 1, "root has depth 0");
8472 assert_eq!(dist[1], 1, "a has depth 1");
8473 assert_eq!(dist[2], 1, "a/b has depth 2");
8474 assert_eq!(dist[3], 1, "a/b/c has depth 3");
8475 }
8476
8477 #[test]
8478 fn doctor_webhook_delivery_totals_empty() {
8479 let conn = test_db();
8480 let (dispatched, failed) = doctor_webhook_delivery_totals(&conn).unwrap();
8481 assert_eq!((dispatched, failed), (0, 0));
8482 }
8483
8484 #[test]
8485 fn doctor_max_sync_skew_secs_empty() {
8486 let conn = test_db();
8487 let skew = doctor_max_sync_skew_secs(&conn).unwrap();
8488 assert_eq!(skew, None);
8489 }
8490
8491 #[test]
8494 fn audit_log_record_and_list_grant_and_deny() {
8495 let conn = test_db();
8496 record_capability_expansion(&conn, Some("alice"), "graph", true, None);
8497 record_capability_expansion(&conn, Some("bob"), "power", false, None);
8498 let rows = list_capability_expansions(&conn, 50, None).unwrap();
8499 assert_eq!(rows.len(), 2);
8500 assert!(rows[0].timestamp >= rows[1].timestamp);
8502 let grant_row = rows
8503 .iter()
8504 .find(|r| r.agent_id.as_deref() == Some("alice"))
8505 .unwrap();
8506 assert!(grant_row.granted);
8507 assert_eq!(grant_row.requested_family.as_deref(), Some("graph"));
8508 let deny_row = rows
8509 .iter()
8510 .find(|r| r.agent_id.as_deref() == Some("bob"))
8511 .unwrap();
8512 assert!(!deny_row.granted);
8513 assert_eq!(deny_row.requested_family.as_deref(), Some("power"));
8514 }
8515
8516 #[test]
8517 fn audit_log_filter_by_agent() {
8518 let conn = test_db();
8519 record_capability_expansion(&conn, Some("alice"), "graph", true, None);
8520 record_capability_expansion(&conn, Some("bob"), "power", false, None);
8521 let alice = list_capability_expansions(&conn, 50, Some("alice")).unwrap();
8522 assert_eq!(alice.len(), 1);
8523 assert_eq!(alice[0].agent_id.as_deref(), Some("alice"));
8524 let none_match = list_capability_expansions(&conn, 50, Some("nobody")).unwrap();
8525 assert!(none_match.is_empty());
8526 }
8527
8528 #[test]
8529 fn audit_log_anonymous_caller() {
8530 let conn = test_db();
8531 record_capability_expansion(&conn, None, "core", true, None);
8532 let rows = list_capability_expansions(&conn, 50, None).unwrap();
8533 assert_eq!(rows.len(), 1);
8534 assert!(rows[0].agent_id.is_none());
8535 }
8536
8537 #[test]
8538 fn audit_log_migration_idempotent_on_re_open() {
8539 let p = tempfile::NamedTempFile::new().unwrap();
8542 let p = p.path().to_path_buf();
8543 let _ = open(&p).unwrap();
8544 let conn = open(&p).unwrap();
8545 let cnt: i64 = conn
8547 .query_row(
8548 "SELECT count(*) FROM sqlite_master WHERE name LIKE 'idx_audit_log_%'",
8549 [],
8550 |r| r.get(0),
8551 )
8552 .unwrap();
8553 assert_eq!(
8554 cnt, 3,
8555 "expected 3 audit_log indexes (agent_id, ts, event_type)"
8556 );
8557 }
8558}