1use crate::types::{
5 ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6 KnowledgeCoverageRecord, KnowledgeItemRecord, KnowledgeItemStatus, KnowledgePromotionRequest,
7 KnowledgePromotionResult, KnowledgeSpaceRecord, MemoryChunk, MemoryConfig, MemoryError,
8 MemoryResult, MemoryStats, MemoryTier, ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
9};
10use chrono::{DateTime, Utc};
11use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
12use sqlite_vec::sqlite3_vec_init;
13use std::collections::HashSet;
14use std::path::Path;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Mutex;
18
/// Tuple row read back from `project_index_status`: `last_indexed_at`
/// followed by the five nullable counters (`last_total_files`,
/// `last_processed_files`, `last_indexed_files`, `last_skipped_files`,
/// `last_errors`) — see the CREATE TABLE in `init_schema`. Consumed by
/// status readers elsewhere in this file.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
27
/// SQLite-backed store for tiered memory (session / project / global
/// chunk tables plus sqlite-vec `vec0` vector tables), knowledge records,
/// and file-index bookkeeping.
pub struct MemoryDatabase {
    // Single shared connection; the async Mutex serializes all DB access.
    conn: Arc<Mutex<Connection>>,
    // Filesystem location the connection was opened from (set in `new`).
    db_path: std::path::PathBuf,
}
33
34impl MemoryDatabase {
35 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
37 unsafe {
39 sqlite3_auto_extension(Some(std::mem::transmute::<
40 *const (),
41 unsafe extern "C" fn(
42 *mut rusqlite::ffi::sqlite3,
43 *mut *mut i8,
44 *const rusqlite::ffi::sqlite3_api_routines,
45 ) -> i32,
46 >(sqlite3_vec_init as *const ())));
47 }
48
49 let conn = Connection::open(db_path)?;
50 conn.busy_timeout(Duration::from_secs(10))?;
51
52 conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
55 conn.execute("PRAGMA synchronous = NORMAL", [])?;
56
57 let db = Self {
58 conn: Arc::new(Mutex::new(conn)),
59 db_path: db_path.to_path_buf(),
60 };
61
62 db.init_schema().await?;
64 if let Err(err) = db.validate_vector_tables().await {
65 match &err {
66 crate::types::MemoryError::Database(db_err)
67 if Self::is_vector_table_error(db_err) =>
68 {
69 tracing::warn!(
70 "Detected vector table corruption during startup ({}). Recreating vector tables.",
71 db_err
72 );
73 db.recreate_vector_tables().await?;
74 }
75 _ => return Err(err),
76 }
77 }
78 db.validate_integrity().await?;
79
80 Ok(db)
81 }
82
83 async fn validate_integrity(&self) -> MemoryResult<()> {
85 let conn = self.conn.lock().await;
86 let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
87 {
88 Ok(value) => value,
89 Err(err) => {
90 tracing::warn!(
94 "Skipping strict PRAGMA quick_check due to probe error: {}",
95 err
96 );
97 return Ok(());
98 }
99 };
100 if check.trim().eq_ignore_ascii_case("ok") {
101 return Ok(());
102 }
103
104 let lowered = check.to_lowercase();
105 if lowered.contains("malformed")
106 || lowered.contains("corrupt")
107 || lowered.contains("database disk image is malformed")
108 {
109 return Err(crate::types::MemoryError::InvalidConfig(format!(
110 "malformed database integrity check: {}",
111 check
112 )));
113 }
114
115 tracing::warn!(
116 "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
117 check
118 );
119 Ok(())
120 }
121
122 async fn init_schema(&self) -> MemoryResult<()> {
124 let conn = self.conn.lock().await;
125
126 conn.execute(
130 "CREATE TABLE IF NOT EXISTS session_memory_chunks (
131 id TEXT PRIMARY KEY,
132 content TEXT NOT NULL,
133 session_id TEXT NOT NULL,
134 project_id TEXT,
135 source TEXT NOT NULL,
136 created_at TEXT NOT NULL,
137 token_count INTEGER NOT NULL DEFAULT 0,
138 metadata TEXT
139 )",
140 [],
141 )?;
142 let session_existing_cols: HashSet<String> = {
143 let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
144 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
145 rows.collect::<Result<HashSet<_>, _>>()?
146 };
147 if !session_existing_cols.contains("source_path") {
148 conn.execute(
149 "ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
150 [],
151 )?;
152 }
153 if !session_existing_cols.contains("source_mtime") {
154 conn.execute(
155 "ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
156 [],
157 )?;
158 }
159 if !session_existing_cols.contains("source_size") {
160 conn.execute(
161 "ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
162 [],
163 )?;
164 }
165 if !session_existing_cols.contains("source_hash") {
166 conn.execute(
167 "ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
168 [],
169 )?;
170 }
171
172 conn.execute(
174 &format!(
175 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
176 chunk_id TEXT PRIMARY KEY,
177 embedding float[{}]
178 )",
179 DEFAULT_EMBEDDING_DIMENSION
180 ),
181 [],
182 )?;
183
184 conn.execute(
186 "CREATE TABLE IF NOT EXISTS project_memory_chunks (
187 id TEXT PRIMARY KEY,
188 content TEXT NOT NULL,
189 project_id TEXT NOT NULL,
190 session_id TEXT,
191 source TEXT NOT NULL,
192 created_at TEXT NOT NULL,
193 token_count INTEGER NOT NULL DEFAULT 0,
194 metadata TEXT
195 )",
196 [],
197 )?;
198
199 let existing_cols: HashSet<String> = {
202 let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
203 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
204 rows.collect::<Result<HashSet<_>, _>>()?
205 };
206
207 if !existing_cols.contains("source_path") {
208 conn.execute(
209 "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
210 [],
211 )?;
212 }
213 if !existing_cols.contains("source_mtime") {
214 conn.execute(
215 "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
216 [],
217 )?;
218 }
219 if !existing_cols.contains("source_size") {
220 conn.execute(
221 "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
222 [],
223 )?;
224 }
225 if !existing_cols.contains("source_hash") {
226 conn.execute(
227 "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
228 [],
229 )?;
230 }
231
232 conn.execute(
234 &format!(
235 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
236 chunk_id TEXT PRIMARY KEY,
237 embedding float[{}]
238 )",
239 DEFAULT_EMBEDDING_DIMENSION
240 ),
241 [],
242 )?;
243
244 conn.execute(
246 "CREATE TABLE IF NOT EXISTS project_file_index (
247 project_id TEXT NOT NULL,
248 path TEXT NOT NULL,
249 mtime INTEGER NOT NULL,
250 size INTEGER NOT NULL,
251 hash TEXT NOT NULL,
252 indexed_at TEXT NOT NULL,
253 PRIMARY KEY(project_id, path)
254 )",
255 [],
256 )?;
257 conn.execute(
258 "CREATE TABLE IF NOT EXISTS session_file_index (
259 session_id TEXT NOT NULL,
260 path TEXT NOT NULL,
261 mtime INTEGER NOT NULL,
262 size INTEGER NOT NULL,
263 hash TEXT NOT NULL,
264 indexed_at TEXT NOT NULL,
265 PRIMARY KEY(session_id, path)
266 )",
267 [],
268 )?;
269
270 conn.execute(
271 "CREATE TABLE IF NOT EXISTS project_index_status (
272 project_id TEXT PRIMARY KEY,
273 last_indexed_at TEXT,
274 last_total_files INTEGER,
275 last_processed_files INTEGER,
276 last_indexed_files INTEGER,
277 last_skipped_files INTEGER,
278 last_errors INTEGER
279 )",
280 [],
281 )?;
282
283 conn.execute(
285 "CREATE TABLE IF NOT EXISTS global_memory_chunks (
286 id TEXT PRIMARY KEY,
287 content TEXT NOT NULL,
288 source TEXT NOT NULL,
289 created_at TEXT NOT NULL,
290 token_count INTEGER NOT NULL DEFAULT 0,
291 metadata TEXT
292 )",
293 [],
294 )?;
295 let global_existing_cols: HashSet<String> = {
296 let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
297 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
298 rows.collect::<Result<HashSet<_>, _>>()?
299 };
300 if !global_existing_cols.contains("source_path") {
301 conn.execute(
302 "ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
303 [],
304 )?;
305 }
306 if !global_existing_cols.contains("source_mtime") {
307 conn.execute(
308 "ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
309 [],
310 )?;
311 }
312 if !global_existing_cols.contains("source_size") {
313 conn.execute(
314 "ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
315 [],
316 )?;
317 }
318 if !global_existing_cols.contains("source_hash") {
319 conn.execute(
320 "ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
321 [],
322 )?;
323 }
324
325 conn.execute(
327 &format!(
328 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
329 chunk_id TEXT PRIMARY KEY,
330 embedding float[{}]
331 )",
332 DEFAULT_EMBEDDING_DIMENSION
333 ),
334 [],
335 )?;
336
337 conn.execute(
339 "CREATE TABLE IF NOT EXISTS memory_config (
340 project_id TEXT PRIMARY KEY,
341 max_chunks INTEGER NOT NULL DEFAULT 10000,
342 chunk_size INTEGER NOT NULL DEFAULT 512,
343 retrieval_k INTEGER NOT NULL DEFAULT 5,
344 auto_cleanup INTEGER NOT NULL DEFAULT 1,
345 session_retention_days INTEGER NOT NULL DEFAULT 30,
346 token_budget INTEGER NOT NULL DEFAULT 5000,
347 chunk_overlap INTEGER NOT NULL DEFAULT 64,
348 updated_at TEXT NOT NULL
349 )",
350 [],
351 )?;
352
353 conn.execute(
355 "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
356 id TEXT PRIMARY KEY,
357 cleanup_type TEXT NOT NULL,
358 tier TEXT NOT NULL,
359 project_id TEXT,
360 session_id TEXT,
361 chunks_deleted INTEGER NOT NULL DEFAULT 0,
362 bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
363 created_at TEXT NOT NULL
364 )",
365 [],
366 )?;
367
368 conn.execute(
370 "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
371 [],
372 )?;
373 conn.execute(
374 "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
375 [],
376 )?;
377 conn.execute(
378 "CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
379 [],
380 )?;
381 conn.execute(
382 "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
383 [],
384 )?;
385 conn.execute(
386 "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
387 [],
388 )?;
389 conn.execute(
390 "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
391 [],
392 )?;
393 conn.execute(
394 "CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
395 [],
396 )?;
397 conn.execute(
398 "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
399 [],
400 )?;
401 conn.execute(
402 "CREATE TABLE IF NOT EXISTS global_file_index (
403 path TEXT PRIMARY KEY,
404 mtime INTEGER NOT NULL,
405 size INTEGER NOT NULL,
406 hash TEXT NOT NULL,
407 indexed_at TEXT NOT NULL
408 )",
409 [],
410 )?;
411
412 conn.execute(
414 "CREATE TABLE IF NOT EXISTS knowledge_spaces (
415 id TEXT PRIMARY KEY,
416 scope TEXT NOT NULL,
417 project_id TEXT,
418 namespace TEXT,
419 title TEXT,
420 description TEXT,
421 trust_level TEXT NOT NULL,
422 metadata TEXT,
423 created_at_ms INTEGER NOT NULL,
424 updated_at_ms INTEGER NOT NULL
425 )",
426 [],
427 )?;
428 conn.execute(
429 "CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_spaces_scope_project_namespace
430 ON knowledge_spaces(scope, IFNULL(project_id, ''), IFNULL(namespace, ''))",
431 [],
432 )?;
433 conn.execute(
434 "CREATE TABLE IF NOT EXISTS knowledge_items (
435 id TEXT PRIMARY KEY,
436 space_id TEXT NOT NULL,
437 coverage_key TEXT NOT NULL,
438 dedupe_key TEXT NOT NULL,
439 item_type TEXT NOT NULL,
440 title TEXT NOT NULL,
441 summary TEXT,
442 payload TEXT NOT NULL,
443 trust_level TEXT NOT NULL,
444 status TEXT NOT NULL,
445 run_id TEXT,
446 artifact_refs TEXT NOT NULL,
447 source_memory_ids TEXT NOT NULL,
448 freshness_expires_at_ms INTEGER,
449 metadata TEXT,
450 created_at_ms INTEGER NOT NULL,
451 updated_at_ms INTEGER NOT NULL,
452 FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
453 )",
454 [],
455 )?;
456 conn.execute(
457 "CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_items_space_dedupe
458 ON knowledge_items(space_id, dedupe_key)",
459 [],
460 )?;
461 conn.execute(
462 "CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_coverage
463 ON knowledge_items(space_id, coverage_key)",
464 [],
465 )?;
466 conn.execute(
467 "CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_created
468 ON knowledge_items(space_id, created_at_ms DESC)",
469 [],
470 )?;
471 conn.execute(
472 "CREATE TABLE IF NOT EXISTS knowledge_coverage (
473 coverage_key TEXT NOT NULL,
474 space_id TEXT NOT NULL,
475 latest_item_id TEXT,
476 latest_dedupe_key TEXT,
477 last_seen_at_ms INTEGER NOT NULL,
478 last_promoted_at_ms INTEGER,
479 freshness_expires_at_ms INTEGER,
480 metadata TEXT,
481 PRIMARY KEY(coverage_key, space_id),
482 FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
483 )",
484 [],
485 )?;
486 conn.execute(
487 "CREATE INDEX IF NOT EXISTS idx_knowledge_coverage_space_seen
488 ON knowledge_coverage(space_id, last_seen_at_ms DESC)",
489 [],
490 )?;
491
492 conn.execute(
494 "CREATE TABLE IF NOT EXISTS memory_records (
495 id TEXT PRIMARY KEY,
496 user_id TEXT NOT NULL,
497 source_type TEXT NOT NULL,
498 content TEXT NOT NULL,
499 content_hash TEXT NOT NULL,
500 run_id TEXT NOT NULL,
501 session_id TEXT,
502 message_id TEXT,
503 tool_name TEXT,
504 project_tag TEXT,
505 channel_tag TEXT,
506 host_tag TEXT,
507 metadata TEXT,
508 provenance TEXT,
509 redaction_status TEXT NOT NULL,
510 redaction_count INTEGER NOT NULL DEFAULT 0,
511 visibility TEXT NOT NULL DEFAULT 'private',
512 demoted INTEGER NOT NULL DEFAULT 0,
513 score_boost REAL NOT NULL DEFAULT 0.0,
514 created_at_ms INTEGER NOT NULL,
515 updated_at_ms INTEGER NOT NULL,
516 expires_at_ms INTEGER
517 )",
518 [],
519 )?;
520 conn.execute(
521 "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
522 ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
523 [],
524 )?;
525 conn.execute(
526 "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
527 ON memory_records(user_id, created_at_ms DESC)",
528 [],
529 )?;
530 conn.execute(
531 "CREATE INDEX IF NOT EXISTS idx_memory_records_run
532 ON memory_records(run_id)",
533 [],
534 )?;
535 conn.execute(
536 "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
537 id UNINDEXED,
538 user_id UNINDEXED,
539 content
540 )",
541 [],
542 )?;
543 conn.execute(
544 "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
545 INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
546 END",
547 [],
548 )?;
549 conn.execute(
550 "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
551 DELETE FROM memory_records_fts WHERE id = old.id;
552 END",
553 [],
554 )?;
555 conn.execute(
556 "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
557 DELETE FROM memory_records_fts WHERE id = old.id;
558 INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
559 END",
560 [],
561 )?;
562
563 conn.execute(
564 "CREATE TABLE IF NOT EXISTS memory_nodes (
565 id TEXT PRIMARY KEY,
566 uri TEXT NOT NULL UNIQUE,
567 parent_uri TEXT,
568 node_type TEXT NOT NULL,
569 created_at TEXT NOT NULL,
570 updated_at TEXT NOT NULL,
571 metadata TEXT
572 )",
573 [],
574 )?;
575 conn.execute(
576 "CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
577 [],
578 )?;
579 conn.execute(
580 "CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
581 [],
582 )?;
583
584 conn.execute(
585 "CREATE TABLE IF NOT EXISTS memory_layers (
586 id TEXT PRIMARY KEY,
587 node_id TEXT NOT NULL,
588 layer_type TEXT NOT NULL,
589 content TEXT NOT NULL,
590 token_count INTEGER NOT NULL,
591 embedding_id TEXT,
592 created_at TEXT NOT NULL,
593 source_chunk_id TEXT,
594 FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
595 )",
596 [],
597 )?;
598 conn.execute(
599 "CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
600 [],
601 )?;
602 conn.execute(
603 "CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
604 [],
605 )?;
606
607 conn.execute(
608 "CREATE TABLE IF NOT EXISTS memory_retrieval_state (
609 node_id TEXT PRIMARY KEY,
610 active_layer TEXT NOT NULL DEFAULT 'L0',
611 last_accessed TEXT,
612 access_count INTEGER DEFAULT 0,
613 FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
614 )",
615 [],
616 )?;
617
618 Ok(())
619 }
620
621 pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
624 let conn = self.conn.lock().await;
625 let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
626
627 for table in [
628 "session_memory_vectors",
629 "project_memory_vectors",
630 "global_memory_vectors",
631 ] {
632 let sql = format!("SELECT COUNT(*) FROM {}", table);
633 let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
634
635 if row_count > 0 {
638 let probe_sql = format!(
639 "SELECT chunk_id, distance
640 FROM {}
641 WHERE embedding MATCH ?1 AND k = 1",
642 table
643 );
644 let mut stmt = conn.prepare(&probe_sql)?;
645 let mut rows = stmt.query(params![probe_embedding.as_str()])?;
646 let _ = rows.next()?;
647 }
648 }
649 Ok(())
650 }
651
652 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
653 let text = err.to_string().to_lowercase();
654 text.contains("vector blob")
655 || text.contains("chunks iter error")
656 || text.contains("chunks iter")
657 || text.contains("internal sqlite-vec error")
658 || text.contains("insert rowids id")
659 || text.contains("sql logic error")
660 || text.contains("database disk image is malformed")
661 || text.contains("session_memory_vectors")
662 || text.contains("project_memory_vectors")
663 || text.contains("global_memory_vectors")
664 || text.contains("vec0")
665 }
666
667 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
668 let conn = self.conn.lock().await;
669
670 for base in [
671 "session_memory_vectors",
672 "project_memory_vectors",
673 "global_memory_vectors",
674 ] {
675 for name in [
677 base.to_string(),
678 format!("{}_chunks", base),
679 format!("{}_info", base),
680 format!("{}_rowids", base),
681 format!("{}_vector_chunks00", base),
682 ] {
683 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
684 conn.execute(&sql, [])?;
685 }
686
687 let like_pattern = format!("{base}_%");
689 let mut stmt = conn.prepare(
690 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
691 )?;
692 let table_names = stmt
693 .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
694 .collect::<Result<Vec<_>, _>>()?;
695 drop(stmt);
696 for name in table_names {
697 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
698 conn.execute(&sql, [])?;
699 }
700 }
701
702 conn.execute(
703 &format!(
704 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
705 chunk_id TEXT PRIMARY KEY,
706 embedding float[{}]
707 )",
708 DEFAULT_EMBEDDING_DIMENSION
709 ),
710 [],
711 )?;
712
713 conn.execute(
714 &format!(
715 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
716 chunk_id TEXT PRIMARY KEY,
717 embedding float[{}]
718 )",
719 DEFAULT_EMBEDDING_DIMENSION
720 ),
721 [],
722 )?;
723
724 conn.execute(
725 &format!(
726 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
727 chunk_id TEXT PRIMARY KEY,
728 embedding float[{}]
729 )",
730 DEFAULT_EMBEDDING_DIMENSION
731 ),
732 [],
733 )?;
734
735 Ok(())
736 }
737
738 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
741 match self.validate_vector_tables().await {
742 Ok(()) => Ok(false),
743 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
744 tracing::warn!(
745 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
746 err
747 );
748 self.recreate_vector_tables().await?;
749 Ok(true)
750 }
751 Err(err) => Err(err),
752 }
753 }
754
755 pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
759 let table_names = {
760 let conn = self.conn.lock().await;
761 let mut stmt = conn.prepare(
762 "SELECT name FROM sqlite_master
763 WHERE type='table'
764 AND name NOT LIKE 'sqlite_%'
765 ORDER BY name",
766 )?;
767 let names = stmt
768 .query_map([], |row| row.get::<_, String>(0))?
769 .collect::<Result<Vec<_>, _>>()?;
770 names
771 };
772
773 {
774 let conn = self.conn.lock().await;
775 for table in table_names {
776 let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
777 let _ = conn.execute(&sql, []);
778 }
779 }
780
781 self.init_schema().await
782 }
783
784 pub async fn try_repair_after_error(
787 &self,
788 err: &crate::types::MemoryError,
789 ) -> MemoryResult<bool> {
790 match err {
791 crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
792 tracing::warn!(
793 "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
794 db_err
795 );
796 self.recreate_vector_tables().await?;
797 Ok(true)
798 }
799 _ => Ok(false),
800 }
801 }
802
803 pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
805 let conn = self.conn.lock().await;
806
807 let (chunks_table, vectors_table) = match chunk.tier {
808 MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
809 MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
810 MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
811 };
812
813 let created_at_str = chunk.created_at.to_rfc3339();
814 let metadata_str = chunk
815 .metadata
816 .as_ref()
817 .map(|m| m.to_string())
818 .unwrap_or_default();
819
820 match chunk.tier {
822 MemoryTier::Session => {
823 conn.execute(
824 &format!(
825 "INSERT INTO {} (
826 id, content, session_id, project_id, source, created_at, token_count, metadata,
827 source_path, source_mtime, source_size, source_hash
828 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
829 chunks_table
830 ),
831 params![
832 chunk.id,
833 chunk.content,
834 chunk.session_id.as_ref().unwrap_or(&String::new()),
835 chunk.project_id,
836 chunk.source,
837 created_at_str,
838 chunk.token_count,
839 metadata_str,
840 chunk.source_path.clone(),
841 chunk.source_mtime,
842 chunk.source_size,
843 chunk.source_hash.clone()
844 ],
845 )?;
846 }
847 MemoryTier::Project => {
848 conn.execute(
849 &format!(
850 "INSERT INTO {} (
851 id, content, project_id, session_id, source, created_at, token_count, metadata,
852 source_path, source_mtime, source_size, source_hash
853 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
854 chunks_table
855 ),
856 params![
857 chunk.id,
858 chunk.content,
859 chunk.project_id.as_ref().unwrap_or(&String::new()),
860 chunk.session_id,
861 chunk.source,
862 created_at_str,
863 chunk.token_count,
864 metadata_str,
865 chunk.source_path.clone(),
866 chunk.source_mtime,
867 chunk.source_size,
868 chunk.source_hash.clone()
869 ],
870 )?;
871 }
872 MemoryTier::Global => {
873 conn.execute(
874 &format!(
875 "INSERT INTO {} (
876 id, content, source, created_at, token_count, metadata,
877 source_path, source_mtime, source_size, source_hash
878 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
879 chunks_table
880 ),
881 params![
882 chunk.id,
883 chunk.content,
884 chunk.source,
885 created_at_str,
886 chunk.token_count,
887 metadata_str,
888 chunk.source_path.clone(),
889 chunk.source_mtime,
890 chunk.source_size,
891 chunk.source_hash.clone()
892 ],
893 )?;
894 }
895 }
896
897 let embedding_json = format!(
899 "[{}]",
900 embedding
901 .iter()
902 .map(|f| f.to_string())
903 .collect::<Vec<_>>()
904 .join(",")
905 );
906 conn.execute(
907 &format!(
908 "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
909 vectors_table
910 ),
911 params![chunk.id, embedding_json],
912 )?;
913
914 Ok(())
915 }
916
    /// K-nearest-neighbour search over one memory tier.
    ///
    /// The query embedding is serialized to sqlite-vec's JSON array form and
    /// matched against the tier's vec0 table (`embedding MATCH ?` with
    /// `k = limit`); matches are joined back to the chunk table and returned
    /// as `(chunk, distance)` ordered by ascending distance.
    ///
    /// Filter precedence: for the Session tier, `session_id` wins over
    /// `project_id`; if neither is given, the whole tier is scanned. The
    /// Project tier uses only `project_id`; the Global tier ignores both.
    ///
    /// Every branch selects the same 12 chunk columns (consumed positionally
    /// by `row_to_chunk`, defined elsewhere in this file) followed by
    /// `v.distance` at index 12 — keep the column order in sync if editing.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // sqlite-vec expects the query vector as a JSON array of floats.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        let results = match tier {
            MemoryTier::Session => {
                if let Some(sid) = session_id {
                    // Session tier, filtered to a single session.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                     ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    // Session tier, filtered by owning project instead.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                     ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    // Session tier, no filter: search across all sessions.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                if let Some(pid) = project_id {
                    // Project tier, filtered to one project.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                     ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    // Project tier, no filter: search every project.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global table has no session/project columns; NULLs are
                // selected so the column layout matches what row_to_chunk
                // reads for the other tiers.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                        c.source_path, c.source_mtime, c.source_size, c.source_hash,
                        v.distance
                 FROM {} AS v
                 JOIN {} AS c ON v.chunk_id = c.id
                 WHERE v.embedding MATCH ?1 AND k = ?2
                 ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
1064
1065 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1067 let conn = self.conn.lock().await;
1068
1069 let mut stmt = conn.prepare(
1070 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1071 source_path, source_mtime, source_size, source_hash
1072 FROM session_memory_chunks
1073 WHERE session_id = ?1
1074 ORDER BY created_at DESC",
1075 )?;
1076
1077 let chunks = stmt
1078 .query_map(params![session_id], |row| {
1079 row_to_chunk(row, MemoryTier::Session)
1080 })?
1081 .collect::<Result<Vec<_>, _>>()?;
1082
1083 Ok(chunks)
1084 }
1085
1086 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1088 let conn = self.conn.lock().await;
1089
1090 let mut stmt = conn.prepare(
1091 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1092 source_path, source_mtime, source_size, source_hash
1093 FROM project_memory_chunks
1094 WHERE project_id = ?1
1095 ORDER BY created_at DESC",
1096 )?;
1097
1098 let chunks = stmt
1099 .query_map(params![project_id], |row| {
1100 row_to_chunk(row, MemoryTier::Project)
1101 })?
1102 .collect::<Result<Vec<_>, _>>()?;
1103
1104 Ok(chunks)
1105 }
1106
1107 pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
1109 let conn = self.conn.lock().await;
1110
1111 let mut stmt = conn.prepare(
1112 "SELECT id, content, source, created_at, token_count, metadata,
1113 source_path, source_mtime, source_size, source_hash
1114 FROM global_memory_chunks
1115 ORDER BY created_at DESC
1116 LIMIT ?1",
1117 )?;
1118
1119 let chunks = stmt
1120 .query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
1121 .collect::<Result<Vec<_>, _>>()?;
1122
1123 Ok(chunks)
1124 }
1125
1126 pub async fn global_chunk_exists_by_source_hash(
1127 &self,
1128 source_hash: &str,
1129 ) -> MemoryResult<bool> {
1130 let conn = self.conn.lock().await;
1131 let exists = conn
1132 .query_row(
1133 "SELECT 1 FROM global_memory_chunks WHERE source_hash = ?1 LIMIT 1",
1134 params![source_hash],
1135 |_row| Ok(()),
1136 )
1137 .optional()?
1138 .is_some();
1139 Ok(exists)
1140 }
1141
1142 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
1144 let conn = self.conn.lock().await;
1145
1146 let count: i64 = conn.query_row(
1148 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
1149 params![session_id],
1150 |row| row.get(0),
1151 )?;
1152
1153 conn.execute(
1155 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1156 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
1157 params![session_id],
1158 )?;
1159
1160 conn.execute(
1162 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
1163 params![session_id],
1164 )?;
1165
1166 Ok(count as u64)
1167 }
1168
1169 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
1171 let conn = self.conn.lock().await;
1172
1173 let count: i64 = conn.query_row(
1175 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1176 params![project_id],
1177 |row| row.get(0),
1178 )?;
1179
1180 conn.execute(
1182 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1183 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
1184 params![project_id],
1185 )?;
1186
1187 conn.execute(
1189 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
1190 params![project_id],
1191 )?;
1192
1193 Ok(count as u64)
1194 }
1195
1196 pub async fn clear_global_memory_by_source_prefix(
1198 &self,
1199 source_prefix: &str,
1200 ) -> MemoryResult<u64> {
1201 let conn = self.conn.lock().await;
1202 let like = format!("{}%", source_prefix);
1203
1204 let count: i64 = conn.query_row(
1205 "SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
1206 params![like],
1207 |row| row.get(0),
1208 )?;
1209
1210 conn.execute(
1211 "DELETE FROM global_memory_vectors WHERE chunk_id IN
1212 (SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
1213 params![like],
1214 )?;
1215
1216 conn.execute(
1217 "DELETE FROM global_memory_chunks WHERE source LIKE ?1",
1218 params![like],
1219 )?;
1220
1221 Ok(count as u64)
1222 }
1223
1224 pub async fn delete_chunk(
1226 &self,
1227 tier: MemoryTier,
1228 chunk_id: &str,
1229 project_id: Option<&str>,
1230 session_id: Option<&str>,
1231 ) -> MemoryResult<u64> {
1232 let conn = self.conn.lock().await;
1233
1234 let deleted = match tier {
1235 MemoryTier::Session => {
1236 let Some(session_id) = session_id else {
1237 return Err(MemoryError::InvalidConfig(
1238 "session_id is required to delete session memory chunks".to_string(),
1239 ));
1240 };
1241 conn.execute(
1242 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1243 (SELECT id FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2)",
1244 params![chunk_id, session_id],
1245 )?;
1246 conn.execute(
1247 "DELETE FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2",
1248 params![chunk_id, session_id],
1249 )?
1250 }
1251 MemoryTier::Project => {
1252 let Some(project_id) = project_id else {
1253 return Err(MemoryError::InvalidConfig(
1254 "project_id is required to delete project memory chunks".to_string(),
1255 ));
1256 };
1257 conn.execute(
1258 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1259 (SELECT id FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2)",
1260 params![chunk_id, project_id],
1261 )?;
1262 conn.execute(
1263 "DELETE FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2",
1264 params![chunk_id, project_id],
1265 )?
1266 }
1267 MemoryTier::Global => {
1268 conn.execute(
1269 "DELETE FROM global_memory_vectors WHERE chunk_id IN
1270 (SELECT id FROM global_memory_chunks WHERE id = ?1)",
1271 params![chunk_id],
1272 )?;
1273 conn.execute(
1274 "DELETE FROM global_memory_chunks WHERE id = ?1",
1275 params![chunk_id],
1276 )?
1277 }
1278 };
1279
1280 Ok(deleted as u64)
1281 }
1282
1283 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
1285 let conn = self.conn.lock().await;
1286
1287 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
1288 let cutoff_str = cutoff.to_rfc3339();
1289
1290 let count: i64 = conn.query_row(
1292 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
1293 params![cutoff_str],
1294 |row| row.get(0),
1295 )?;
1296
1297 conn.execute(
1299 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1300 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
1301 params![cutoff_str],
1302 )?;
1303
1304 conn.execute(
1306 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1307 params![cutoff_str],
1308 )?;
1309
1310 Ok(count as u64)
1311 }
1312
1313 pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1315 let conn = self.conn.lock().await;
1316
1317 let result: Option<MemoryConfig> = conn
1318 .query_row(
1319 "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
1320 session_retention_days, token_budget, chunk_overlap
1321 FROM memory_config WHERE project_id = ?1",
1322 params![project_id],
1323 |row| {
1324 Ok(MemoryConfig {
1325 max_chunks: row.get(0)?,
1326 chunk_size: row.get(1)?,
1327 retrieval_k: row.get(2)?,
1328 auto_cleanup: row.get::<_, i64>(3)? != 0,
1329 session_retention_days: row.get(4)?,
1330 token_budget: row.get(5)?,
1331 chunk_overlap: row.get(6)?,
1332 })
1333 },
1334 )
1335 .optional()?;
1336
1337 match result {
1338 Some(config) => Ok(config),
1339 None => {
1340 let config = MemoryConfig::default();
1342 let updated_at = Utc::now().to_rfc3339();
1343
1344 conn.execute(
1345 "INSERT INTO memory_config
1346 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1347 session_retention_days, token_budget, chunk_overlap, updated_at)
1348 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1349 params![
1350 project_id,
1351 config.max_chunks,
1352 config.chunk_size,
1353 config.retrieval_k,
1354 config.auto_cleanup as i64,
1355 config.session_retention_days,
1356 config.token_budget,
1357 config.chunk_overlap,
1358 updated_at
1359 ],
1360 )?;
1361
1362 Ok(config)
1363 }
1364 }
1365 }
1366
1367 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1369 let conn = self.conn.lock().await;
1370
1371 let updated_at = Utc::now().to_rfc3339();
1372
1373 conn.execute(
1374 "INSERT OR REPLACE INTO memory_config
1375 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1376 session_retention_days, token_budget, chunk_overlap, updated_at)
1377 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1378 params![
1379 project_id,
1380 config.max_chunks,
1381 config.chunk_size,
1382 config.retrieval_k,
1383 config.auto_cleanup as i64,
1384 config.session_retention_days,
1385 config.token_budget,
1386 config.chunk_overlap,
1387 updated_at
1388 ],
1389 )?;
1390
1391 Ok(())
1392 }
1393
1394 pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
1396 let conn = self.conn.lock().await;
1397 conn.execute(
1398 "INSERT OR REPLACE INTO knowledge_spaces
1399 (id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms)
1400 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
1401 params![
1402 space.id,
1403 space.scope.to_string(),
1404 space.project_id,
1405 space.namespace,
1406 space.title,
1407 space.description,
1408 space.trust_level.to_string(),
1409 space.metadata.as_ref().map(|value| value.to_string()),
1410 space.created_at_ms as i64,
1411 space.updated_at_ms as i64,
1412 ],
1413 )?;
1414 Ok(())
1415 }
1416
1417 pub async fn get_knowledge_space(
1419 &self,
1420 id: &str,
1421 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
1422 let conn = self.conn.lock().await;
1423 Ok(
1424 conn.query_row(
1425 "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1426 FROM knowledge_spaces WHERE id = ?1",
1427 params![id],
1428 row_to_knowledge_space,
1429 )
1430 .optional()?,
1431 )
1432 }
1433
1434 pub async fn list_knowledge_spaces(
1436 &self,
1437 project_id: Option<&str>,
1438 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
1439 let conn = self.conn.lock().await;
1440 let mut stmt = if project_id.is_some() {
1441 conn.prepare(
1442 "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1443 FROM knowledge_spaces WHERE project_id = ?1 ORDER BY updated_at_ms DESC",
1444 )?
1445 } else {
1446 conn.prepare(
1447 "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1448 FROM knowledge_spaces ORDER BY updated_at_ms DESC",
1449 )?
1450 };
1451 let rows = if let Some(project_id) = project_id {
1452 stmt.query_map(params![project_id], row_to_knowledge_space)?
1453 } else {
1454 stmt.query_map([], row_to_knowledge_space)?
1455 };
1456 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1457 }
1458
    /// Insert or replace a knowledge item row keyed by its id.
    ///
    /// Enum fields (`trust_level`, `status`) are stored via `to_string()`;
    /// `payload`/`metadata` as JSON text; `artifact_refs`/`source_memory_ids`
    /// as JSON arrays. Millisecond timestamps are cast to `i64` for SQLite.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // NOTE: the parameter order below must stay in lockstep with the
        // column list — 17 positional placeholders.
        conn.execute(
            "INSERT OR REPLACE INTO knowledge_items
             (id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
            params![
                item.id,
                item.space_id,
                item.coverage_key,
                item.dedupe_key,
                item.item_type,
                item.title,
                item.summary,
                item.payload.to_string(),
                item.trust_level.to_string(),
                item.status.to_string(),
                item.run_id,
                serde_json::to_string(&item.artifact_refs)?,
                serde_json::to_string(&item.source_memory_ids)?,
                item.freshness_expires_at_ms.map(|value| value as i64),
                item.metadata.as_ref().map(|value| value.to_string()),
                item.created_at_ms as i64,
                item.updated_at_ms as i64,
            ],
        )?;
        Ok(())
    }
1488
1489 pub async fn list_knowledge_items(
1491 &self,
1492 space_id: &str,
1493 coverage_key: Option<&str>,
1494 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
1495 let conn = self.conn.lock().await;
1496 let mut stmt = if coverage_key.is_some() {
1497 conn.prepare(
1498 "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1499 FROM knowledge_items WHERE space_id = ?1 AND coverage_key = ?2 ORDER BY created_at_ms DESC",
1500 )?
1501 } else {
1502 conn.prepare(
1503 "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1504 FROM knowledge_items WHERE space_id = ?1 ORDER BY created_at_ms DESC",
1505 )?
1506 };
1507 let rows = if let Some(coverage_key) = coverage_key {
1508 stmt.query_map(params![space_id, coverage_key], row_to_knowledge_item)?
1509 } else {
1510 stmt.query_map(params![space_id], row_to_knowledge_item)?
1511 };
1512 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1513 }
1514
1515 pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
1517 let conn = self.conn.lock().await;
1518 Ok(
1519 conn.query_row(
1520 "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1521 FROM knowledge_items WHERE id = ?1",
1522 params![id],
1523 row_to_knowledge_item,
1524 )
1525 .optional()?,
1526 )
1527 }
1528
    /// Transition a knowledge item through its promotion lifecycle
    /// (working → promoted → approved_default, any → deprecated) inside a
    /// single transaction, updating both the item row and its coverage row.
    ///
    /// Returns `Ok(None)` when the item id does not exist, otherwise the
    /// promotion result describing the previous state and the persisted
    /// item/coverage records. Invalid transitions return
    /// `MemoryError::InvalidConfig`.
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        let mut conn = self.conn.lock().await;
        // Item update + coverage update must be atomic.
        let tx = conn.transaction()?;

        let Some(mut item) = tx
            .query_row(
                "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
                 FROM knowledge_items WHERE id = ?1",
                params![request.item_id],
                row_to_knowledge_item,
            )
            .optional()? else {
            // Unknown item id: nothing to promote.
            return Ok(None);
        };

        let previous_status = item.status;
        let previous_trust_level = item.trust_level;

        // Deprecation is terminal: a deprecated item can never become active
        // again (re-deprecating is tolerated as a no-op transition below).
        if previous_status == KnowledgeItemStatus::Deprecated
            && request.target_status != KnowledgeItemStatus::Deprecated
        {
            return Err(crate::types::MemoryError::InvalidConfig(
                "cannot promote a deprecated knowledge item".to_string(),
            ));
        }

        // Validate the status transition. Allowed: stay at the same level,
        // move one step up (working→promoted, promoted→approved_default), or
        // deprecate from any active state. Skipping a level or demoting is
        // rejected explicitly.
        let next_status = request.target_status;
        match (previous_status, next_status) {
            (KnowledgeItemStatus::Working, KnowledgeItemStatus::Promoted)
            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Promoted)
            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::ApprovedDefault)
            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::ApprovedDefault)
            | (KnowledgeItemStatus::Working, KnowledgeItemStatus::Deprecated)
            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Deprecated)
            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Deprecated) => {}
            (KnowledgeItemStatus::Working, KnowledgeItemStatus::ApprovedDefault) => {
                return Err(crate::types::MemoryError::InvalidConfig(
                    "approved_default requires an intermediate promoted item".to_string(),
                ));
            }
            (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Promoted) => {
                return Err(crate::types::MemoryError::InvalidConfig(
                    "approved_default items do not downgrade back to promoted".to_string(),
                ));
            }
            (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Working)
            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Working) => {
                return Err(crate::types::MemoryError::InvalidConfig(
                    "knowledge items cannot be demoted back to working".to_string(),
                ));
            }
            // Same-state working and anything from deprecated (only
            // deprecated→deprecated can reach here, see guard above) pass.
            (KnowledgeItemStatus::Working, KnowledgeItemStatus::Working) => {}
            (KnowledgeItemStatus::Deprecated, _) => {}
        }

        // Promotion to approved_default is an audited step: it must carry
        // both a reviewer and an approval reference.
        if next_status == KnowledgeItemStatus::ApprovedDefault
            && (request.reviewer_id.is_none() || request.approval_id.is_none())
        {
            return Err(crate::types::MemoryError::InvalidConfig(
                "approved_default promotion requires reviewer_id and approval_id".to_string(),
            ));
        }

        // "promoted" flags a meaningful change: either the status actually
        // moved, or an active item had its freshness window extended.
        let promoted = next_status.is_active()
            && (previous_status != next_status || request.freshness_expires_at_ms.is_some());

        // Record an audit trail of this promotion in the item metadata
        // (overwrites any previous "promotion" entry).
        let mut metadata_obj = item
            .metadata
            .clone()
            .and_then(|value| value.as_object().cloned())
            .unwrap_or_default();
        metadata_obj.insert(
            "promotion".to_string(),
            serde_json::json!({
                "from_status": previous_status.to_string(),
                "to_status": next_status.to_string(),
                "promoted_at_ms": request.promoted_at_ms,
                "reason": request.reason,
                "reviewer_id": request.reviewer_id,
                "approval_id": request.approval_id,
                "freshness_expires_at_ms": request.freshness_expires_at_ms,
            }),
        );

        // Apply the new state to the in-memory record before persisting.
        item.status = next_status;
        if let Some(next_trust) = next_status.as_trust_level() {
            item.trust_level = next_trust;
        }
        if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
            item.freshness_expires_at_ms = Some(freshness_expires_at_ms);
        }
        item.metadata = Some(serde_json::Value::Object(metadata_obj));
        item.updated_at_ms = request.promoted_at_ms;
        let persisted_item = item.clone();
        let item_id = persisted_item.id.clone();
        let space_id = persisted_item.space_id.clone();
        let coverage_key = persisted_item.coverage_key.clone();
        let dedupe_key = persisted_item.dedupe_key.clone();

        // Persist the updated item (full-row replace, same 17-column layout
        // as upsert_knowledge_item).
        tx.execute(
            "INSERT OR REPLACE INTO knowledge_items
             (id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
            params![
                persisted_item.id,
                persisted_item.space_id,
                persisted_item.coverage_key,
                persisted_item.dedupe_key,
                persisted_item.item_type,
                persisted_item.title,
                persisted_item.summary,
                persisted_item.payload.to_string(),
                persisted_item.trust_level.to_string(),
                persisted_item.status.to_string(),
                persisted_item.run_id,
                serde_json::to_string(&persisted_item.artifact_refs)?,
                serde_json::to_string(&persisted_item.source_memory_ids)?,
                persisted_item.freshness_expires_at_ms.map(|value| value as i64),
                persisted_item.metadata.as_ref().map(|value| value.to_string()),
                persisted_item.created_at_ms as i64,
                persisted_item.updated_at_ms as i64,
            ],
        )?;

        // Load (or initialize) the coverage row for this (coverage_key,
        // space) pair so it can point at the newly promoted item.
        let mut coverage = tx
            .query_row(
                "SELECT coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata
                 FROM knowledge_coverage WHERE coverage_key = ?1 AND space_id = ?2",
                params![coverage_key.as_str(), space_id.as_str()],
                row_to_knowledge_coverage,
            )
            .optional()?
            .unwrap_or(KnowledgeCoverageRecord {
                coverage_key: coverage_key.clone(),
                space_id: space_id.clone(),
                latest_item_id: None,
                latest_dedupe_key: None,
                last_seen_at_ms: request.promoted_at_ms,
                last_promoted_at_ms: None,
                freshness_expires_at_ms: None,
                metadata: None,
            });
        coverage.latest_item_id = Some(item_id.clone());
        coverage.latest_dedupe_key = Some(dedupe_key.clone());
        coverage.last_seen_at_ms = request.promoted_at_ms;
        // Only active statuses count as a promotion for coverage purposes.
        if next_status.is_active() {
            coverage.last_promoted_at_ms = Some(request.promoted_at_ms);
        }
        if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
            coverage.freshness_expires_at_ms = Some(freshness_expires_at_ms);
        }
        // Mirror the promotion audit entry into the coverage metadata.
        let mut coverage_metadata = coverage
            .metadata
            .clone()
            .and_then(|value| value.as_object().cloned())
            .unwrap_or_default();
        coverage_metadata.insert(
            "promotion".to_string(),
            serde_json::json!({
                "item_id": item_id,
                "from_status": previous_status.to_string(),
                "to_status": next_status.to_string(),
                "promoted_at_ms": request.promoted_at_ms,
                "reason": request.reason,
                "reviewer_id": request.reviewer_id,
                "approval_id": request.approval_id,
            }),
        );
        coverage.metadata = Some(serde_json::Value::Object(coverage_metadata));

        tx.execute(
            "INSERT OR REPLACE INTO knowledge_coverage
             (coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            params![
                coverage.coverage_key,
                coverage.space_id,
                coverage.latest_item_id,
                coverage.latest_dedupe_key,
                coverage.last_seen_at_ms as i64,
                coverage.last_promoted_at_ms.map(|value| value as i64),
                coverage.freshness_expires_at_ms.map(|value| value as i64),
                coverage.metadata.as_ref().map(|value| value.to_string()),
            ],
        )?;

        tx.commit()?;
        Ok(Some(KnowledgePromotionResult {
            previous_status,
            previous_trust_level,
            promoted,
            item: persisted_item,
            coverage,
        }))
    }
1728
    /// Insert or replace a coverage row keyed by (coverage_key, space_id).
    ///
    /// Millisecond timestamps are cast to `i64` for SQLite; optional
    /// timestamps map to NULL; metadata is stored as JSON text.
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // NOTE: parameter order must stay in lockstep with the column list.
        conn.execute(
            "INSERT OR REPLACE INTO knowledge_coverage
             (coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            params![
                coverage.coverage_key,
                coverage.space_id,
                coverage.latest_item_id,
                coverage.latest_dedupe_key,
                coverage.last_seen_at_ms as i64,
                coverage.last_promoted_at_ms.map(|value| value as i64),
                coverage.freshness_expires_at_ms.map(|value| value as i64),
                coverage.metadata.as_ref().map(|value| value.to_string()),
            ],
        )?;
        Ok(())
    }
1752
1753 pub async fn get_knowledge_coverage(
1755 &self,
1756 coverage_key: &str,
1757 space_id: &str,
1758 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
1759 let conn = self.conn.lock().await;
1760 Ok(
1761 conn.query_row(
1762 "SELECT coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata
1763 FROM knowledge_coverage WHERE coverage_key = ?1 AND space_id = ?2",
1764 params![coverage_key, space_id],
1765 row_to_knowledge_coverage,
1766 )
1767 .optional()?,
1768 )
1769 }
1770
1771 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1773 let conn = self.conn.lock().await;
1774
1775 let session_chunks: i64 =
1777 conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1778 row.get(0)
1779 })?;
1780
1781 let project_chunks: i64 =
1782 conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1783 row.get(0)
1784 })?;
1785
1786 let global_chunks: i64 =
1787 conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1788 row.get(0)
1789 })?;
1790
1791 let session_bytes: i64 = conn.query_row(
1793 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1794 [],
1795 |row| row.get(0),
1796 )?;
1797
1798 let project_bytes: i64 = conn.query_row(
1799 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1800 [],
1801 |row| row.get(0),
1802 )?;
1803
1804 let global_bytes: i64 = conn.query_row(
1805 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1806 [],
1807 |row| row.get(0),
1808 )?;
1809
1810 let last_cleanup: Option<String> = conn
1812 .query_row(
1813 "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1814 [],
1815 |row| row.get(0),
1816 )
1817 .optional()?;
1818
1819 let last_cleanup = last_cleanup.and_then(|s| {
1820 DateTime::parse_from_rfc3339(&s)
1821 .ok()
1822 .map(|dt| dt.with_timezone(&Utc))
1823 });
1824
1825 let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1827
1828 Ok(MemoryStats {
1829 total_chunks: session_chunks + project_chunks + global_chunks,
1830 session_chunks,
1831 project_chunks,
1832 global_chunks,
1833 total_bytes: session_bytes + project_bytes + global_bytes,
1834 session_bytes,
1835 project_bytes,
1836 global_bytes,
1837 file_size,
1838 last_cleanup,
1839 })
1840 }
1841
1842 pub async fn log_cleanup(
1844 &self,
1845 cleanup_type: &str,
1846 tier: MemoryTier,
1847 project_id: Option<&str>,
1848 session_id: Option<&str>,
1849 chunks_deleted: i64,
1850 bytes_reclaimed: i64,
1851 ) -> MemoryResult<()> {
1852 let conn = self.conn.lock().await;
1853
1854 let id = uuid::Uuid::new_v4().to_string();
1855 let created_at = Utc::now().to_rfc3339();
1856
1857 conn.execute(
1858 "INSERT INTO memory_cleanup_log
1859 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1860 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1861 params![
1862 id,
1863 cleanup_type,
1864 tier.to_string(),
1865 project_id,
1866 session_id,
1867 chunks_deleted,
1868 bytes_reclaimed,
1869 created_at
1870 ],
1871 )?;
1872
1873 Ok(())
1874 }
1875
1876 pub async fn vacuum(&self) -> MemoryResult<()> {
1878 let conn = self.conn.lock().await;
1879 conn.execute("VACUUM", [])?;
1880 Ok(())
1881 }
1882
1883 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1888 let conn = self.conn.lock().await;
1889 let n: i64 = conn.query_row(
1890 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1891 params![project_id],
1892 |row| row.get(0),
1893 )?;
1894 Ok(n)
1895 }
1896
1897 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1898 let conn = self.conn.lock().await;
1899 let exists: Option<i64> = conn
1900 .query_row(
1901 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1902 params![project_id],
1903 |row| row.get(0),
1904 )
1905 .optional()?;
1906 Ok(exists.is_some())
1907 }
1908
1909 pub async fn get_file_index_entry(
1910 &self,
1911 project_id: &str,
1912 path: &str,
1913 ) -> MemoryResult<Option<(i64, i64, String)>> {
1914 let conn = self.conn.lock().await;
1915 let row: Option<(i64, i64, String)> = conn
1916 .query_row(
1917 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1918 params![project_id, path],
1919 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1920 )
1921 .optional()?;
1922 Ok(row)
1923 }
1924
1925 pub async fn upsert_file_index_entry(
1926 &self,
1927 project_id: &str,
1928 path: &str,
1929 mtime: i64,
1930 size: i64,
1931 hash: &str,
1932 ) -> MemoryResult<()> {
1933 let conn = self.conn.lock().await;
1934 let indexed_at = Utc::now().to_rfc3339();
1935 conn.execute(
1936 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1937 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1938 ON CONFLICT(project_id, path) DO UPDATE SET
1939 mtime = excluded.mtime,
1940 size = excluded.size,
1941 hash = excluded.hash,
1942 indexed_at = excluded.indexed_at",
1943 params![project_id, path, mtime, size, hash, indexed_at],
1944 )?;
1945 Ok(())
1946 }
1947
1948 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1949 let conn = self.conn.lock().await;
1950 conn.execute(
1951 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1952 params![project_id, path],
1953 )?;
1954 Ok(())
1955 }
1956
1957 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1958 let conn = self.conn.lock().await;
1959 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1960 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1961 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1962 }
1963
1964 pub async fn delete_project_file_chunks_by_path(
1965 &self,
1966 project_id: &str,
1967 source_path: &str,
1968 ) -> MemoryResult<(i64, i64)> {
1969 let conn = self.conn.lock().await;
1970
1971 let chunks_deleted: i64 = conn.query_row(
1972 "SELECT COUNT(*) FROM project_memory_chunks
1973 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1974 params![project_id, source_path],
1975 |row| row.get(0),
1976 )?;
1977
1978 let bytes_estimated: i64 = conn.query_row(
1979 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1980 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1981 params![project_id, source_path],
1982 |row| row.get(0),
1983 )?;
1984
1985 conn.execute(
1987 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1988 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1989 params![project_id, source_path],
1990 )?;
1991
1992 conn.execute(
1993 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1994 params![project_id, source_path],
1995 )?;
1996
1997 Ok((chunks_deleted, bytes_estimated))
1998 }
1999
2000 pub async fn get_import_index_entry(
2001 &self,
2002 tier: MemoryTier,
2003 session_id: Option<&str>,
2004 project_id: Option<&str>,
2005 path: &str,
2006 ) -> MemoryResult<Option<(i64, i64, String)>> {
2007 let conn = self.conn.lock().await;
2008 let row = match tier {
2009 MemoryTier::Session => {
2010 let session_id = require_scope_id(tier, session_id)?;
2011 conn.query_row(
2012 "SELECT mtime, size, hash FROM session_file_index WHERE session_id = ?1 AND path = ?2",
2013 params![session_id, path],
2014 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2015 )
2016 .optional()?
2017 }
2018 MemoryTier::Project => {
2019 let project_id = require_scope_id(tier, project_id)?;
2020 conn.query_row(
2021 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
2022 params![project_id, path],
2023 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2024 )
2025 .optional()?
2026 }
2027 MemoryTier::Global => conn
2028 .query_row(
2029 "SELECT mtime, size, hash FROM global_file_index WHERE path = ?1",
2030 params![path],
2031 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2032 )
2033 .optional()?,
2034 };
2035 Ok(row)
2036 }
2037
2038 pub async fn upsert_import_index_entry(
2039 &self,
2040 tier: MemoryTier,
2041 session_id: Option<&str>,
2042 project_id: Option<&str>,
2043 path: &str,
2044 mtime: i64,
2045 size: i64,
2046 hash: &str,
2047 ) -> MemoryResult<()> {
2048 let conn = self.conn.lock().await;
2049 let indexed_at = Utc::now().to_rfc3339();
2050 match tier {
2051 MemoryTier::Session => {
2052 let session_id = require_scope_id(tier, session_id)?;
2053 conn.execute(
2054 "INSERT INTO session_file_index (session_id, path, mtime, size, hash, indexed_at)
2055 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
2056 ON CONFLICT(session_id, path) DO UPDATE SET
2057 mtime = excluded.mtime,
2058 size = excluded.size,
2059 hash = excluded.hash,
2060 indexed_at = excluded.indexed_at",
2061 params![session_id, path, mtime, size, hash, indexed_at],
2062 )?;
2063 }
2064 MemoryTier::Project => {
2065 let project_id = require_scope_id(tier, project_id)?;
2066 conn.execute(
2067 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
2068 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
2069 ON CONFLICT(project_id, path) DO UPDATE SET
2070 mtime = excluded.mtime,
2071 size = excluded.size,
2072 hash = excluded.hash,
2073 indexed_at = excluded.indexed_at",
2074 params![project_id, path, mtime, size, hash, indexed_at],
2075 )?;
2076 }
2077 MemoryTier::Global => {
2078 conn.execute(
2079 "INSERT INTO global_file_index (path, mtime, size, hash, indexed_at)
2080 VALUES (?1, ?2, ?3, ?4, ?5)
2081 ON CONFLICT(path) DO UPDATE SET
2082 mtime = excluded.mtime,
2083 size = excluded.size,
2084 hash = excluded.hash,
2085 indexed_at = excluded.indexed_at",
2086 params![path, mtime, size, hash, indexed_at],
2087 )?;
2088 }
2089 }
2090 Ok(())
2091 }
2092
2093 pub async fn list_import_index_paths(
2094 &self,
2095 tier: MemoryTier,
2096 session_id: Option<&str>,
2097 project_id: Option<&str>,
2098 ) -> MemoryResult<Vec<String>> {
2099 let conn = self.conn.lock().await;
2100 let rows = match tier {
2101 MemoryTier::Session => {
2102 let session_id = require_scope_id(tier, session_id)?;
2103 let mut stmt =
2104 conn.prepare("SELECT path FROM session_file_index WHERE session_id = ?1")?;
2105 let rows = stmt.query_map(params![session_id], |row| row.get::<_, String>(0))?;
2106 rows.collect::<Result<Vec<_>, _>>()?
2107 }
2108 MemoryTier::Project => {
2109 let project_id = require_scope_id(tier, project_id)?;
2110 let mut stmt =
2111 conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
2112 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
2113 rows.collect::<Result<Vec<_>, _>>()?
2114 }
2115 MemoryTier::Global => {
2116 let mut stmt = conn.prepare("SELECT path FROM global_file_index")?;
2117 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
2118 rows.collect::<Result<Vec<_>, _>>()?
2119 }
2120 };
2121 Ok(rows)
2122 }
2123
2124 pub async fn delete_import_index_entry(
2125 &self,
2126 tier: MemoryTier,
2127 session_id: Option<&str>,
2128 project_id: Option<&str>,
2129 path: &str,
2130 ) -> MemoryResult<()> {
2131 let conn = self.conn.lock().await;
2132 match tier {
2133 MemoryTier::Session => {
2134 let session_id = require_scope_id(tier, session_id)?;
2135 conn.execute(
2136 "DELETE FROM session_file_index WHERE session_id = ?1 AND path = ?2",
2137 params![session_id, path],
2138 )?;
2139 }
2140 MemoryTier::Project => {
2141 let project_id = require_scope_id(tier, project_id)?;
2142 conn.execute(
2143 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
2144 params![project_id, path],
2145 )?;
2146 }
2147 MemoryTier::Global => {
2148 conn.execute(
2149 "DELETE FROM global_file_index WHERE path = ?1",
2150 params![path],
2151 )?;
2152 }
2153 }
2154 Ok(())
2155 }
2156
2157 pub async fn delete_file_chunks_by_path(
2158 &self,
2159 tier: MemoryTier,
2160 session_id: Option<&str>,
2161 project_id: Option<&str>,
2162 source_path: &str,
2163 ) -> MemoryResult<(i64, i64)> {
2164 let conn = self.conn.lock().await;
2165 let result = match tier {
2166 MemoryTier::Session => {
2167 let session_id = require_scope_id(tier, session_id)?;
2168 let chunks_deleted: i64 = conn.query_row(
2169 "SELECT COUNT(*) FROM session_memory_chunks
2170 WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
2171 params![session_id, source_path],
2172 |row| row.get(0),
2173 )?;
2174 let bytes_estimated: i64 = conn.query_row(
2175 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
2176 WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
2177 params![session_id, source_path],
2178 |row| row.get(0),
2179 )?;
2180 conn.execute(
2181 "DELETE FROM session_memory_vectors WHERE chunk_id IN
2182 (SELECT id FROM session_memory_chunks WHERE session_id = ?1 AND source = 'file' AND source_path = ?2)",
2183 params![session_id, source_path],
2184 )?;
2185 conn.execute(
2186 "DELETE FROM session_memory_chunks
2187 WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
2188 params![session_id, source_path],
2189 )?;
2190 (chunks_deleted, bytes_estimated)
2191 }
2192 MemoryTier::Project => {
2193 let project_id = require_scope_id(tier, project_id)?;
2194 let chunks_deleted: i64 = conn.query_row(
2195 "SELECT COUNT(*) FROM project_memory_chunks
2196 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
2197 params![project_id, source_path],
2198 |row| row.get(0),
2199 )?;
2200 let bytes_estimated: i64 = conn.query_row(
2201 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
2202 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
2203 params![project_id, source_path],
2204 |row| row.get(0),
2205 )?;
2206 conn.execute(
2207 "DELETE FROM project_memory_vectors WHERE chunk_id IN
2208 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
2209 params![project_id, source_path],
2210 )?;
2211 conn.execute(
2212 "DELETE FROM project_memory_chunks
2213 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
2214 params![project_id, source_path],
2215 )?;
2216 (chunks_deleted, bytes_estimated)
2217 }
2218 MemoryTier::Global => {
2219 let chunks_deleted: i64 = conn.query_row(
2220 "SELECT COUNT(*) FROM global_memory_chunks
2221 WHERE source = 'file' AND source_path = ?1",
2222 params![source_path],
2223 |row| row.get(0),
2224 )?;
2225 let bytes_estimated: i64 = conn.query_row(
2226 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
2227 WHERE source = 'file' AND source_path = ?1",
2228 params![source_path],
2229 |row| row.get(0),
2230 )?;
2231 conn.execute(
2232 "DELETE FROM global_memory_vectors WHERE chunk_id IN
2233 (SELECT id FROM global_memory_chunks WHERE source = 'file' AND source_path = ?1)",
2234 params![source_path],
2235 )?;
2236 conn.execute(
2237 "DELETE FROM global_memory_chunks
2238 WHERE source = 'file' AND source_path = ?1",
2239 params![source_path],
2240 )?;
2241 (chunks_deleted, bytes_estimated)
2242 }
2243 };
2244 Ok(result)
2245 }
2246
2247 pub async fn upsert_project_index_status(
2248 &self,
2249 project_id: &str,
2250 total_files: i64,
2251 processed_files: i64,
2252 indexed_files: i64,
2253 skipped_files: i64,
2254 errors: i64,
2255 ) -> MemoryResult<()> {
2256 let conn = self.conn.lock().await;
2257 let last_indexed_at = Utc::now().to_rfc3339();
2258 conn.execute(
2259 "INSERT INTO project_index_status (
2260 project_id, last_indexed_at, last_total_files, last_processed_files,
2261 last_indexed_files, last_skipped_files, last_errors
2262 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
2263 ON CONFLICT(project_id) DO UPDATE SET
2264 last_indexed_at = excluded.last_indexed_at,
2265 last_total_files = excluded.last_total_files,
2266 last_processed_files = excluded.last_processed_files,
2267 last_indexed_files = excluded.last_indexed_files,
2268 last_skipped_files = excluded.last_skipped_files,
2269 last_errors = excluded.last_errors",
2270 params![
2271 project_id,
2272 last_indexed_at,
2273 total_files,
2274 processed_files,
2275 indexed_files,
2276 skipped_files,
2277 errors
2278 ],
2279 )?;
2280 Ok(())
2281 }
2282
2283 pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
2284 let conn = self.conn.lock().await;
2285
2286 let project_chunks: i64 = conn.query_row(
2287 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
2288 params![project_id],
2289 |row| row.get(0),
2290 )?;
2291
2292 let project_bytes: i64 = conn.query_row(
2293 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
2294 params![project_id],
2295 |row| row.get(0),
2296 )?;
2297
2298 let file_index_chunks: i64 = conn.query_row(
2299 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2300 params![project_id],
2301 |row| row.get(0),
2302 )?;
2303
2304 let file_index_bytes: i64 = conn.query_row(
2305 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2306 params![project_id],
2307 |row| row.get(0),
2308 )?;
2309
2310 let indexed_files: i64 = conn.query_row(
2311 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
2312 params![project_id],
2313 |row| row.get(0),
2314 )?;
2315
2316 let status_row: Option<ProjectIndexStatusRow> =
2317 conn
2318 .query_row(
2319 "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
2320 FROM project_index_status WHERE project_id = ?1",
2321 params![project_id],
2322 |row| {
2323 Ok((
2324 row.get(0)?,
2325 row.get(1)?,
2326 row.get(2)?,
2327 row.get(3)?,
2328 row.get(4)?,
2329 row.get(5)?,
2330 ))
2331 },
2332 )
2333 .optional()?;
2334
2335 let (
2336 last_indexed_at,
2337 last_total_files,
2338 last_processed_files,
2339 last_indexed_files,
2340 last_skipped_files,
2341 last_errors,
2342 ) = status_row.unwrap_or((None, None, None, None, None, None));
2343
2344 let last_indexed_at = last_indexed_at.and_then(|s| {
2345 DateTime::parse_from_rfc3339(&s)
2346 .ok()
2347 .map(|dt| dt.with_timezone(&Utc))
2348 });
2349
2350 Ok(ProjectMemoryStats {
2351 project_id: project_id.to_string(),
2352 project_chunks,
2353 project_bytes,
2354 file_index_chunks,
2355 file_index_bytes,
2356 indexed_files,
2357 last_indexed_at,
2358 last_total_files,
2359 last_processed_files,
2360 last_indexed_files,
2361 last_skipped_files,
2362 last_errors,
2363 })
2364 }
2365
2366 pub async fn clear_project_file_index(
2367 &self,
2368 project_id: &str,
2369 vacuum: bool,
2370 ) -> MemoryResult<ClearFileIndexResult> {
2371 let conn = self.conn.lock().await;
2372
2373 let chunks_deleted: i64 = conn.query_row(
2374 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2375 params![project_id],
2376 |row| row.get(0),
2377 )?;
2378
2379 let bytes_estimated: i64 = conn.query_row(
2380 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2381 params![project_id],
2382 |row| row.get(0),
2383 )?;
2384
2385 conn.execute(
2387 "DELETE FROM project_memory_vectors WHERE chunk_id IN
2388 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
2389 params![project_id],
2390 )?;
2391
2392 conn.execute(
2394 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2395 params![project_id],
2396 )?;
2397
2398 conn.execute(
2400 "DELETE FROM project_file_index WHERE project_id = ?1",
2401 params![project_id],
2402 )?;
2403 conn.execute(
2404 "DELETE FROM project_index_status WHERE project_id = ?1",
2405 params![project_id],
2406 )?;
2407
2408 drop(conn); if vacuum {
2411 self.vacuum().await?;
2412 }
2413
2414 Ok(ClearFileIndexResult {
2415 chunks_deleted,
2416 bytes_estimated,
2417 did_vacuum: vacuum,
2418 })
2419 }
2420
2421 pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
2433 if retention_days == 0 {
2434 return Ok(0);
2435 }
2436
2437 let conn = self.conn.lock().await;
2438
2439 let cutoff =
2441 (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
2442
2443 conn.execute(
2445 "DELETE FROM session_memory_vectors
2446 WHERE chunk_id IN (
2447 SELECT id FROM session_memory_chunks WHERE created_at < ?1
2448 )",
2449 params![cutoff],
2450 )?;
2451
2452 let deleted = conn.execute(
2453 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
2454 params![cutoff],
2455 )?;
2456
2457 if deleted > 0 {
2458 tracing::info!(
2459 retention_days,
2460 deleted,
2461 "memory hygiene: pruned old session chunks"
2462 );
2463 }
2464
2465 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2466 Ok(deleted as u64)
2467 }
2468
2469 pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
2475 let retention_days = if env_override_days > 0 {
2477 env_override_days
2478 } else {
2479 let conn = self.conn.lock().await;
2481 let days: Option<i64> = conn
2482 .query_row(
2483 "SELECT session_retention_days FROM memory_config
2484 WHERE project_id = '__global__' LIMIT 1",
2485 [],
2486 |row| row.get(0),
2487 )
2488 .ok();
2489 drop(conn);
2490 days.unwrap_or(30) as u32
2491 };
2492
2493 self.prune_old_session_chunks(retention_days).await
2494 }
2495
2496 pub async fn put_global_memory_record(
2497 &self,
2498 record: &GlobalMemoryRecord,
2499 ) -> MemoryResult<GlobalMemoryWriteResult> {
2500 let conn = self.conn.lock().await;
2501
2502 let existing: Option<String> = conn
2503 .query_row(
2504 "SELECT id FROM memory_records
2505 WHERE user_id = ?1
2506 AND source_type = ?2
2507 AND content_hash = ?3
2508 AND run_id = ?4
2509 AND IFNULL(session_id, '') = IFNULL(?5, '')
2510 AND IFNULL(message_id, '') = IFNULL(?6, '')
2511 AND IFNULL(tool_name, '') = IFNULL(?7, '')
2512 LIMIT 1",
2513 params![
2514 record.user_id,
2515 record.source_type,
2516 record.content_hash,
2517 record.run_id,
2518 record.session_id,
2519 record.message_id,
2520 record.tool_name
2521 ],
2522 |row| row.get(0),
2523 )
2524 .optional()?;
2525
2526 if let Some(id) = existing {
2527 return Ok(GlobalMemoryWriteResult {
2528 id,
2529 stored: false,
2530 deduped: true,
2531 });
2532 }
2533
2534 let metadata = record
2535 .metadata
2536 .as_ref()
2537 .map(ToString::to_string)
2538 .unwrap_or_default();
2539 let provenance = record
2540 .provenance
2541 .as_ref()
2542 .map(ToString::to_string)
2543 .unwrap_or_default();
2544 conn.execute(
2545 "INSERT INTO memory_records(
2546 id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
2547 project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
2548 visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
2549 ) VALUES (
2550 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
2551 ?10, ?11, ?12, ?13, ?14, ?15, ?16,
2552 ?17, ?18, ?19, ?20, ?21, ?22
2553 )",
2554 params![
2555 record.id,
2556 record.user_id,
2557 record.source_type,
2558 record.content,
2559 record.content_hash,
2560 record.run_id,
2561 record.session_id,
2562 record.message_id,
2563 record.tool_name,
2564 record.project_tag,
2565 record.channel_tag,
2566 record.host_tag,
2567 metadata,
2568 provenance,
2569 record.redaction_status,
2570 i64::from(record.redaction_count),
2571 record.visibility,
2572 if record.demoted { 1i64 } else { 0i64 },
2573 record.score_boost,
2574 record.created_at_ms as i64,
2575 record.updated_at_ms as i64,
2576 record.expires_at_ms.map(|v| v as i64),
2577 ],
2578 )?;
2579
2580 Ok(GlobalMemoryWriteResult {
2581 id: record.id.clone(),
2582 stored: true,
2583 deduped: false,
2584 })
2585 }
2586
    /// Search a user's global memory records.
    ///
    /// Two-phase strategy: an FTS5 `MATCH` ranked by `bm25` is attempted
    /// first; when it produces no hits — or the FTS statement cannot be
    /// prepared at all — a `LIKE` substring scan ordered by recency is used
    /// as the fallback, with a fixed nominal score of 0.25 per hit.
    ///
    /// Demoted records and records whose `expires_at_ms` is in the past are
    /// always excluded; the three tag arguments are optional exact-match
    /// filters. `limit` is clamped to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        // NOTE(review): the prepare error is deliberately ignored via the
        // `if let Ok(..)` below, so a missing/broken FTS table degrades to
        // the LIKE fallback instead of failing the search — confirm intended.
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
            FROM memory_records_fts
            JOIN memory_records m ON m.id = memory_records_fts.id
            WHERE memory_records_fts MATCH ?1
                AND m.user_id = ?2
                AND m.demoted = 0
                AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
                AND (?4 IS NULL OR m.project_tag = ?4)
                AND (?5 IS NULL OR m.channel_tag = ?5)
                AND (?6 IS NULL OR m.host_tag = ?6)
            ORDER BY rank ASC
            LIMIT ?7"
        );

        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // bm25 rank: lower is more relevant; mapped into (0, 1].
                    let rank = row.get::<_, f64>(22)?;
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback path: substring scan. `?6 = ''` short-circuits the LIKE
        // when the trimmed query is empty, returning newest records unfiltered.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
            FROM memory_records
            WHERE user_id = ?1
                AND demoted = 0
                AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
                AND (?3 IS NULL OR project_tag = ?3)
                AND (?4 IS NULL OR channel_tag = ?4)
                AND (?5 IS NULL OR host_tag = ?5)
                AND (?6 = '' OR content LIKE ?7)
            ORDER BY created_at_ms DESC
            LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
2693
2694 pub async fn list_global_memory(
2695 &self,
2696 user_id: &str,
2697 q: Option<&str>,
2698 project_tag: Option<&str>,
2699 channel_tag: Option<&str>,
2700 limit: i64,
2701 offset: i64,
2702 ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
2703 let conn = self.conn.lock().await;
2704 let query = q.unwrap_or("").trim();
2705 let like = format!("%{}%", query);
2706 let mut stmt = conn.prepare(
2707 "SELECT
2708 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2709 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2710 redaction_status, redaction_count, visibility, demoted, score_boost,
2711 created_at_ms, updated_at_ms, expires_at_ms
2712 FROM memory_records
2713 WHERE user_id = ?1
2714 AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
2715 AND (?4 IS NULL OR project_tag = ?4)
2716 AND (?5 IS NULL OR channel_tag = ?5)
2717 ORDER BY created_at_ms DESC
2718 LIMIT ?6 OFFSET ?7",
2719 )?;
2720 let rows = stmt.query_map(
2721 params![
2722 user_id,
2723 query,
2724 like,
2725 project_tag,
2726 channel_tag,
2727 limit.clamp(1, 1000),
2728 offset.max(0)
2729 ],
2730 row_to_global_record,
2731 )?;
2732 let mut out = Vec::new();
2733 for row in rows {
2734 out.push(row?);
2735 }
2736 Ok(out)
2737 }
2738
2739 pub async fn set_global_memory_visibility(
2740 &self,
2741 id: &str,
2742 visibility: &str,
2743 demoted: bool,
2744 ) -> MemoryResult<bool> {
2745 let conn = self.conn.lock().await;
2746 let now_ms = chrono::Utc::now().timestamp_millis();
2747 let changed = conn.execute(
2748 "UPDATE memory_records
2749 SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
2750 WHERE id = ?1",
2751 params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
2752 )?;
2753 Ok(changed > 0)
2754 }
2755
2756 pub async fn update_global_memory_context(
2757 &self,
2758 id: &str,
2759 visibility: &str,
2760 demoted: bool,
2761 metadata: Option<&serde_json::Value>,
2762 provenance: Option<&serde_json::Value>,
2763 ) -> MemoryResult<bool> {
2764 let conn = self.conn.lock().await;
2765 let now_ms = chrono::Utc::now().timestamp_millis();
2766 let metadata = metadata.map(ToString::to_string).unwrap_or_default();
2767 let provenance = provenance.map(ToString::to_string).unwrap_or_default();
2768 let changed = conn.execute(
2769 "UPDATE memory_records
2770 SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
2771 WHERE id = ?1",
2772 params![
2773 id,
2774 visibility,
2775 if demoted { 1i64 } else { 0i64 },
2776 metadata,
2777 provenance,
2778 now_ms,
2779 ],
2780 )?;
2781 Ok(changed > 0)
2782 }
2783
2784 pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
2785 let conn = self.conn.lock().await;
2786 let mut stmt = conn.prepare(
2787 "SELECT
2788 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2789 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2790 redaction_status, redaction_count, visibility, demoted, score_boost,
2791 created_at_ms, updated_at_ms, expires_at_ms
2792 FROM memory_records
2793 WHERE id = ?1
2794 LIMIT 1",
2795 )?;
2796 let record = stmt
2797 .query_row(params![id], row_to_global_record)
2798 .optional()?;
2799 Ok(record)
2800 }
2801
2802 pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
2803 let conn = self.conn.lock().await;
2804 let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
2805 Ok(changed > 0)
2806 }
2807}
2808
2809fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
2811 let id: String = row.get(0)?;
2812 let content: String = row.get(1)?;
2813 let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
2814 match tier {
2815 MemoryTier::Session => (
2816 Some(row.get(2)?),
2817 row.get(3)?,
2818 4usize,
2819 5usize,
2820 6usize,
2821 7usize,
2822 ),
2823 MemoryTier::Project => (
2824 row.get(2)?,
2825 Some(row.get(3)?),
2826 4usize,
2827 5usize,
2828 6usize,
2829 7usize,
2830 ),
2831 MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
2832 };
2833
2834 let source: String = row.get(source_idx)?;
2835 let created_at_str: String = row.get(created_at_idx)?;
2836 let token_count: i64 = row.get(token_count_idx)?;
2837 let metadata_str: Option<String> = row.get(metadata_idx)?;
2838
2839 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
2840 .map_err(|e| {
2841 rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
2842 })?
2843 .with_timezone(&Utc);
2844
2845 let metadata = metadata_str
2846 .filter(|s| !s.is_empty())
2847 .and_then(|s| serde_json::from_str(&s).ok());
2848
2849 let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
2850 let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
2851 let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
2852 let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
2853
2854 Ok(MemoryChunk {
2855 id,
2856 content,
2857 tier,
2858 session_id,
2859 project_id,
2860 source,
2861 source_path,
2862 source_mtime,
2863 source_size,
2864 source_hash,
2865 created_at,
2866 token_count,
2867 metadata,
2868 })
2869}
2870
2871fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
2872 scope
2873 .filter(|value| !value.trim().is_empty())
2874 .ok_or_else(|| {
2875 crate::types::MemoryError::InvalidConfig(match tier {
2876 MemoryTier::Session => "tier=session requires session_id".to_string(),
2877 MemoryTier::Project => "tier=project requires project_id".to_string(),
2878 MemoryTier::Global => "tier=global does not require a scope id".to_string(),
2879 })
2880 })
2881}
2882
2883fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
2884 let metadata_str: Option<String> = row.get(12)?;
2885 let provenance_str: Option<String> = row.get(13)?;
2886 Ok(GlobalMemoryRecord {
2887 id: row.get(0)?,
2888 user_id: row.get(1)?,
2889 source_type: row.get(2)?,
2890 content: row.get(3)?,
2891 content_hash: row.get(4)?,
2892 run_id: row.get(5)?,
2893 session_id: row.get(6)?,
2894 message_id: row.get(7)?,
2895 tool_name: row.get(8)?,
2896 project_tag: row.get(9)?,
2897 channel_tag: row.get(10)?,
2898 host_tag: row.get(11)?,
2899 metadata: metadata_str
2900 .filter(|s| !s.is_empty())
2901 .and_then(|s| serde_json::from_str(&s).ok()),
2902 provenance: provenance_str
2903 .filter(|s| !s.is_empty())
2904 .and_then(|s| serde_json::from_str(&s).ok()),
2905 redaction_status: row.get(14)?,
2906 redaction_count: row.get::<_, i64>(15)? as u32,
2907 visibility: row.get(16)?,
2908 demoted: row.get::<_, i64>(17)? != 0,
2909 score_boost: row.get(18)?,
2910 created_at_ms: row.get::<_, i64>(19)? as u64,
2911 updated_at_ms: row.get::<_, i64>(20)? as u64,
2912 expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
2913 })
2914}
2915
2916impl MemoryDatabase {
2917 pub async fn get_node_by_uri(
2918 &self,
2919 uri: &str,
2920 ) -> MemoryResult<Option<crate::types::MemoryNode>> {
2921 let conn = self.conn.lock().await;
2922 let mut stmt = conn.prepare(
2923 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2924 FROM memory_nodes WHERE uri = ?1",
2925 )?;
2926
2927 let result = stmt.query_row(params![uri], |row| {
2928 let node_type_str: String = row.get(3)?;
2929 let node_type = node_type_str
2930 .parse()
2931 .unwrap_or(crate::types::NodeType::File);
2932 let metadata_str: Option<String> = row.get(6)?;
2933 Ok(crate::types::MemoryNode {
2934 id: row.get(0)?,
2935 uri: row.get(1)?,
2936 parent_uri: row.get(2)?,
2937 node_type,
2938 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2939 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2940 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2941 })
2942 });
2943
2944 match result {
2945 Ok(node) => Ok(Some(node)),
2946 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2947 Err(e) => Err(MemoryError::Database(e)),
2948 }
2949 }
2950
2951 pub async fn create_node(
2952 &self,
2953 uri: &str,
2954 parent_uri: Option<&str>,
2955 node_type: crate::types::NodeType,
2956 metadata: Option<&serde_json::Value>,
2957 ) -> MemoryResult<String> {
2958 let id = uuid::Uuid::new_v4().to_string();
2959 let now = Utc::now().to_rfc3339();
2960 let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
2961
2962 let conn = self.conn.lock().await;
2963 conn.execute(
2964 "INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
2965 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2966 params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
2967 )?;
2968
2969 Ok(id)
2970 }
2971
2972 pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
2973 let conn = self.conn.lock().await;
2974 let mut stmt = conn.prepare(
2975 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2976 FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
2977 )?;
2978
2979 let rows = stmt.query_map(params![uri], |row| {
2980 let node_type_str: String = row.get(3)?;
2981 let node_type = node_type_str
2982 .parse()
2983 .unwrap_or(crate::types::NodeType::File);
2984 let metadata_str: Option<String> = row.get(6)?;
2985 Ok(crate::types::MemoryNode {
2986 id: row.get(0)?,
2987 uri: row.get(1)?,
2988 parent_uri: row.get(2)?,
2989 node_type,
2990 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2991 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2992 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2993 })
2994 })?;
2995
2996 rows.collect::<Result<Vec<_>, _>>()
2997 .map_err(MemoryError::Database)
2998 }
2999
3000 pub async fn get_layer(
3001 &self,
3002 node_id: &str,
3003 layer_type: crate::types::LayerType,
3004 ) -> MemoryResult<Option<crate::types::MemoryLayer>> {
3005 let conn = self.conn.lock().await;
3006 let mut stmt = conn.prepare(
3007 "SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
3008 FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
3009 )?;
3010
3011 let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
3012 let layer_type_str: String = row.get(2)?;
3013 let layer_type = layer_type_str
3014 .parse()
3015 .unwrap_or(crate::types::LayerType::L2);
3016 Ok(crate::types::MemoryLayer {
3017 id: row.get(0)?,
3018 node_id: row.get(1)?,
3019 layer_type,
3020 content: row.get(3)?,
3021 token_count: row.get(4)?,
3022 embedding_id: row.get(5)?,
3023 created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
3024 source_chunk_id: row.get(7)?,
3025 })
3026 });
3027
3028 match result {
3029 Ok(layer) => Ok(Some(layer)),
3030 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
3031 Err(e) => Err(MemoryError::Database(e)),
3032 }
3033 }
3034
3035 pub async fn create_layer(
3036 &self,
3037 node_id: &str,
3038 layer_type: crate::types::LayerType,
3039 content: &str,
3040 token_count: i64,
3041 source_chunk_id: Option<&str>,
3042 ) -> MemoryResult<String> {
3043 let id = uuid::Uuid::new_v4().to_string();
3044 let now = Utc::now().to_rfc3339();
3045
3046 let conn = self.conn.lock().await;
3047 conn.execute(
3048 "INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
3049 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
3050 params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
3051 )?;
3052
3053 Ok(id)
3054 }
3055
3056 pub async fn get_children_tree(
3057 &self,
3058 parent_uri: &str,
3059 max_depth: usize,
3060 ) -> MemoryResult<Vec<crate::types::TreeNode>> {
3061 if max_depth == 0 {
3062 return Ok(Vec::new());
3063 }
3064
3065 let children = self.list_directory(parent_uri).await?;
3066 let mut tree_nodes = Vec::new();
3067
3068 for child in children {
3069 let layer_summary = self.get_layer_summary(&child.id).await?;
3070
3071 let grandchildren = if child.node_type == crate::types::NodeType::Directory {
3072 Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
3073 } else {
3074 Vec::new()
3075 };
3076
3077 tree_nodes.push(crate::types::TreeNode {
3078 node: child,
3079 children: grandchildren,
3080 layer_summary,
3081 });
3082 }
3083
3084 Ok(tree_nodes)
3085 }
3086
3087 async fn get_layer_summary(
3088 &self,
3089 node_id: &str,
3090 ) -> MemoryResult<Option<crate::types::LayerSummary>> {
3091 let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
3092 let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
3093 let has_l2 = self
3094 .get_layer(node_id, crate::types::LayerType::L2)
3095 .await?
3096 .is_some();
3097
3098 if l0.is_none() && l1.is_none() && !has_l2 {
3099 return Ok(None);
3100 }
3101
3102 Ok(Some(crate::types::LayerSummary {
3103 l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
3104 l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
3105 has_l2,
3106 }))
3107 }
3108
3109 pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
3110 let conn = self.conn.lock().await;
3111 let count: i64 = conn.query_row(
3112 "SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
3113 params![uri],
3114 |row| row.get(0),
3115 )?;
3116 Ok(count > 0)
3117 }
3118}
3119
3120fn row_to_knowledge_space(row: &Row) -> Result<KnowledgeSpaceRecord, rusqlite::Error> {
3121 let scope = row
3122 .get::<_, String>(1)?
3123 .parse()
3124 .unwrap_or(tandem_orchestrator::KnowledgeScope::Project);
3125 let trust_level = row
3126 .get::<_, String>(6)?
3127 .parse()
3128 .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
3129 let metadata = row
3130 .get::<_, Option<String>>(7)?
3131 .and_then(|raw| serde_json::from_str(&raw).ok());
3132 Ok(KnowledgeSpaceRecord {
3133 id: row.get(0)?,
3134 scope,
3135 project_id: row.get(2)?,
3136 namespace: row.get(3)?,
3137 title: row.get(4)?,
3138 description: row.get(5)?,
3139 trust_level,
3140 metadata,
3141 created_at_ms: row.get::<_, i64>(8)? as u64,
3142 updated_at_ms: row.get::<_, i64>(9)? as u64,
3143 })
3144}
3145
3146fn row_to_knowledge_item(row: &Row) -> Result<KnowledgeItemRecord, rusqlite::Error> {
3147 let trust_level = row
3148 .get::<_, String>(8)?
3149 .parse()
3150 .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
3151 let status = row
3152 .get::<_, String>(9)?
3153 .parse()
3154 .unwrap_or(KnowledgeItemStatus::Working);
3155 let payload = row
3156 .get::<_, String>(7)
3157 .ok()
3158 .and_then(|raw| serde_json::from_str(&raw).ok())
3159 .unwrap_or(serde_json::Value::Null);
3160 let artifact_refs = row
3161 .get::<_, String>(11)
3162 .ok()
3163 .and_then(|raw| serde_json::from_str(&raw).ok())
3164 .unwrap_or_default();
3165 let source_memory_ids = row
3166 .get::<_, String>(12)
3167 .ok()
3168 .and_then(|raw| serde_json::from_str(&raw).ok())
3169 .unwrap_or_default();
3170 let metadata = row
3171 .get::<_, Option<String>>(14)?
3172 .and_then(|raw| serde_json::from_str(&raw).ok());
3173 Ok(KnowledgeItemRecord {
3174 id: row.get(0)?,
3175 space_id: row.get(1)?,
3176 coverage_key: row.get(2)?,
3177 dedupe_key: row.get(3)?,
3178 item_type: row.get(4)?,
3179 title: row.get(5)?,
3180 summary: row.get(6)?,
3181 payload,
3182 trust_level,
3183 status,
3184 run_id: row.get(10)?,
3185 artifact_refs,
3186 source_memory_ids,
3187 freshness_expires_at_ms: row.get::<_, Option<i64>>(13)?.map(|value| value as u64),
3188 metadata,
3189 created_at_ms: row.get::<_, i64>(15)? as u64,
3190 updated_at_ms: row.get::<_, i64>(16)? as u64,
3191 })
3192}
3193
3194fn row_to_knowledge_coverage(row: &Row) -> Result<KnowledgeCoverageRecord, rusqlite::Error> {
3195 let metadata = row
3196 .get::<_, Option<String>>(7)?
3197 .and_then(|raw| serde_json::from_str(&raw).ok());
3198 Ok(KnowledgeCoverageRecord {
3199 coverage_key: row.get(0)?,
3200 space_id: row.get(1)?,
3201 latest_item_id: row.get(2)?,
3202 latest_dedupe_key: row.get(3)?,
3203 last_seen_at_ms: row.get::<_, i64>(4)? as u64,
3204 last_promoted_at_ms: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
3205 freshness_expires_at_ms: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
3206 metadata,
3207 })
3208}
3209
/// Truncate `s` to at most `max_len` bytes, appending `"..."` when the
/// string is shortened.
///
/// The cut point is floored to a UTF-8 character boundary, so multi-byte
/// characters can never cause a byte-slicing panic (the previous version
/// panicked on e.g. `truncate_string("ééééé", 8)`).
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Reserve three bytes for the ellipsis, then back up to a valid
    // boundary; index 0 is always a boundary so this cannot underflow.
    let mut cut = max_len.saturating_sub(3).min(s.len());
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &s[..cut])
}
3217
/// Convert free text into an FTS5 MATCH expression.
///
/// Each whitespace-separated token is stripped of surrounding punctuation
/// (keeping ASCII alphanumerics, `_`, and `-`), double-quoted, and the
/// quoted tokens are OR-ed together. When nothing survives, the empty
/// phrase `""` is returned so the MATCH never receives raw user input.
fn build_fts_query(query: &str) -> String {
    let mut quoted = Vec::new();
    for raw in query.split_whitespace() {
        let cleaned =
            raw.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
        if !cleaned.is_empty() {
            quoted.push(format!("\"{}\"", cleaned));
        }
    }
    if quoted.is_empty() {
        "\"\"".to_string()
    } else {
        quoted.join(" OR ")
    }
}
3237
3238#[cfg(test)]
3239mod tests {
3240 use super::*;
3241 use serde_json::Value;
3242 use tandem_orchestrator::{KnowledgeScope, KnowledgeTrustLevel};
3243 use tempfile::TempDir;
3244
    /// Build a fresh `MemoryDatabase` backed by a SQLite file in a temp dir.
    /// The `TempDir` is returned alongside the database so the backing
    /// directory outlives the database for the duration of each test.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let db = MemoryDatabase::new(&db_path).await.unwrap();
        (db, temp_dir)
    }
3251
    /// Smoke test: a freshly created database initializes its schema and
    /// reports zero stored chunks.
    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        let stats = db.get_stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
    }
3259
3260 #[tokio::test]
3261 async fn test_knowledge_registry_roundtrip() {
3262 let (db, _temp) = setup_test_db().await;
3263
3264 let space = KnowledgeSpaceRecord {
3265 id: "space-1".to_string(),
3266 scope: tandem_orchestrator::KnowledgeScope::Project,
3267 project_id: Some("project-1".to_string()),
3268 namespace: Some("support".to_string()),
3269 title: Some("Support Knowledge".to_string()),
3270 description: Some("Reusable support guidance".to_string()),
3271 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
3272 metadata: Some(serde_json::json!({"owner": "ops"})),
3273 created_at_ms: 1,
3274 updated_at_ms: 2,
3275 };
3276 db.upsert_knowledge_space(&space).await.unwrap();
3277
3278 let item = KnowledgeItemRecord {
3279 id: "item-1".to_string(),
3280 space_id: space.id.clone(),
3281 coverage_key: "project-1/support/debugging/slow-start".to_string(),
3282 dedupe_key: "dedupe-1".to_string(),
3283 item_type: "decision".to_string(),
3284 title: "Restart service before retry".to_string(),
3285 summary: Some("When the service is stale, restart before retrying.".to_string()),
3286 payload: serde_json::json!({"action": "restart"}),
3287 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
3288 status: KnowledgeItemStatus::Promoted,
3289 run_id: Some("run-1".to_string()),
3290 artifact_refs: vec!["artifact://run-1/report".to_string()],
3291 source_memory_ids: vec!["memory-1".to_string()],
3292 freshness_expires_at_ms: Some(10),
3293 metadata: Some(serde_json::json!({"source": "run"})),
3294 created_at_ms: 3,
3295 updated_at_ms: 4,
3296 };
3297 db.upsert_knowledge_item(&item).await.unwrap();
3298
3299 let coverage = KnowledgeCoverageRecord {
3300 coverage_key: item.coverage_key.clone(),
3301 space_id: space.id.clone(),
3302 latest_item_id: Some(item.id.clone()),
3303 latest_dedupe_key: Some(item.dedupe_key.clone()),
3304 last_seen_at_ms: 5,
3305 last_promoted_at_ms: Some(6),
3306 freshness_expires_at_ms: Some(10),
3307 metadata: Some(serde_json::json!({"coverage": true})),
3308 };
3309 db.upsert_knowledge_coverage(&coverage).await.unwrap();
3310
3311 let loaded_space = db.get_knowledge_space(&space.id).await.unwrap().unwrap();
3312 assert_eq!(loaded_space.namespace.as_deref(), Some("support"));
3313
3314 let loaded_items = db
3315 .list_knowledge_items(&space.id, Some(&item.coverage_key))
3316 .await
3317 .unwrap();
3318 assert_eq!(loaded_items.len(), 1);
3319 assert_eq!(loaded_items[0].title, item.title);
3320
3321 let loaded_coverage = db
3322 .get_knowledge_coverage(&item.coverage_key, &space.id)
3323 .await
3324 .unwrap()
3325 .unwrap();
3326 assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
3327 }
3328
3329 #[tokio::test]
3330 async fn test_store_and_retrieve_chunk() {
3331 let (db, _temp) = setup_test_db().await;
3332
3333 let chunk = MemoryChunk {
3334 id: "test-1".to_string(),
3335 content: "Test content".to_string(),
3336 tier: MemoryTier::Session,
3337 session_id: Some("session-1".to_string()),
3338 project_id: Some("project-1".to_string()),
3339 source: "user_message".to_string(),
3340 source_path: None,
3341 source_mtime: None,
3342 source_size: None,
3343 source_hash: None,
3344 created_at: Utc::now(),
3345 token_count: 10,
3346 metadata: None,
3347 };
3348
3349 let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
3350 db.store_chunk(&chunk, &embedding).await.unwrap();
3351
3352 let chunks = db.get_session_chunks("session-1").await.unwrap();
3353 assert_eq!(chunks.len(), 1);
3354 assert_eq!(chunks[0].content, "Test content");
3355 }
3356
3357 #[tokio::test]
3358 async fn test_store_and_retrieve_global_chunk() {
3359 let (db, _temp) = setup_test_db().await;
3360
3361 let chunk = MemoryChunk {
3362 id: "global-1".to_string(),
3363 content: "Global note".to_string(),
3364 tier: MemoryTier::Global,
3365 session_id: None,
3366 project_id: None,
3367 source: "agent_note".to_string(),
3368 source_path: None,
3369 source_mtime: None,
3370 source_size: None,
3371 source_hash: None,
3372 created_at: Utc::now(),
3373 token_count: 7,
3374 metadata: Some(serde_json::json!({"kind":"test"})),
3375 };
3376
3377 let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
3378 db.store_chunk(&chunk, &embedding).await.unwrap();
3379
3380 let chunks = db.get_global_chunks(10).await.unwrap();
3381 assert_eq!(chunks.len(), 1);
3382 assert_eq!(chunks[0].content, "Global note");
3383 assert_eq!(chunks[0].source, "agent_note");
3384 assert_eq!(chunks[0].token_count, 7);
3385 assert_eq!(chunks[0].tier, MemoryTier::Global);
3386 }
3387
3388 #[tokio::test]
3389 async fn test_global_chunk_exists_by_source_hash() {
3390 let (db, _temp) = setup_test_db().await;
3391
3392 let chunk = MemoryChunk {
3393 id: "global-hash".to_string(),
3394 content: "Global hash note".to_string(),
3395 tier: MemoryTier::Global,
3396 session_id: None,
3397 project_id: None,
3398 source: "chat_exchange".to_string(),
3399 source_path: None,
3400 source_mtime: None,
3401 source_size: None,
3402 source_hash: Some("hash-123".to_string()),
3403 created_at: Utc::now(),
3404 token_count: 5,
3405 metadata: None,
3406 };
3407
3408 let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
3409 db.store_chunk(&chunk, &embedding).await.unwrap();
3410
3411 assert!(db
3412 .global_chunk_exists_by_source_hash("hash-123")
3413 .await
3414 .unwrap());
3415 assert!(!db
3416 .global_chunk_exists_by_source_hash("missing-hash")
3417 .await
3418 .unwrap());
3419 }
3420
3421 #[tokio::test]
3422 async fn test_config_crud() {
3423 let (db, _temp) = setup_test_db().await;
3424
3425 let config = db.get_or_create_config("project-1").await.unwrap();
3426 assert_eq!(config.max_chunks, 10000);
3427
3428 let new_config = MemoryConfig {
3429 max_chunks: 5000,
3430 ..Default::default()
3431 };
3432 db.update_config("project-1", &new_config).await.unwrap();
3433
3434 let updated = db.get_or_create_config("project-1").await.unwrap();
3435 assert_eq!(updated.max_chunks, 5000);
3436 }
3437
3438 #[tokio::test]
3439 async fn test_global_memory_put_search_and_dedup() {
3440 let (db, _temp) = setup_test_db().await;
3441 let now = chrono::Utc::now().timestamp_millis() as u64;
3442 let record = GlobalMemoryRecord {
3443 id: "gm-1".to_string(),
3444 user_id: "user-a".to_string(),
3445 source_type: "user_message".to_string(),
3446 content: "remember rust workspace layout".to_string(),
3447 content_hash: "h1".to_string(),
3448 run_id: "run-1".to_string(),
3449 session_id: Some("s1".to_string()),
3450 message_id: Some("m1".to_string()),
3451 tool_name: None,
3452 project_tag: Some("proj-x".to_string()),
3453 channel_tag: None,
3454 host_tag: None,
3455 metadata: None,
3456 provenance: None,
3457 redaction_status: "passed".to_string(),
3458 redaction_count: 0,
3459 visibility: "private".to_string(),
3460 demoted: false,
3461 score_boost: 0.0,
3462 created_at_ms: now,
3463 updated_at_ms: now,
3464 expires_at_ms: None,
3465 };
3466 let first = db.put_global_memory_record(&record).await.unwrap();
3467 assert!(first.stored);
3468 let second = db.put_global_memory_record(&record).await.unwrap();
3469 assert!(second.deduped);
3470
3471 let hits = db
3472 .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
3473 .await
3474 .unwrap();
3475 assert!(!hits.is_empty());
3476 assert_eq!(hits[0].record.id, "gm-1");
3477 }
3478
3479 #[tokio::test]
3480 async fn test_knowledge_registry_round_trip() {
3481 let (db, _temp) = setup_test_db().await;
3482 let now = chrono::Utc::now().timestamp_millis() as u64;
3483
3484 let space = KnowledgeSpaceRecord {
3485 id: "space-1".to_string(),
3486 scope: KnowledgeScope::Project,
3487 project_id: Some("project-1".to_string()),
3488 namespace: Some("marketing/positioning".to_string()),
3489 title: Some("Marketing positioning".to_string()),
3490 description: Some("Reusable positioning guidance".to_string()),
3491 trust_level: KnowledgeTrustLevel::ApprovedDefault,
3492 metadata: Some(serde_json::json!({"owner":"marketing"})),
3493 created_at_ms: now,
3494 updated_at_ms: now,
3495 };
3496 db.upsert_knowledge_space(&space).await.unwrap();
3497
3498 let loaded_space = db.get_knowledge_space("space-1").await.unwrap().unwrap();
3499 assert_eq!(loaded_space.id, "space-1");
3500 assert_eq!(loaded_space.scope, KnowledgeScope::Project);
3501 assert_eq!(loaded_space.project_id.as_deref(), Some("project-1"));
3502 assert_eq!(
3503 loaded_space.namespace.as_deref(),
3504 Some("marketing/positioning")
3505 );
3506
3507 let item = KnowledgeItemRecord {
3508 id: "item-1".to_string(),
3509 space_id: "space-1".to_string(),
3510 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
3511 dedupe_key: "item-1-dedupe".to_string(),
3512 item_type: "evidence".to_string(),
3513 title: "Pricing sensitivity observation".to_string(),
3514 summary: Some("Customers reacted to annual pricing changes".to_string()),
3515 payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
3516 trust_level: KnowledgeTrustLevel::Promoted,
3517 status: KnowledgeItemStatus::Promoted,
3518 run_id: Some("run-1".to_string()),
3519 artifact_refs: vec!["artifact://run-1/research-sources".to_string()],
3520 source_memory_ids: vec!["memory-1".to_string()],
3521 freshness_expires_at_ms: Some(now + 86_400_000),
3522 metadata: Some(serde_json::json!({"source_kind":"web"})),
3523 created_at_ms: now,
3524 updated_at_ms: now,
3525 };
3526 db.upsert_knowledge_item(&item).await.unwrap();
3527
3528 let loaded_item = db.get_knowledge_item("item-1").await.unwrap().unwrap();
3529 assert_eq!(loaded_item.id, "item-1");
3530 assert_eq!(loaded_item.space_id, "space-1");
3531 assert_eq!(
3532 loaded_item.coverage_key,
3533 "project-1::marketing/positioning::strategy::pricing"
3534 );
3535 assert_eq!(loaded_item.status, KnowledgeItemStatus::Promoted);
3536 assert_eq!(
3537 loaded_item.artifact_refs,
3538 vec!["artifact://run-1/research-sources".to_string()]
3539 );
3540
3541 let by_space = db.list_knowledge_items("space-1", None).await.unwrap();
3542 assert_eq!(by_space.len(), 1);
3543 let by_coverage = db
3544 .list_knowledge_items(
3545 "space-1",
3546 Some("project-1::marketing/positioning::strategy::pricing"),
3547 )
3548 .await
3549 .unwrap();
3550 assert_eq!(by_coverage.len(), 1);
3551
3552 let coverage = KnowledgeCoverageRecord {
3553 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
3554 space_id: "space-1".to_string(),
3555 latest_item_id: Some("item-1".to_string()),
3556 latest_dedupe_key: Some("item-1-dedupe".to_string()),
3557 last_seen_at_ms: now,
3558 last_promoted_at_ms: Some(now),
3559 freshness_expires_at_ms: Some(now + 86_400_000),
3560 metadata: Some(serde_json::json!({"reuse_reason":"same topic"})),
3561 };
3562 db.upsert_knowledge_coverage(&coverage).await.unwrap();
3563
3564 let loaded_coverage = db
3565 .get_knowledge_coverage(
3566 "project-1::marketing/positioning::strategy::pricing",
3567 "space-1",
3568 )
3569 .await
3570 .unwrap()
3571 .unwrap();
3572 assert_eq!(loaded_coverage.space_id, "space-1");
3573 assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
3574 assert_eq!(
3575 loaded_coverage.latest_dedupe_key.as_deref(),
3576 Some("item-1-dedupe")
3577 );
3578 }
3579
3580 #[tokio::test]
3581 async fn test_knowledge_promotion_working_to_promoted_updates_coverage() {
3582 let (db, _temp) = setup_test_db().await;
3583 let now = chrono::Utc::now().timestamp_millis() as u64;
3584
3585 let space = KnowledgeSpaceRecord {
3586 id: "space-promote-1".to_string(),
3587 scope: KnowledgeScope::Project,
3588 project_id: Some("project-1".to_string()),
3589 namespace: Some("engineering/debugging".to_string()),
3590 title: Some("Engineering debugging".to_string()),
3591 description: Some("Reusable debugging guidance".to_string()),
3592 trust_level: KnowledgeTrustLevel::Promoted,
3593 metadata: None,
3594 created_at_ms: now,
3595 updated_at_ms: now,
3596 };
3597 db.upsert_knowledge_space(&space).await.unwrap();
3598
3599 let item = KnowledgeItemRecord {
3600 id: "item-promote-1".to_string(),
3601 space_id: space.id.clone(),
3602 coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
3603 dedupe_key: "dedupe-promote-1".to_string(),
3604 item_type: "decision".to_string(),
3605 title: "Delay startup-dependent retries".to_string(),
3606 summary: Some("Retry only after startup completed.".to_string()),
3607 payload: serde_json::json!({"action":"delay_retry"}),
3608 trust_level: KnowledgeTrustLevel::Working,
3609 status: KnowledgeItemStatus::Working,
3610 run_id: Some("run-1".to_string()),
3611 artifact_refs: vec!["artifact://run-1/debug".to_string()],
3612 source_memory_ids: vec!["memory-1".to_string()],
3613 freshness_expires_at_ms: None,
3614 metadata: None,
3615 created_at_ms: now,
3616 updated_at_ms: now,
3617 };
3618 db.upsert_knowledge_item(&item).await.unwrap();
3619
3620 let promote = KnowledgePromotionRequest {
3621 item_id: item.id.clone(),
3622 target_status: KnowledgeItemStatus::Promoted,
3623 promoted_at_ms: now + 10,
3624 freshness_expires_at_ms: Some(now + 86_400_000),
3625 reviewer_id: None,
3626 approval_id: None,
3627 reason: Some("validated in workflow".to_string()),
3628 };
3629
3630 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
3631 assert!(result.promoted);
3632 assert_eq!(result.item.status, KnowledgeItemStatus::Promoted);
3633 assert_eq!(result.item.trust_level, KnowledgeTrustLevel::Promoted);
3634 assert_eq!(
3635 result.coverage.latest_item_id.as_deref(),
3636 Some("item-promote-1")
3637 );
3638 assert_eq!(
3639 result.coverage.latest_dedupe_key.as_deref(),
3640 Some("dedupe-promote-1")
3641 );
3642 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 10));
3643 }
3644
3645 #[tokio::test]
3646 async fn test_knowledge_promotion_promoted_to_approved_default_requires_review() {
3647 let (db, _temp) = setup_test_db().await;
3648 let now = chrono::Utc::now().timestamp_millis() as u64;
3649
3650 let space = KnowledgeSpaceRecord {
3651 id: "space-promote-2".to_string(),
3652 scope: KnowledgeScope::Project,
3653 project_id: Some("project-1".to_string()),
3654 namespace: Some("marketing/positioning".to_string()),
3655 title: Some("Marketing positioning".to_string()),
3656 description: Some("Reusable positioning guidance".to_string()),
3657 trust_level: KnowledgeTrustLevel::Promoted,
3658 metadata: None,
3659 created_at_ms: now,
3660 updated_at_ms: now,
3661 };
3662 db.upsert_knowledge_space(&space).await.unwrap();
3663
3664 let item = KnowledgeItemRecord {
3665 id: "item-promote-2".to_string(),
3666 space_id: space.id.clone(),
3667 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
3668 dedupe_key: "dedupe-promote-2".to_string(),
3669 item_type: "evidence".to_string(),
3670 title: "Pricing observation".to_string(),
3671 summary: Some("Annual pricing changes created friction".to_string()),
3672 payload: serde_json::json!({"claim":"pricing friction"}),
3673 trust_level: KnowledgeTrustLevel::Promoted,
3674 status: KnowledgeItemStatus::Promoted,
3675 run_id: Some("run-2".to_string()),
3676 artifact_refs: vec!["artifact://run-2/research".to_string()],
3677 source_memory_ids: vec!["memory-2".to_string()],
3678 freshness_expires_at_ms: None,
3679 metadata: None,
3680 created_at_ms: now,
3681 updated_at_ms: now,
3682 };
3683 db.upsert_knowledge_item(&item).await.unwrap();
3684
3685 let promote = KnowledgePromotionRequest {
3686 item_id: item.id.clone(),
3687 target_status: KnowledgeItemStatus::ApprovedDefault,
3688 promoted_at_ms: now + 5,
3689 freshness_expires_at_ms: None,
3690 reviewer_id: None,
3691 approval_id: None,
3692 reason: Some("should require review".to_string()),
3693 };
3694
3695 let err = db.promote_knowledge_item(&promote).await.unwrap_err();
3696 match err {
3697 MemoryError::InvalidConfig(_) => {}
3698 other => panic!("unexpected error: {}", other),
3699 }
3700 }
3701
3702 #[tokio::test]
3703 async fn test_knowledge_promotion_promoted_to_approved_default_updates_coverage() {
3704 let (db, _temp) = setup_test_db().await;
3705 let now = chrono::Utc::now().timestamp_millis() as u64;
3706
3707 let space = KnowledgeSpaceRecord {
3708 id: "space-promote-3".to_string(),
3709 scope: KnowledgeScope::Project,
3710 project_id: Some("project-1".to_string()),
3711 namespace: Some("support/runbooks".to_string()),
3712 title: Some("Support runbooks".to_string()),
3713 description: Some("Reusable runbook guidance".to_string()),
3714 trust_level: KnowledgeTrustLevel::Promoted,
3715 metadata: None,
3716 created_at_ms: now,
3717 updated_at_ms: now,
3718 };
3719 db.upsert_knowledge_space(&space).await.unwrap();
3720
3721 let item = KnowledgeItemRecord {
3722 id: "item-promote-3".to_string(),
3723 space_id: space.id.clone(),
3724 coverage_key: "project-1::support/runbooks::oncall::restart".to_string(),
3725 dedupe_key: "dedupe-promote-3".to_string(),
3726 item_type: "runbook".to_string(),
3727 title: "Restart service and verify".to_string(),
3728 summary: Some("Restart then validate health endpoint.".to_string()),
3729 payload: serde_json::json!({"steps":["restart","healthcheck"]}),
3730 trust_level: KnowledgeTrustLevel::Promoted,
3731 status: KnowledgeItemStatus::Promoted,
3732 run_id: Some("run-3".to_string()),
3733 artifact_refs: vec!["artifact://run-3/runbook".to_string()],
3734 source_memory_ids: vec!["memory-3".to_string()],
3735 freshness_expires_at_ms: None,
3736 metadata: None,
3737 created_at_ms: now,
3738 updated_at_ms: now,
3739 };
3740 db.upsert_knowledge_item(&item).await.unwrap();
3741
3742 let promote = KnowledgePromotionRequest {
3743 item_id: item.id.clone(),
3744 target_status: KnowledgeItemStatus::ApprovedDefault,
3745 promoted_at_ms: now + 12,
3746 freshness_expires_at_ms: Some(now + 172_800_000),
3747 reviewer_id: Some("reviewer-1".to_string()),
3748 approval_id: Some("approval-1".to_string()),
3749 reason: Some("reviewed by ops".to_string()),
3750 };
3751
3752 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
3753 assert!(result.promoted);
3754 assert_eq!(result.item.status, KnowledgeItemStatus::ApprovedDefault);
3755 assert_eq!(
3756 result.item.trust_level,
3757 KnowledgeTrustLevel::ApprovedDefault
3758 );
3759 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 12));
3760 assert_eq!(
3761 result.coverage.latest_item_id.as_deref(),
3762 Some("item-promote-3")
3763 );
3764 }
3765
3766 #[tokio::test]
3767 async fn test_knowledge_promotion_rejects_deprecated() {
3768 let (db, _temp) = setup_test_db().await;
3769 let now = chrono::Utc::now().timestamp_millis() as u64;
3770
3771 let space = KnowledgeSpaceRecord {
3772 id: "space-promote-4".to_string(),
3773 scope: KnowledgeScope::Project,
3774 project_id: Some("project-1".to_string()),
3775 namespace: Some("ops".to_string()),
3776 title: Some("Ops knowledge".to_string()),
3777 description: Some("Reusable ops knowledge".to_string()),
3778 trust_level: KnowledgeTrustLevel::Promoted,
3779 metadata: None,
3780 created_at_ms: now,
3781 updated_at_ms: now,
3782 };
3783 db.upsert_knowledge_space(&space).await.unwrap();
3784
3785 let item = KnowledgeItemRecord {
3786 id: "item-promote-4".to_string(),
3787 space_id: space.id.clone(),
3788 coverage_key: "project-1::ops::incident::latency".to_string(),
3789 dedupe_key: "dedupe-promote-4".to_string(),
3790 item_type: "decision".to_string(),
3791 title: "Ignore deprecated item".to_string(),
3792 summary: None,
3793 payload: serde_json::json!({"decision":"deprecated"}),
3794 trust_level: KnowledgeTrustLevel::Promoted,
3795 status: KnowledgeItemStatus::Deprecated,
3796 run_id: Some("run-4".to_string()),
3797 artifact_refs: vec![],
3798 source_memory_ids: vec![],
3799 freshness_expires_at_ms: None,
3800 metadata: None,
3801 created_at_ms: now,
3802 updated_at_ms: now,
3803 };
3804 db.upsert_knowledge_item(&item).await.unwrap();
3805
3806 let promote = KnowledgePromotionRequest {
3807 item_id: item.id.clone(),
3808 target_status: KnowledgeItemStatus::Promoted,
3809 promoted_at_ms: now + 1,
3810 freshness_expires_at_ms: None,
3811 reviewer_id: None,
3812 approval_id: None,
3813 reason: None,
3814 };
3815
3816 let err = db.promote_knowledge_item(&promote).await.unwrap_err();
3817 match err {
3818 MemoryError::InvalidConfig(_) => {}
3819 other => panic!("unexpected error: {}", other),
3820 }
3821 }
3822
3823 #[tokio::test]
3824 async fn test_knowledge_promotion_idempotent_updates_coverage() {
3825 let (db, _temp) = setup_test_db().await;
3826 let now = chrono::Utc::now().timestamp_millis() as u64;
3827
3828 let space = KnowledgeSpaceRecord {
3829 id: "space-promote-5".to_string(),
3830 scope: KnowledgeScope::Project,
3831 project_id: Some("project-1".to_string()),
3832 namespace: Some("engineering/ops".to_string()),
3833 title: Some("Engineering ops".to_string()),
3834 description: None,
3835 trust_level: KnowledgeTrustLevel::Promoted,
3836 metadata: None,
3837 created_at_ms: now,
3838 updated_at_ms: now,
3839 };
3840 db.upsert_knowledge_space(&space).await.unwrap();
3841
3842 let item = KnowledgeItemRecord {
3843 id: "item-promote-5".to_string(),
3844 space_id: space.id.clone(),
3845 coverage_key: "project-1::engineering/ops::deploy::guardrails".to_string(),
3846 dedupe_key: "dedupe-promote-5".to_string(),
3847 item_type: "pattern".to_string(),
3848 title: "Deploy guardrails".to_string(),
3849 summary: None,
3850 payload: serde_json::json!({"pattern":"guardrails"}),
3851 trust_level: KnowledgeTrustLevel::Promoted,
3852 status: KnowledgeItemStatus::Promoted,
3853 run_id: Some("run-5".to_string()),
3854 artifact_refs: vec![],
3855 source_memory_ids: vec![],
3856 freshness_expires_at_ms: None,
3857 metadata: None,
3858 created_at_ms: now,
3859 updated_at_ms: now,
3860 };
3861 db.upsert_knowledge_item(&item).await.unwrap();
3862
3863 let promote = KnowledgePromotionRequest {
3864 item_id: item.id.clone(),
3865 target_status: KnowledgeItemStatus::Promoted,
3866 promoted_at_ms: now + 20,
3867 freshness_expires_at_ms: None,
3868 reviewer_id: None,
3869 approval_id: None,
3870 reason: None,
3871 };
3872
3873 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
3874 assert!(!result.promoted);
3875 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 20));
3876 assert_eq!(
3877 result.coverage.latest_item_id.as_deref(),
3878 Some("item-promote-5")
3879 );
3880 }
3881
3882 #[tokio::test]
3883 async fn test_knowledge_item_promotion_updates_coverage() {
3884 let (db, _temp) = setup_test_db().await;
3885 let now = chrono::Utc::now().timestamp_millis() as u64;
3886
3887 let space = KnowledgeSpaceRecord {
3888 id: "space-promote".to_string(),
3889 scope: KnowledgeScope::Project,
3890 project_id: Some("project-1".to_string()),
3891 namespace: Some("engineering/debugging".to_string()),
3892 title: Some("Engineering debugging".to_string()),
3893 description: Some("Reusable debugging guidance".to_string()),
3894 trust_level: KnowledgeTrustLevel::Promoted,
3895 metadata: None,
3896 created_at_ms: now,
3897 updated_at_ms: now,
3898 };
3899 db.upsert_knowledge_space(&space).await.unwrap();
3900
3901 let item = KnowledgeItemRecord {
3902 id: "item-promote".to_string(),
3903 space_id: space.id.clone(),
3904 coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
3905 dedupe_key: "dedupe-promote".to_string(),
3906 item_type: "decision".to_string(),
3907 title: "Delay startup-dependent retries".to_string(),
3908 summary: Some("Retry only after startup completes.".to_string()),
3909 payload: serde_json::json!({"action": "delay_retry"}),
3910 trust_level: KnowledgeTrustLevel::Working,
3911 status: KnowledgeItemStatus::Working,
3912 run_id: Some("run-promote".to_string()),
3913 artifact_refs: vec!["artifact://run-promote/report".to_string()],
3914 source_memory_ids: vec!["memory-promote".to_string()],
3915 freshness_expires_at_ms: None,
3916 metadata: Some(serde_json::json!({"source_kind":"run"})),
3917 created_at_ms: now,
3918 updated_at_ms: now,
3919 };
3920 db.upsert_knowledge_item(&item).await.unwrap();
3921
3922 let request = KnowledgePromotionRequest {
3923 item_id: item.id.clone(),
3924 target_status: KnowledgeItemStatus::Promoted,
3925 promoted_at_ms: now + 10,
3926 freshness_expires_at_ms: Some(now + 86_400_000),
3927 reviewer_id: None,
3928 approval_id: None,
3929 reason: Some("validated".to_string()),
3930 };
3931 let promoted = db
3932 .promote_knowledge_item(&request)
3933 .await
3934 .unwrap()
3935 .expect("promotion result");
3936 assert_eq!(promoted.previous_status, KnowledgeItemStatus::Working);
3937 assert!(promoted.promoted);
3938 assert_eq!(promoted.item.status, KnowledgeItemStatus::Promoted);
3939 assert_eq!(promoted.item.trust_level, KnowledgeTrustLevel::Promoted);
3940 assert_eq!(
3941 promoted.item.freshness_expires_at_ms,
3942 Some(now + 86_400_000)
3943 );
3944 assert_eq!(
3945 promoted
3946 .item
3947 .metadata
3948 .as_ref()
3949 .and_then(|value| value.get("promotion"))
3950 .and_then(|value| value.get("to_status"))
3951 .and_then(Value::as_str),
3952 Some("promoted")
3953 );
3954 assert_eq!(
3955 promoted.coverage.latest_item_id.as_deref(),
3956 Some("item-promote")
3957 );
3958 assert_eq!(
3959 promoted.coverage.latest_dedupe_key.as_deref(),
3960 Some("dedupe-promote")
3961 );
3962 assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 10));
3963 assert_eq!(
3964 promoted.coverage.freshness_expires_at_ms,
3965 Some(now + 86_400_000)
3966 );
3967
3968 let loaded = db
3969 .get_knowledge_item("item-promote")
3970 .await
3971 .unwrap()
3972 .unwrap();
3973 assert_eq!(loaded.status, KnowledgeItemStatus::Promoted);
3974 assert_eq!(
3975 loaded
3976 .metadata
3977 .as_ref()
3978 .and_then(|value| value.get("promotion"))
3979 .and_then(|value| value.get("from_status"))
3980 .and_then(Value::as_str),
3981 Some("working")
3982 );
3983 }
3984
3985 #[tokio::test]
3986 async fn test_knowledge_item_approved_default_requires_review() {
3987 let (db, _temp) = setup_test_db().await;
3988 let now = chrono::Utc::now().timestamp_millis() as u64;
3989
3990 let space = KnowledgeSpaceRecord {
3991 id: "space-approved".to_string(),
3992 scope: KnowledgeScope::Project,
3993 project_id: Some("project-1".to_string()),
3994 namespace: Some("marketing/positioning".to_string()),
3995 title: Some("Marketing positioning".to_string()),
3996 description: Some("Reusable positioning guidance".to_string()),
3997 trust_level: KnowledgeTrustLevel::Promoted,
3998 metadata: None,
3999 created_at_ms: now,
4000 updated_at_ms: now,
4001 };
4002 db.upsert_knowledge_space(&space).await.unwrap();
4003
4004 let item = KnowledgeItemRecord {
4005 id: "item-approved".to_string(),
4006 space_id: space.id.clone(),
4007 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
4008 dedupe_key: "dedupe-approved".to_string(),
4009 item_type: "evidence".to_string(),
4010 title: "Pricing sensitivity observation".to_string(),
4011 summary: Some("Customers reacted to annual pricing changes".to_string()),
4012 payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
4013 trust_level: KnowledgeTrustLevel::Promoted,
4014 status: KnowledgeItemStatus::Promoted,
4015 run_id: Some("run-approved".to_string()),
4016 artifact_refs: vec!["artifact://run-approved/research".to_string()],
4017 source_memory_ids: vec!["memory-approved".to_string()],
4018 freshness_expires_at_ms: Some(now + 1234),
4019 metadata: None,
4020 created_at_ms: now,
4021 updated_at_ms: now,
4022 };
4023 db.upsert_knowledge_item(&item).await.unwrap();
4024
4025 let request = KnowledgePromotionRequest {
4026 item_id: item.id.clone(),
4027 target_status: KnowledgeItemStatus::ApprovedDefault,
4028 promoted_at_ms: now + 20,
4029 freshness_expires_at_ms: Some(now + 90_000_000),
4030 reviewer_id: Some("reviewer-1".to_string()),
4031 approval_id: Some("approval-1".to_string()),
4032 reason: Some("approved as default guidance".to_string()),
4033 };
4034 let promoted = db
4035 .promote_knowledge_item(&request)
4036 .await
4037 .unwrap()
4038 .expect("promotion result");
4039 assert_eq!(promoted.previous_status, KnowledgeItemStatus::Promoted);
4040 assert_eq!(promoted.item.status, KnowledgeItemStatus::ApprovedDefault);
4041 assert_eq!(
4042 promoted.item.trust_level,
4043 KnowledgeTrustLevel::ApprovedDefault
4044 );
4045 assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 20));
4046 assert_eq!(
4047 promoted
4048 .item
4049 .metadata
4050 .as_ref()
4051 .and_then(|value| value.get("promotion"))
4052 .and_then(|value| value.get("approval_id"))
4053 .and_then(Value::as_str),
4054 Some("approval-1")
4055 );
4056 }
4057
4058 #[tokio::test]
4059 async fn test_knowledge_item_promotion_rejects_invalid_transition() {
4060 let (db, _temp) = setup_test_db().await;
4061 let now = chrono::Utc::now().timestamp_millis() as u64;
4062
4063 let space = KnowledgeSpaceRecord {
4064 id: "space-invalid".to_string(),
4065 scope: KnowledgeScope::Project,
4066 project_id: Some("project-1".to_string()),
4067 namespace: Some("support".to_string()),
4068 title: Some("Support".to_string()),
4069 description: Some("Support guidance".to_string()),
4070 trust_level: KnowledgeTrustLevel::Promoted,
4071 metadata: None,
4072 created_at_ms: now,
4073 updated_at_ms: now,
4074 };
4075 db.upsert_knowledge_space(&space).await.unwrap();
4076
4077 let item = KnowledgeItemRecord {
4078 id: "item-invalid".to_string(),
4079 space_id: space.id.clone(),
4080 coverage_key: "project-1::support::workflow::triage".to_string(),
4081 dedupe_key: "dedupe-invalid".to_string(),
4082 item_type: "decision".to_string(),
4083 title: "Triage first".to_string(),
4084 summary: None,
4085 payload: serde_json::json!({"action":"triage"}),
4086 trust_level: KnowledgeTrustLevel::Working,
4087 status: KnowledgeItemStatus::Working,
4088 run_id: Some("run-invalid".to_string()),
4089 artifact_refs: vec![],
4090 source_memory_ids: vec![],
4091 freshness_expires_at_ms: None,
4092 metadata: None,
4093 created_at_ms: now,
4094 updated_at_ms: now,
4095 };
4096 db.upsert_knowledge_item(&item).await.unwrap();
4097
4098 let request = KnowledgePromotionRequest {
4099 item_id: item.id.clone(),
4100 target_status: KnowledgeItemStatus::ApprovedDefault,
4101 promoted_at_ms: now + 1,
4102 freshness_expires_at_ms: None,
4103 reviewer_id: Some("reviewer-1".to_string()),
4104 approval_id: Some("approval-1".to_string()),
4105 reason: Some("should fail".to_string()),
4106 };
4107 let err = db.promote_knowledge_item(&request).await.unwrap_err();
4108 assert!(matches!(err, MemoryError::InvalidConfig(_)));
4109 let loaded = db.get_knowledge_item(&item.id).await.unwrap().unwrap();
4110 assert_eq!(loaded.status, KnowledgeItemStatus::Working);
4111 }
4112}