// tandem_memory/db.rs
// Database Layer Module
// SQLite + sqlite-vec for vector storage

use crate::types::{
    ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
    KnowledgeCoverageRecord, KnowledgeItemRecord, KnowledgeItemStatus, KnowledgePromotionRequest,
    KnowledgePromotionResult, KnowledgeSpaceRecord, MemoryChunk, MemoryConfig, MemoryError,
    MemoryResult, MemoryStats, MemoryTier, ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
};
use chrono::{DateTime, Utc};
use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
use sqlite_vec::sqlite3_vec_init;
use std::collections::HashSet;
use std::os::raw::c_char;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Row shape when reading the nullable columns of `project_index_status`:
/// (last_indexed_at, last_total_files, last_processed_files,
///  last_indexed_files, last_skipped_files, last_errors).
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);

28/// Database connection manager
29pub struct MemoryDatabase {
30    conn: Arc<Mutex<Connection>>,
31    db_path: std::path::PathBuf,
32}

impl MemoryDatabase {
35    /// Initialize or open the memory database
36    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
37        // Register sqlite-vec extension
38        unsafe {
39            sqlite3_auto_extension(Some(std::mem::transmute::<
40                *const (),
41                unsafe extern "C" fn(
42                    *mut rusqlite::ffi::sqlite3,
43                    *mut *mut i8,
44                    *const rusqlite::ffi::sqlite3_api_routines,
45                ) -> i32,
46            >(sqlite3_vec_init as *const ())));
47        }
48
49        let conn = Connection::open(db_path)?;
50        conn.busy_timeout(Duration::from_secs(10))?;
51
52        // Enable WAL mode for better concurrency
53        // PRAGMA journal_mode returns a row, so we use query_row to ignore it
54        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
55        conn.execute("PRAGMA synchronous = NORMAL", [])?;
56
57        let db = Self {
58            conn: Arc::new(Mutex::new(conn)),
59            db_path: db_path.to_path_buf(),
60        };
61
62        // Initialize schema
63        db.init_schema().await?;
64        if let Err(err) = db.validate_vector_tables().await {
65            match &err {
66                crate::types::MemoryError::Database(db_err)
67                    if Self::is_vector_table_error(db_err) =>
68                {
69                    tracing::warn!(
70                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
71                        db_err
72                    );
73                    db.recreate_vector_tables().await?;
74                }
75                _ => return Err(err),
76            }
77        }
78        db.validate_integrity().await?;
79
80        Ok(db)
81    }
82
83    /// Validate base SQLite integrity early so startup recovery can heal corrupt DB files.
84    async fn validate_integrity(&self) -> MemoryResult<()> {
85        let conn = self.conn.lock().await;
86        let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
87        {
88            Ok(value) => value,
89            Err(err) => {
90                // sqlite-vec virtual tables can intermittently return generic SQL logic errors
91                // during integrity probing even when runtime reads/writes still work.
92                // Do not block startup on this probe failure.
93                tracing::warn!(
94                    "Skipping strict PRAGMA quick_check due to probe error: {}",
95                    err
96                );
97                return Ok(());
98            }
99        };
100        if check.trim().eq_ignore_ascii_case("ok") {
101            return Ok(());
102        }
103
104        let lowered = check.to_lowercase();
105        if lowered.contains("malformed")
106            || lowered.contains("corrupt")
107            || lowered.contains("database disk image is malformed")
108        {
109            return Err(crate::types::MemoryError::InvalidConfig(format!(
110                "malformed database integrity check: {}",
111                check
112            )));
113        }
114
115        tracing::warn!(
116            "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
117            check
118        );
119        Ok(())
120    }
121
122    /// Initialize database schema
123    async fn init_schema(&self) -> MemoryResult<()> {
124        let conn = self.conn.lock().await;
125
126        // Extension is already registered globally in new()
127
128        // Session memory chunks table
129        conn.execute(
130            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
131                id TEXT PRIMARY KEY,
132                content TEXT NOT NULL,
133                session_id TEXT NOT NULL,
134                project_id TEXT,
135                source TEXT NOT NULL,
136                created_at TEXT NOT NULL,
137                token_count INTEGER NOT NULL DEFAULT 0,
138                metadata TEXT
139            )",
140            [],
141        )?;
142        let session_existing_cols: HashSet<String> = {
143            let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
144            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
145            rows.collect::<Result<HashSet<_>, _>>()?
146        };
147        if !session_existing_cols.contains("source_path") {
148            conn.execute(
149                "ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
150                [],
151            )?;
152        }
153        if !session_existing_cols.contains("source_mtime") {
154            conn.execute(
155                "ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
156                [],
157            )?;
158        }
159        if !session_existing_cols.contains("source_size") {
160            conn.execute(
161                "ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
162                [],
163            )?;
164        }
165        if !session_existing_cols.contains("source_hash") {
166            conn.execute(
167                "ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
168                [],
169            )?;
170        }
171
172        // Session memory vectors (virtual table)
173        conn.execute(
174            &format!(
175                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
176                    chunk_id TEXT PRIMARY KEY,
177                    embedding float[{}]
178                )",
179                DEFAULT_EMBEDDING_DIMENSION
180            ),
181            [],
182        )?;
183
184        // Project memory chunks table
185        conn.execute(
186            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
187                id TEXT PRIMARY KEY,
188                content TEXT NOT NULL,
189                project_id TEXT NOT NULL,
190                session_id TEXT,
191                source TEXT NOT NULL,
192                created_at TEXT NOT NULL,
193                token_count INTEGER NOT NULL DEFAULT 0,
194                metadata TEXT
195            )",
196            [],
197        )?;
198
199        // Migrations: file-derived columns on project_memory_chunks
200        // (SQLite doesn't support IF NOT EXISTS for columns, so we inspect table_info)
201        let existing_cols: HashSet<String> = {
202            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
203            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
204            rows.collect::<Result<HashSet<_>, _>>()?
205        };
206
207        if !existing_cols.contains("source_path") {
208            conn.execute(
209                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
210                [],
211            )?;
212        }
213        if !existing_cols.contains("source_mtime") {
214            conn.execute(
215                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
216                [],
217            )?;
218        }
219        if !existing_cols.contains("source_size") {
220            conn.execute(
221                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
222                [],
223            )?;
224        }
225        if !existing_cols.contains("source_hash") {
226            conn.execute(
227                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
228                [],
229            )?;
230        }
231
232        // Project memory vectors (virtual table)
233        conn.execute(
234            &format!(
235                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
236                    chunk_id TEXT PRIMARY KEY,
237                    embedding float[{}]
238                )",
239                DEFAULT_EMBEDDING_DIMENSION
240            ),
241            [],
242        )?;
243
244        // File indexing tables (project-scoped)
245        conn.execute(
246            "CREATE TABLE IF NOT EXISTS project_file_index (
247                project_id TEXT NOT NULL,
248                path TEXT NOT NULL,
249                mtime INTEGER NOT NULL,
250                size INTEGER NOT NULL,
251                hash TEXT NOT NULL,
252                indexed_at TEXT NOT NULL,
253                PRIMARY KEY(project_id, path)
254            )",
255            [],
256        )?;
257        conn.execute(
258            "CREATE TABLE IF NOT EXISTS session_file_index (
259                session_id TEXT NOT NULL,
260                path TEXT NOT NULL,
261                mtime INTEGER NOT NULL,
262                size INTEGER NOT NULL,
263                hash TEXT NOT NULL,
264                indexed_at TEXT NOT NULL,
265                PRIMARY KEY(session_id, path)
266            )",
267            [],
268        )?;
269
270        conn.execute(
271            "CREATE TABLE IF NOT EXISTS project_index_status (
272                project_id TEXT PRIMARY KEY,
273                last_indexed_at TEXT,
274                last_total_files INTEGER,
275                last_processed_files INTEGER,
276                last_indexed_files INTEGER,
277                last_skipped_files INTEGER,
278                last_errors INTEGER
279            )",
280            [],
281        )?;
282
283        // Global memory chunks table
284        conn.execute(
285            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
286                id TEXT PRIMARY KEY,
287                content TEXT NOT NULL,
288                source TEXT NOT NULL,
289                created_at TEXT NOT NULL,
290                token_count INTEGER NOT NULL DEFAULT 0,
291                metadata TEXT
292            )",
293            [],
294        )?;
295        let global_existing_cols: HashSet<String> = {
296            let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
297            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
298            rows.collect::<Result<HashSet<_>, _>>()?
299        };
300        if !global_existing_cols.contains("source_path") {
301            conn.execute(
302                "ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
303                [],
304            )?;
305        }
306        if !global_existing_cols.contains("source_mtime") {
307            conn.execute(
308                "ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
309                [],
310            )?;
311        }
312        if !global_existing_cols.contains("source_size") {
313            conn.execute(
314                "ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
315                [],
316            )?;
317        }
318        if !global_existing_cols.contains("source_hash") {
319            conn.execute(
320                "ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
321                [],
322            )?;
323        }
324
325        // Global memory vectors (virtual table)
326        conn.execute(
327            &format!(
328                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
329                    chunk_id TEXT PRIMARY KEY,
330                    embedding float[{}]
331                )",
332                DEFAULT_EMBEDDING_DIMENSION
333            ),
334            [],
335        )?;
336
337        // Memory configuration table
338        conn.execute(
339            "CREATE TABLE IF NOT EXISTS memory_config (
340                project_id TEXT PRIMARY KEY,
341                max_chunks INTEGER NOT NULL DEFAULT 10000,
342                chunk_size INTEGER NOT NULL DEFAULT 512,
343                retrieval_k INTEGER NOT NULL DEFAULT 5,
344                auto_cleanup INTEGER NOT NULL DEFAULT 1,
345                session_retention_days INTEGER NOT NULL DEFAULT 30,
346                token_budget INTEGER NOT NULL DEFAULT 5000,
347                chunk_overlap INTEGER NOT NULL DEFAULT 64,
348                updated_at TEXT NOT NULL
349            )",
350            [],
351        )?;
352
353        // Cleanup log table
354        conn.execute(
355            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
356                id TEXT PRIMARY KEY,
357                cleanup_type TEXT NOT NULL,
358                tier TEXT NOT NULL,
359                project_id TEXT,
360                session_id TEXT,
361                chunks_deleted INTEGER NOT NULL DEFAULT 0,
362                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
363                created_at TEXT NOT NULL
364            )",
365            [],
366        )?;
367
368        // Create indexes for better query performance
369        conn.execute(
370            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
371            [],
372        )?;
373        conn.execute(
374            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
375            [],
376        )?;
377        conn.execute(
378            "CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
379            [],
380        )?;
381        conn.execute(
382            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
383            [],
384        )?;
385        conn.execute(
386            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
387            [],
388        )?;
389        conn.execute(
390            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
391            [],
392        )?;
393        conn.execute(
394            "CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
395            [],
396        )?;
397        conn.execute(
398            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
399            [],
400        )?;
401        conn.execute(
402            "CREATE TABLE IF NOT EXISTS global_file_index (
403                path TEXT PRIMARY KEY,
404                mtime INTEGER NOT NULL,
405                size INTEGER NOT NULL,
406                hash TEXT NOT NULL,
407                indexed_at TEXT NOT NULL
408            )",
409            [],
410        )?;
411
412        // Knowledge registry tables (scoped reusable knowledge, separate from raw memory)
413        conn.execute(
414            "CREATE TABLE IF NOT EXISTS knowledge_spaces (
415                id TEXT PRIMARY KEY,
416                scope TEXT NOT NULL,
417                project_id TEXT,
418                namespace TEXT,
419                title TEXT,
420                description TEXT,
421                trust_level TEXT NOT NULL,
422                metadata TEXT,
423                created_at_ms INTEGER NOT NULL,
424                updated_at_ms INTEGER NOT NULL
425            )",
426            [],
427        )?;
428        conn.execute(
429            "CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_spaces_scope_project_namespace
430                ON knowledge_spaces(scope, IFNULL(project_id, ''), IFNULL(namespace, ''))",
431            [],
432        )?;
433        conn.execute(
434            "CREATE TABLE IF NOT EXISTS knowledge_items (
435                id TEXT PRIMARY KEY,
436                space_id TEXT NOT NULL,
437                coverage_key TEXT NOT NULL,
438                dedupe_key TEXT NOT NULL,
439                item_type TEXT NOT NULL,
440                title TEXT NOT NULL,
441                summary TEXT,
442                payload TEXT NOT NULL,
443                trust_level TEXT NOT NULL,
444                status TEXT NOT NULL,
445                run_id TEXT,
446                artifact_refs TEXT NOT NULL,
447                source_memory_ids TEXT NOT NULL,
448                freshness_expires_at_ms INTEGER,
449                metadata TEXT,
450                created_at_ms INTEGER NOT NULL,
451                updated_at_ms INTEGER NOT NULL,
452                FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
453            )",
454            [],
455        )?;
456        conn.execute(
457            "CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_items_space_dedupe
458                ON knowledge_items(space_id, dedupe_key)",
459            [],
460        )?;
461        conn.execute(
462            "CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_coverage
463                ON knowledge_items(space_id, coverage_key)",
464            [],
465        )?;
466        conn.execute(
467            "CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_created
468                ON knowledge_items(space_id, created_at_ms DESC)",
469            [],
470        )?;
471        conn.execute(
472            "CREATE TABLE IF NOT EXISTS knowledge_coverage (
473                coverage_key TEXT NOT NULL,
474                space_id TEXT NOT NULL,
475                latest_item_id TEXT,
476                latest_dedupe_key TEXT,
477                last_seen_at_ms INTEGER NOT NULL,
478                last_promoted_at_ms INTEGER,
479                freshness_expires_at_ms INTEGER,
480                metadata TEXT,
481                PRIMARY KEY(coverage_key, space_id),
482                FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
483            )",
484            [],
485        )?;
486        conn.execute(
487            "CREATE INDEX IF NOT EXISTS idx_knowledge_coverage_space_seen
488                ON knowledge_coverage(space_id, last_seen_at_ms DESC)",
489            [],
490        )?;
491
492        // Global user memory records (FTS-backed baseline retrieval path)
493        conn.execute(
494            "CREATE TABLE IF NOT EXISTS memory_records (
495                id TEXT PRIMARY KEY,
496                user_id TEXT NOT NULL,
497                source_type TEXT NOT NULL,
498                content TEXT NOT NULL,
499                content_hash TEXT NOT NULL,
500                run_id TEXT NOT NULL,
501                session_id TEXT,
502                message_id TEXT,
503                tool_name TEXT,
504                project_tag TEXT,
505                channel_tag TEXT,
506                host_tag TEXT,
507                metadata TEXT,
508                provenance TEXT,
509                redaction_status TEXT NOT NULL,
510                redaction_count INTEGER NOT NULL DEFAULT 0,
511                visibility TEXT NOT NULL DEFAULT 'private',
512                demoted INTEGER NOT NULL DEFAULT 0,
513                score_boost REAL NOT NULL DEFAULT 0.0,
514                created_at_ms INTEGER NOT NULL,
515                updated_at_ms INTEGER NOT NULL,
516                expires_at_ms INTEGER
517            )",
518            [],
519        )?;
520        conn.execute(
521            "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
522                ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
523            [],
524        )?;
525        conn.execute(
526            "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
527                ON memory_records(user_id, created_at_ms DESC)",
528            [],
529        )?;
530        conn.execute(
531            "CREATE INDEX IF NOT EXISTS idx_memory_records_run
532                ON memory_records(run_id)",
533            [],
534        )?;
535        conn.execute(
536            "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
537                id UNINDEXED,
538                user_id UNINDEXED,
539                content
540            )",
541            [],
542        )?;
543        conn.execute(
544            "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
545                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
546            END",
547            [],
548        )?;
549        conn.execute(
550            "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
551                DELETE FROM memory_records_fts WHERE id = old.id;
552            END",
553            [],
554        )?;
555        conn.execute(
556            "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
557                DELETE FROM memory_records_fts WHERE id = old.id;
558                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
559            END",
560            [],
561        )?;
562
563        conn.execute(
564            "CREATE TABLE IF NOT EXISTS memory_nodes (
565                id TEXT PRIMARY KEY,
566                uri TEXT NOT NULL UNIQUE,
567                parent_uri TEXT,
568                node_type TEXT NOT NULL,
569                created_at TEXT NOT NULL,
570                updated_at TEXT NOT NULL,
571                metadata TEXT
572            )",
573            [],
574        )?;
575        conn.execute(
576            "CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
577            [],
578        )?;
579        conn.execute(
580            "CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
581            [],
582        )?;
583
584        conn.execute(
585            "CREATE TABLE IF NOT EXISTS memory_layers (
586                id TEXT PRIMARY KEY,
587                node_id TEXT NOT NULL,
588                layer_type TEXT NOT NULL,
589                content TEXT NOT NULL,
590                token_count INTEGER NOT NULL,
591                embedding_id TEXT,
592                created_at TEXT NOT NULL,
593                source_chunk_id TEXT,
594                FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
595            )",
596            [],
597        )?;
598        conn.execute(
599            "CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
600            [],
601        )?;
602        conn.execute(
603            "CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
604            [],
605        )?;
606
607        conn.execute(
608            "CREATE TABLE IF NOT EXISTS memory_retrieval_state (
609                node_id TEXT PRIMARY KEY,
610                active_layer TEXT NOT NULL DEFAULT 'L0',
611                last_accessed TEXT,
612                access_count INTEGER DEFAULT 0,
613                FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
614            )",
615            [],
616        )?;
617
618        Ok(())
619    }
620
621    /// Validate that sqlite-vec tables are readable.
622    /// This catches legacy/corrupted vector blobs early so startup can recover.
623    pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
624        let conn = self.conn.lock().await;
625        let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
626
627        for table in [
628            "session_memory_vectors",
629            "project_memory_vectors",
630            "global_memory_vectors",
631        ] {
632            let sql = format!("SELECT COUNT(*) FROM {}", table);
633            let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
634
635            // COUNT(*) can pass even when vector chunk blobs are unreadable.
636            // Probe sqlite-vec MATCH execution to surface latent blob corruption.
637            if row_count > 0 {
638                let probe_sql = format!(
639                    "SELECT chunk_id, distance
640                     FROM {}
641                     WHERE embedding MATCH ?1 AND k = 1",
642                    table
643                );
644                let mut stmt = conn.prepare(&probe_sql)?;
645                let mut rows = stmt.query(params![probe_embedding.as_str()])?;
646                let _ = rows.next()?;
647            }
648        }
649        Ok(())
650    }
651
652    fn is_vector_table_error(err: &rusqlite::Error) -> bool {
653        let text = err.to_string().to_lowercase();
654        text.contains("vector blob")
655            || text.contains("chunks iter error")
656            || text.contains("chunks iter")
657            || text.contains("internal sqlite-vec error")
658            || text.contains("insert rowids id")
659            || text.contains("sql logic error")
660            || text.contains("database disk image is malformed")
661            || text.contains("session_memory_vectors")
662            || text.contains("project_memory_vectors")
663            || text.contains("global_memory_vectors")
664            || text.contains("vec0")
665    }
666
667    async fn recreate_vector_tables(&self) -> MemoryResult<()> {
668        let conn = self.conn.lock().await;
669
670        for base in [
671            "session_memory_vectors",
672            "project_memory_vectors",
673            "global_memory_vectors",
674        ] {
675            // Drop vec virtual table and common sqlite-vec shadow tables first.
676            for name in [
677                base.to_string(),
678                format!("{}_chunks", base),
679                format!("{}_info", base),
680                format!("{}_rowids", base),
681                format!("{}_vector_chunks00", base),
682            ] {
683                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
684                conn.execute(&sql, [])?;
685            }
686
687            // Drop any additional shadow tables (e.g. *_vector_chunks01).
688            let like_pattern = format!("{base}_%");
689            let mut stmt = conn.prepare(
690                "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
691            )?;
692            let table_names = stmt
693                .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
694                .collect::<Result<Vec<_>, _>>()?;
695            drop(stmt);
696            for name in table_names {
697                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
698                conn.execute(&sql, [])?;
699            }
700        }
701
702        conn.execute(
703            &format!(
704                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
705                    chunk_id TEXT PRIMARY KEY,
706                    embedding float[{}]
707                )",
708                DEFAULT_EMBEDDING_DIMENSION
709            ),
710            [],
711        )?;
712
713        conn.execute(
714            &format!(
715                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
716                    chunk_id TEXT PRIMARY KEY,
717                    embedding float[{}]
718                )",
719                DEFAULT_EMBEDDING_DIMENSION
720            ),
721            [],
722        )?;
723
724        conn.execute(
725            &format!(
726                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
727                    chunk_id TEXT PRIMARY KEY,
728                    embedding float[{}]
729                )",
730                DEFAULT_EMBEDDING_DIMENSION
731            ),
732            [],
733        )?;
734
735        Ok(())
736    }
737
738    /// Ensure vector tables are readable and recreate them if corruption is detected.
739    /// Returns true when a repair was performed.
740    pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
741        match self.validate_vector_tables().await {
742            Ok(()) => Ok(false),
743            Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
744                tracing::warn!(
745                    "Memory vector tables appear corrupted ({}). Recreating vector tables.",
746                    err
747                );
748                self.recreate_vector_tables().await?;
749                Ok(true)
750            }
751            Err(err) => Err(err),
752        }
753    }
754
755    /// Last-resort runtime repair for malformed DB states: drop user memory tables
756    /// and recreate the schema in-place so new writes can proceed.
757    /// This intentionally clears memory content for the active DB file.
758    pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
759        let table_names = {
760            let conn = self.conn.lock().await;
761            let mut stmt = conn.prepare(
762                "SELECT name FROM sqlite_master
763                 WHERE type='table'
764                   AND name NOT LIKE 'sqlite_%'
765                 ORDER BY name",
766            )?;
767            let names = stmt
768                .query_map([], |row| row.get::<_, String>(0))?
769                .collect::<Result<Vec<_>, _>>()?;
770            names
771        };
772
773        {
774            let conn = self.conn.lock().await;
775            for table in table_names {
776                let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
777                let _ = conn.execute(&sql, []);
778            }
779        }
780
781        self.init_schema().await
782    }
783
784    /// Attempt an immediate vector-table repair when a concrete DB error indicates
785    /// sqlite-vec internals are failing at statement/rowid level.
786    pub async fn try_repair_after_error(
787        &self,
788        err: &crate::types::MemoryError,
789    ) -> MemoryResult<bool> {
790        match err {
791            crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
792                tracing::warn!(
793                    "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
794                    db_err
795                );
796                self.recreate_vector_tables().await?;
797                Ok(true)
798            }
799            _ => Ok(false),
800        }
801    }
802
803    /// Store a chunk with its embedding
804    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
805        let conn = self.conn.lock().await;
806
807        let (chunks_table, vectors_table) = match chunk.tier {
808            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
809            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
810            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
811        };
812
813        let created_at_str = chunk.created_at.to_rfc3339();
814        let metadata_str = chunk
815            .metadata
816            .as_ref()
817            .map(|m| m.to_string())
818            .unwrap_or_default();
819
820        // Insert chunk
821        match chunk.tier {
822            MemoryTier::Session => {
823                conn.execute(
824                    &format!(
825                        "INSERT INTO {} (
826                            id, content, session_id, project_id, source, created_at, token_count, metadata,
827                            source_path, source_mtime, source_size, source_hash
828                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
829                        chunks_table
830                    ),
831                    params![
832                        chunk.id,
833                        chunk.content,
834                        chunk.session_id.as_ref().unwrap_or(&String::new()),
835                        chunk.project_id,
836                        chunk.source,
837                        created_at_str,
838                        chunk.token_count,
839                        metadata_str,
840                        chunk.source_path.clone(),
841                        chunk.source_mtime,
842                        chunk.source_size,
843                        chunk.source_hash.clone()
844                    ],
845                )?;
846            }
847            MemoryTier::Project => {
848                conn.execute(
849                    &format!(
850                        "INSERT INTO {} (
851                            id, content, project_id, session_id, source, created_at, token_count, metadata,
852                            source_path, source_mtime, source_size, source_hash
853                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
854                        chunks_table
855                    ),
856                    params![
857                        chunk.id,
858                        chunk.content,
859                        chunk.project_id.as_ref().unwrap_or(&String::new()),
860                        chunk.session_id,
861                        chunk.source,
862                        created_at_str,
863                        chunk.token_count,
864                        metadata_str,
865                        chunk.source_path.clone(),
866                        chunk.source_mtime,
867                        chunk.source_size,
868                        chunk.source_hash.clone()
869                    ],
870                )?;
871            }
872            MemoryTier::Global => {
873                conn.execute(
874                    &format!(
875                        "INSERT INTO {} (
876                            id, content, source, created_at, token_count, metadata,
877                            source_path, source_mtime, source_size, source_hash
878                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
879                        chunks_table
880                    ),
881                    params![
882                        chunk.id,
883                        chunk.content,
884                        chunk.source,
885                        created_at_str,
886                        chunk.token_count,
887                        metadata_str,
888                        chunk.source_path.clone(),
889                        chunk.source_mtime,
890                        chunk.source_size,
891                        chunk.source_hash.clone()
892                    ],
893                )?;
894            }
895        }
896
897        // Insert embedding
898        let embedding_json = format!(
899            "[{}]",
900            embedding
901                .iter()
902                .map(|f| f.to_string())
903                .collect::<Vec<_>>()
904                .join(",")
905        );
906        conn.execute(
907            &format!(
908                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
909                vectors_table
910            ),
911            params![chunk.id, embedding_json],
912        )?;
913
914        Ok(())
915    }
916
917    /// Search for similar chunks
918    pub async fn search_similar(
919        &self,
920        query_embedding: &[f32],
921        tier: MemoryTier,
922        project_id: Option<&str>,
923        session_id: Option<&str>,
924        limit: i64,
925    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
926        let conn = self.conn.lock().await;
927
928        let (chunks_table, vectors_table) = match tier {
929            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
930            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
931            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
932        };
933
934        let embedding_json = format!(
935            "[{}]",
936            query_embedding
937                .iter()
938                .map(|f| f.to_string())
939                .collect::<Vec<_>>()
940                .join(",")
941        );
942
943        // Build query based on tier and filters
944        let results = match tier {
945            MemoryTier::Session => {
946                if let Some(sid) = session_id {
947                    let sql = format!(
948                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
949                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
950                                v.distance
951                         FROM {} AS v
952                         JOIN {} AS c ON v.chunk_id = c.id
953                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
954                         ORDER BY v.distance",
955                        vectors_table, chunks_table
956                    );
957                    let mut stmt = conn.prepare(&sql)?;
958                    let results = stmt
959                        .query_map(params![sid, embedding_json, limit], |row| {
960                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
961                        })?
962                        .collect::<Result<Vec<_>, _>>()?;
963                    results
964                } else if let Some(pid) = project_id {
965                    let sql = format!(
966                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
967                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
968                                v.distance
969                         FROM {} AS v
970                         JOIN {} AS c ON v.chunk_id = c.id
971                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
972                         ORDER BY v.distance",
973                        vectors_table, chunks_table
974                    );
975                    let mut stmt = conn.prepare(&sql)?;
976                    let results = stmt
977                        .query_map(params![pid, embedding_json, limit], |row| {
978                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
979                        })?
980                        .collect::<Result<Vec<_>, _>>()?;
981                    results
982                } else {
983                    let sql = format!(
984                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
985                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
986                                v.distance
987                         FROM {} AS v
988                         JOIN {} AS c ON v.chunk_id = c.id
989                         WHERE v.embedding MATCH ?1 AND k = ?2
990                         ORDER BY v.distance",
991                        vectors_table, chunks_table
992                    );
993                    let mut stmt = conn.prepare(&sql)?;
994                    let results = stmt
995                        .query_map(params![embedding_json, limit], |row| {
996                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
997                        })?
998                        .collect::<Result<Vec<_>, _>>()?;
999                    results
1000                }
1001            }
1002            MemoryTier::Project => {
1003                if let Some(pid) = project_id {
1004                    let sql = format!(
1005                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
1006                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
1007                                v.distance
1008                         FROM {} AS v
1009                         JOIN {} AS c ON v.chunk_id = c.id
1010                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
1011                         ORDER BY v.distance",
1012                        vectors_table, chunks_table
1013                    );
1014                    let mut stmt = conn.prepare(&sql)?;
1015                    let results = stmt
1016                        .query_map(params![pid, embedding_json, limit], |row| {
1017                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
1018                        })?
1019                        .collect::<Result<Vec<_>, _>>()?;
1020                    results
1021                } else {
1022                    let sql = format!(
1023                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
1024                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
1025                                v.distance
1026                         FROM {} AS v
1027                         JOIN {} AS c ON v.chunk_id = c.id
1028                         WHERE v.embedding MATCH ?1 AND k = ?2
1029                         ORDER BY v.distance",
1030                        vectors_table, chunks_table
1031                    );
1032                    let mut stmt = conn.prepare(&sql)?;
1033                    let results = stmt
1034                        .query_map(params![embedding_json, limit], |row| {
1035                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
1036                        })?
1037                        .collect::<Result<Vec<_>, _>>()?;
1038                    results
1039                }
1040            }
1041            MemoryTier::Global => {
1042                let sql = format!(
1043                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
1044                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
1045                            v.distance
1046                     FROM {} AS v
1047                     JOIN {} AS c ON v.chunk_id = c.id
1048                     WHERE v.embedding MATCH ?1 AND k = ?2
1049                     ORDER BY v.distance",
1050                    vectors_table, chunks_table
1051                );
1052                let mut stmt = conn.prepare(&sql)?;
1053                let results = stmt
1054                    .query_map(params![embedding_json, limit], |row| {
1055                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
1056                    })?
1057                    .collect::<Result<Vec<_>, _>>()?;
1058                results
1059            }
1060        };
1061
1062        Ok(results)
1063    }
1064
1065    /// Get chunks by session ID
1066    pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1067        let conn = self.conn.lock().await;
1068
1069        let mut stmt = conn.prepare(
1070            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1071                    source_path, source_mtime, source_size, source_hash
1072             FROM session_memory_chunks
1073             WHERE session_id = ?1
1074             ORDER BY created_at DESC",
1075        )?;
1076
1077        let chunks = stmt
1078            .query_map(params![session_id], |row| {
1079                row_to_chunk(row, MemoryTier::Session)
1080            })?
1081            .collect::<Result<Vec<_>, _>>()?;
1082
1083        Ok(chunks)
1084    }
1085
1086    /// Get chunks by project ID
1087    pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1088        let conn = self.conn.lock().await;
1089
1090        let mut stmt = conn.prepare(
1091            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1092                    source_path, source_mtime, source_size, source_hash
1093             FROM project_memory_chunks
1094             WHERE project_id = ?1
1095             ORDER BY created_at DESC",
1096        )?;
1097
1098        let chunks = stmt
1099            .query_map(params![project_id], |row| {
1100                row_to_chunk(row, MemoryTier::Project)
1101            })?
1102            .collect::<Result<Vec<_>, _>>()?;
1103
1104        Ok(chunks)
1105    }
1106
1107    /// Get global chunks
1108    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
1109        let conn = self.conn.lock().await;
1110
1111        let mut stmt = conn.prepare(
1112            "SELECT id, content, source, created_at, token_count, metadata,
1113                    source_path, source_mtime, source_size, source_hash
1114             FROM global_memory_chunks
1115             ORDER BY created_at DESC
1116             LIMIT ?1",
1117        )?;
1118
1119        let chunks = stmt
1120            .query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
1121            .collect::<Result<Vec<_>, _>>()?;
1122
1123        Ok(chunks)
1124    }
1125
1126    pub async fn global_chunk_exists_by_source_hash(
1127        &self,
1128        source_hash: &str,
1129    ) -> MemoryResult<bool> {
1130        let conn = self.conn.lock().await;
1131        let exists = conn
1132            .query_row(
1133                "SELECT 1 FROM global_memory_chunks WHERE source_hash = ?1 LIMIT 1",
1134                params![source_hash],
1135                |_row| Ok(()),
1136            )
1137            .optional()?
1138            .is_some();
1139        Ok(exists)
1140    }
1141
1142    /// Clear session memory
1143    pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
1144        let conn = self.conn.lock().await;
1145
1146        // Get count before deletion
1147        let count: i64 = conn.query_row(
1148            "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
1149            params![session_id],
1150            |row| row.get(0),
1151        )?;
1152
1153        // Delete vectors first (foreign key constraint)
1154        conn.execute(
1155            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
1156             (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
1157            params![session_id],
1158        )?;
1159
1160        // Delete chunks
1161        conn.execute(
1162            "DELETE FROM session_memory_chunks WHERE session_id = ?1",
1163            params![session_id],
1164        )?;
1165
1166        Ok(count as u64)
1167    }
1168
1169    /// Clear project memory
1170    pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
1171        let conn = self.conn.lock().await;
1172
1173        // Get count before deletion
1174        let count: i64 = conn.query_row(
1175            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1176            params![project_id],
1177            |row| row.get(0),
1178        )?;
1179
1180        // Delete vectors first
1181        conn.execute(
1182            "DELETE FROM project_memory_vectors WHERE chunk_id IN 
1183             (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
1184            params![project_id],
1185        )?;
1186
1187        // Delete chunks
1188        conn.execute(
1189            "DELETE FROM project_memory_chunks WHERE project_id = ?1",
1190            params![project_id],
1191        )?;
1192
1193        Ok(count as u64)
1194    }
1195
    /// Clear global memory chunks by source prefix (and matching vectors).
    ///
    /// Returns the number of chunk rows that matched the prefix.
    ///
    /// NOTE(review): the prefix is spliced into a LIKE pattern unescaped, so
    /// `%` or `_` inside `source_prefix` act as wildcards — confirm callers
    /// never pass those characters.
    pub async fn clear_global_memory_by_source_prefix(
        &self,
        source_prefix: &str,
    ) -> MemoryResult<u64> {
        let conn = self.conn.lock().await;
        // Prefix match: "<prefix>%".
        let like = format!("{}%", source_prefix);

        // Count first so the return value reports how many chunks go away.
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
            params![like],
            |row| row.get(0),
        )?;

        // Vector rows reference chunk ids, so delete them before the chunks.
        conn.execute(
            "DELETE FROM global_memory_vectors WHERE chunk_id IN
             (SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
            params![like],
        )?;

        conn.execute(
            "DELETE FROM global_memory_chunks WHERE source LIKE ?1",
            params![like],
        )?;

        Ok(count as u64)
    }
1223
1224    /// Delete a single memory chunk by id within the requested scope.
1225    pub async fn delete_chunk(
1226        &self,
1227        tier: MemoryTier,
1228        chunk_id: &str,
1229        project_id: Option<&str>,
1230        session_id: Option<&str>,
1231    ) -> MemoryResult<u64> {
1232        let conn = self.conn.lock().await;
1233
1234        let deleted = match tier {
1235            MemoryTier::Session => {
1236                let Some(session_id) = session_id else {
1237                    return Err(MemoryError::InvalidConfig(
1238                        "session_id is required to delete session memory chunks".to_string(),
1239                    ));
1240                };
1241                conn.execute(
1242                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
1243                     (SELECT id FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2)",
1244                    params![chunk_id, session_id],
1245                )?;
1246                conn.execute(
1247                    "DELETE FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2",
1248                    params![chunk_id, session_id],
1249                )?
1250            }
1251            MemoryTier::Project => {
1252                let Some(project_id) = project_id else {
1253                    return Err(MemoryError::InvalidConfig(
1254                        "project_id is required to delete project memory chunks".to_string(),
1255                    ));
1256                };
1257                conn.execute(
1258                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
1259                     (SELECT id FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2)",
1260                    params![chunk_id, project_id],
1261                )?;
1262                conn.execute(
1263                    "DELETE FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2",
1264                    params![chunk_id, project_id],
1265                )?
1266            }
1267            MemoryTier::Global => {
1268                conn.execute(
1269                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
1270                     (SELECT id FROM global_memory_chunks WHERE id = ?1)",
1271                    params![chunk_id],
1272                )?;
1273                conn.execute(
1274                    "DELETE FROM global_memory_chunks WHERE id = ?1",
1275                    params![chunk_id],
1276                )?
1277            }
1278        };
1279
1280        Ok(deleted as u64)
1281    }
1282
1283    /// Clear old session memory based on retention policy
1284    pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
1285        let conn = self.conn.lock().await;
1286
1287        let cutoff = Utc::now() - chrono::Duration::days(retention_days);
1288        let cutoff_str = cutoff.to_rfc3339();
1289
1290        // Get count before deletion
1291        let count: i64 = conn.query_row(
1292            "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
1293            params![cutoff_str],
1294            |row| row.get(0),
1295        )?;
1296
1297        // Delete vectors first
1298        conn.execute(
1299            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
1300             (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
1301            params![cutoff_str],
1302        )?;
1303
1304        // Delete chunks
1305        conn.execute(
1306            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1307            params![cutoff_str],
1308        )?;
1309
1310        Ok(count as u64)
1311    }
1312
1313    /// Get or create memory config for a project
1314    pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1315        let conn = self.conn.lock().await;
1316
1317        let result: Option<MemoryConfig> = conn
1318            .query_row(
1319                "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1320                        session_retention_days, token_budget, chunk_overlap
1321                 FROM memory_config WHERE project_id = ?1",
1322                params![project_id],
1323                |row| {
1324                    Ok(MemoryConfig {
1325                        max_chunks: row.get(0)?,
1326                        chunk_size: row.get(1)?,
1327                        retrieval_k: row.get(2)?,
1328                        auto_cleanup: row.get::<_, i64>(3)? != 0,
1329                        session_retention_days: row.get(4)?,
1330                        token_budget: row.get(5)?,
1331                        chunk_overlap: row.get(6)?,
1332                    })
1333                },
1334            )
1335            .optional()?;
1336
1337        match result {
1338            Some(config) => Ok(config),
1339            None => {
1340                // Create default config
1341                let config = MemoryConfig::default();
1342                let updated_at = Utc::now().to_rfc3339();
1343
1344                conn.execute(
1345                    "INSERT INTO memory_config 
1346                     (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1347                      session_retention_days, token_budget, chunk_overlap, updated_at)
1348                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1349                    params![
1350                        project_id,
1351                        config.max_chunks,
1352                        config.chunk_size,
1353                        config.retrieval_k,
1354                        config.auto_cleanup as i64,
1355                        config.session_retention_days,
1356                        config.token_budget,
1357                        config.chunk_overlap,
1358                        updated_at
1359                    ],
1360                )?;
1361
1362                Ok(config)
1363            }
1364        }
1365    }
1366
1367    /// Update memory config for a project
1368    pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1369        let conn = self.conn.lock().await;
1370
1371        let updated_at = Utc::now().to_rfc3339();
1372
1373        conn.execute(
1374            "INSERT OR REPLACE INTO memory_config 
1375             (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1376              session_retention_days, token_budget, chunk_overlap, updated_at)
1377             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1378            params![
1379                project_id,
1380                config.max_chunks,
1381                config.chunk_size,
1382                config.retrieval_k,
1383                config.auto_cleanup as i64,
1384                config.session_retention_days,
1385                config.token_budget,
1386                config.chunk_overlap,
1387                updated_at
1388            ],
1389        )?;
1390
1391        Ok(())
1392    }
1393
    /// Insert or update a reusable knowledge space.
    ///
    /// Uses `INSERT OR REPLACE`, so an existing row with the same id is
    /// overwritten wholesale.
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        conn.execute(
            "INSERT OR REPLACE INTO knowledge_spaces
             (id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
            params![
                space.id,
                // Enum-like fields are persisted via their Display form.
                space.scope.to_string(),
                space.project_id,
                space.namespace,
                space.title,
                space.description,
                space.trust_level.to_string(),
                // Optional metadata is stored as its serialized text, NULL when absent.
                space.metadata.as_ref().map(|value| value.to_string()),
                // NOTE(review): `as i64` assumes the millisecond timestamps
                // fit in i64 — confirm the field types upstream.
                space.created_at_ms as i64,
                space.updated_at_ms as i64,
            ],
        )?;
        Ok(())
    }
1416
1417    /// Fetch a knowledge space by ID.
1418    pub async fn get_knowledge_space(
1419        &self,
1420        id: &str,
1421    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
1422        let conn = self.conn.lock().await;
1423        Ok(
1424            conn.query_row(
1425                "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1426                 FROM knowledge_spaces WHERE id = ?1",
1427                params![id],
1428                row_to_knowledge_space,
1429            )
1430            .optional()?,
1431        )
1432    }
1433
1434    /// List knowledge spaces, optionally filtered by project.
1435    pub async fn list_knowledge_spaces(
1436        &self,
1437        project_id: Option<&str>,
1438    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
1439        let conn = self.conn.lock().await;
1440        let mut stmt = if project_id.is_some() {
1441            conn.prepare(
1442                "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1443                 FROM knowledge_spaces WHERE project_id = ?1 ORDER BY updated_at_ms DESC",
1444            )?
1445        } else {
1446            conn.prepare(
1447                "SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
1448                 FROM knowledge_spaces ORDER BY updated_at_ms DESC",
1449            )?
1450        };
1451        let rows = if let Some(project_id) = project_id {
1452            stmt.query_map(params![project_id], row_to_knowledge_space)?
1453        } else {
1454            stmt.query_map([], row_to_knowledge_space)?
1455        };
1456        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1457    }
1458
    /// Insert or update a reusable knowledge item.
    ///
    /// Uses `INSERT OR REPLACE`, so an existing row with the same id is
    /// overwritten wholesale. Structured fields (payload, artifact refs,
    /// source memory ids, metadata) are persisted as serialized text.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        conn.execute(
            "INSERT OR REPLACE INTO knowledge_items
             (id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
            params![
                item.id,
                item.space_id,
                item.coverage_key,
                item.dedupe_key,
                item.item_type,
                item.title,
                item.summary,
                item.payload.to_string(),
                // Enum-like fields are persisted via their Display form.
                item.trust_level.to_string(),
                item.status.to_string(),
                item.run_id,
                // Collections are serialized to JSON text for storage.
                serde_json::to_string(&item.artifact_refs)?,
                serde_json::to_string(&item.source_memory_ids)?,
                item.freshness_expires_at_ms.map(|value| value as i64),
                item.metadata.as_ref().map(|value| value.to_string()),
                // NOTE(review): `as i64` assumes the millisecond timestamps
                // fit in i64 — confirm the field types upstream.
                item.created_at_ms as i64,
                item.updated_at_ms as i64,
            ],
        )?;
        Ok(())
    }
1488
1489    /// List knowledge items for a knowledge space.
1490    pub async fn list_knowledge_items(
1491        &self,
1492        space_id: &str,
1493        coverage_key: Option<&str>,
1494    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
1495        let conn = self.conn.lock().await;
1496        let mut stmt = if coverage_key.is_some() {
1497            conn.prepare(
1498                "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1499                 FROM knowledge_items WHERE space_id = ?1 AND coverage_key = ?2 ORDER BY created_at_ms DESC",
1500            )?
1501        } else {
1502            conn.prepare(
1503                "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1504                 FROM knowledge_items WHERE space_id = ?1 ORDER BY created_at_ms DESC",
1505            )?
1506        };
1507        let rows = if let Some(coverage_key) = coverage_key {
1508            stmt.query_map(params![space_id, coverage_key], row_to_knowledge_item)?
1509        } else {
1510            stmt.query_map(params![space_id], row_to_knowledge_item)?
1511        };
1512        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1513    }
1514
1515    /// Fetch a knowledge item by ID.
1516    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
1517        let conn = self.conn.lock().await;
1518        Ok(
1519            conn.query_row(
1520                "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1521                 FROM knowledge_items WHERE id = ?1",
1522                params![id],
1523                row_to_knowledge_item,
1524            )
1525            .optional()?,
1526        )
1527    }
1528
1529    /// Promote or retire a knowledge item and update its coverage record atomically.
1530    pub async fn promote_knowledge_item(
1531        &self,
1532        request: &KnowledgePromotionRequest,
1533    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
1534        let mut conn = self.conn.lock().await;
1535        let tx = conn.transaction()?;
1536
1537        let Some(mut item) = tx
1538            .query_row(
1539                "SELECT id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms
1540                 FROM knowledge_items WHERE id = ?1",
1541                params![request.item_id],
1542                row_to_knowledge_item,
1543            )
1544            .optional()? else {
1545            return Ok(None);
1546        };
1547
1548        let previous_status = item.status;
1549        let previous_trust_level = item.trust_level;
1550
1551        if previous_status == KnowledgeItemStatus::Deprecated
1552            && request.target_status != KnowledgeItemStatus::Deprecated
1553        {
1554            return Err(crate::types::MemoryError::InvalidConfig(
1555                "cannot promote a deprecated knowledge item".to_string(),
1556            ));
1557        }
1558
1559        let next_status = request.target_status;
1560        match (previous_status, next_status) {
1561            (KnowledgeItemStatus::Working, KnowledgeItemStatus::Promoted)
1562            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Promoted)
1563            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::ApprovedDefault)
1564            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::ApprovedDefault)
1565            | (KnowledgeItemStatus::Working, KnowledgeItemStatus::Deprecated)
1566            | (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Deprecated)
1567            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Deprecated) => {}
1568            (KnowledgeItemStatus::Working, KnowledgeItemStatus::ApprovedDefault) => {
1569                return Err(crate::types::MemoryError::InvalidConfig(
1570                    "approved_default requires an intermediate promoted item".to_string(),
1571                ));
1572            }
1573            (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Promoted) => {
1574                return Err(crate::types::MemoryError::InvalidConfig(
1575                    "approved_default items do not downgrade back to promoted".to_string(),
1576                ));
1577            }
1578            (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Working)
1579            | (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Working) => {
1580                return Err(crate::types::MemoryError::InvalidConfig(
1581                    "knowledge items cannot be demoted back to working".to_string(),
1582                ));
1583            }
1584            (KnowledgeItemStatus::Working, KnowledgeItemStatus::Working) => {}
1585            (KnowledgeItemStatus::Deprecated, _) => {}
1586        }
1587
1588        if next_status == KnowledgeItemStatus::ApprovedDefault
1589            && (request.reviewer_id.is_none() || request.approval_id.is_none())
1590        {
1591            return Err(crate::types::MemoryError::InvalidConfig(
1592                "approved_default promotion requires reviewer_id and approval_id".to_string(),
1593            ));
1594        }
1595
1596        let promoted = next_status.is_active()
1597            && (previous_status != next_status || request.freshness_expires_at_ms.is_some());
1598
1599        let mut metadata_obj = item
1600            .metadata
1601            .clone()
1602            .and_then(|value| value.as_object().cloned())
1603            .unwrap_or_default();
1604        metadata_obj.insert(
1605            "promotion".to_string(),
1606            serde_json::json!({
1607                "from_status": previous_status.to_string(),
1608                "to_status": next_status.to_string(),
1609                "promoted_at_ms": request.promoted_at_ms,
1610                "reason": request.reason,
1611                "reviewer_id": request.reviewer_id,
1612                "approval_id": request.approval_id,
1613                "freshness_expires_at_ms": request.freshness_expires_at_ms,
1614            }),
1615        );
1616
1617        item.status = next_status;
1618        if let Some(next_trust) = next_status.as_trust_level() {
1619            item.trust_level = next_trust;
1620        }
1621        if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
1622            item.freshness_expires_at_ms = Some(freshness_expires_at_ms);
1623        }
1624        item.metadata = Some(serde_json::Value::Object(metadata_obj));
1625        item.updated_at_ms = request.promoted_at_ms;
1626        let persisted_item = item.clone();
1627        let item_id = persisted_item.id.clone();
1628        let space_id = persisted_item.space_id.clone();
1629        let coverage_key = persisted_item.coverage_key.clone();
1630        let dedupe_key = persisted_item.dedupe_key.clone();
1631
1632        tx.execute(
1633            "INSERT OR REPLACE INTO knowledge_items
1634             (id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
1635             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
1636            params![
1637                persisted_item.id,
1638                persisted_item.space_id,
1639                persisted_item.coverage_key,
1640                persisted_item.dedupe_key,
1641                persisted_item.item_type,
1642                persisted_item.title,
1643                persisted_item.summary,
1644                persisted_item.payload.to_string(),
1645                persisted_item.trust_level.to_string(),
1646                persisted_item.status.to_string(),
1647                persisted_item.run_id,
1648                serde_json::to_string(&persisted_item.artifact_refs)?,
1649                serde_json::to_string(&persisted_item.source_memory_ids)?,
1650                persisted_item.freshness_expires_at_ms.map(|value| value as i64),
1651                persisted_item.metadata.as_ref().map(|value| value.to_string()),
1652                persisted_item.created_at_ms as i64,
1653                persisted_item.updated_at_ms as i64,
1654            ],
1655        )?;
1656
1657        let mut coverage = tx
1658            .query_row(
1659                "SELECT coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata
1660                 FROM knowledge_coverage WHERE coverage_key = ?1 AND space_id = ?2",
1661                params![coverage_key.as_str(), space_id.as_str()],
1662                row_to_knowledge_coverage,
1663            )
1664            .optional()?
1665            .unwrap_or(KnowledgeCoverageRecord {
1666                coverage_key: coverage_key.clone(),
1667                space_id: space_id.clone(),
1668                latest_item_id: None,
1669                latest_dedupe_key: None,
1670                last_seen_at_ms: request.promoted_at_ms,
1671                last_promoted_at_ms: None,
1672                freshness_expires_at_ms: None,
1673                metadata: None,
1674            });
1675        coverage.latest_item_id = Some(item_id.clone());
1676        coverage.latest_dedupe_key = Some(dedupe_key.clone());
1677        coverage.last_seen_at_ms = request.promoted_at_ms;
1678        if next_status.is_active() {
1679            coverage.last_promoted_at_ms = Some(request.promoted_at_ms);
1680        }
1681        if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
1682            coverage.freshness_expires_at_ms = Some(freshness_expires_at_ms);
1683        }
1684        let mut coverage_metadata = coverage
1685            .metadata
1686            .clone()
1687            .and_then(|value| value.as_object().cloned())
1688            .unwrap_or_default();
1689        coverage_metadata.insert(
1690            "promotion".to_string(),
1691            serde_json::json!({
1692                "item_id": item_id,
1693                "from_status": previous_status.to_string(),
1694                "to_status": next_status.to_string(),
1695                "promoted_at_ms": request.promoted_at_ms,
1696                "reason": request.reason,
1697                "reviewer_id": request.reviewer_id,
1698                "approval_id": request.approval_id,
1699            }),
1700        );
1701        coverage.metadata = Some(serde_json::Value::Object(coverage_metadata));
1702
1703        tx.execute(
1704            "INSERT OR REPLACE INTO knowledge_coverage
1705             (coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
1706             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1707            params![
1708                coverage.coverage_key,
1709                coverage.space_id,
1710                coverage.latest_item_id,
1711                coverage.latest_dedupe_key,
1712                coverage.last_seen_at_ms as i64,
1713                coverage.last_promoted_at_ms.map(|value| value as i64),
1714                coverage.freshness_expires_at_ms.map(|value| value as i64),
1715                coverage.metadata.as_ref().map(|value| value.to_string()),
1716            ],
1717        )?;
1718
1719        tx.commit()?;
1720        Ok(Some(KnowledgePromotionResult {
1721            previous_status,
1722            previous_trust_level,
1723            promoted,
1724            item: persisted_item,
1725            coverage,
1726        }))
1727    }
1728
1729    /// Insert or update a coverage record for a reusable knowledge key.
1730    pub async fn upsert_knowledge_coverage(
1731        &self,
1732        coverage: &KnowledgeCoverageRecord,
1733    ) -> MemoryResult<()> {
1734        let conn = self.conn.lock().await;
1735        conn.execute(
1736            "INSERT OR REPLACE INTO knowledge_coverage
1737             (coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
1738             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1739            params![
1740                coverage.coverage_key,
1741                coverage.space_id,
1742                coverage.latest_item_id,
1743                coverage.latest_dedupe_key,
1744                coverage.last_seen_at_ms as i64,
1745                coverage.last_promoted_at_ms.map(|value| value as i64),
1746                coverage.freshness_expires_at_ms.map(|value| value as i64),
1747                coverage.metadata.as_ref().map(|value| value.to_string()),
1748            ],
1749        )?;
1750        Ok(())
1751    }
1752
1753    /// Fetch a coverage row for a key and space.
1754    pub async fn get_knowledge_coverage(
1755        &self,
1756        coverage_key: &str,
1757        space_id: &str,
1758    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
1759        let conn = self.conn.lock().await;
1760        Ok(
1761            conn.query_row(
1762                "SELECT coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata
1763                 FROM knowledge_coverage WHERE coverage_key = ?1 AND space_id = ?2",
1764                params![coverage_key, space_id],
1765                row_to_knowledge_coverage,
1766            )
1767            .optional()?,
1768        )
1769    }
1770
1771    /// Get memory statistics
1772    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1773        let conn = self.conn.lock().await;
1774
1775        // Count chunks
1776        let session_chunks: i64 =
1777            conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1778                row.get(0)
1779            })?;
1780
1781        let project_chunks: i64 =
1782            conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1783                row.get(0)
1784            })?;
1785
1786        let global_chunks: i64 =
1787            conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1788                row.get(0)
1789            })?;
1790
1791        // Calculate sizes
1792        let session_bytes: i64 = conn.query_row(
1793            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1794            [],
1795            |row| row.get(0),
1796        )?;
1797
1798        let project_bytes: i64 = conn.query_row(
1799            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1800            [],
1801            |row| row.get(0),
1802        )?;
1803
1804        let global_bytes: i64 = conn.query_row(
1805            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1806            [],
1807            |row| row.get(0),
1808        )?;
1809
1810        // Get last cleanup
1811        let last_cleanup: Option<String> = conn
1812            .query_row(
1813                "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1814                [],
1815                |row| row.get(0),
1816            )
1817            .optional()?;
1818
1819        let last_cleanup = last_cleanup.and_then(|s| {
1820            DateTime::parse_from_rfc3339(&s)
1821                .ok()
1822                .map(|dt| dt.with_timezone(&Utc))
1823        });
1824
1825        // Get file size
1826        let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1827
1828        Ok(MemoryStats {
1829            total_chunks: session_chunks + project_chunks + global_chunks,
1830            session_chunks,
1831            project_chunks,
1832            global_chunks,
1833            total_bytes: session_bytes + project_bytes + global_bytes,
1834            session_bytes,
1835            project_bytes,
1836            global_bytes,
1837            file_size,
1838            last_cleanup,
1839        })
1840    }
1841
1842    /// Log cleanup operation
1843    pub async fn log_cleanup(
1844        &self,
1845        cleanup_type: &str,
1846        tier: MemoryTier,
1847        project_id: Option<&str>,
1848        session_id: Option<&str>,
1849        chunks_deleted: i64,
1850        bytes_reclaimed: i64,
1851    ) -> MemoryResult<()> {
1852        let conn = self.conn.lock().await;
1853
1854        let id = uuid::Uuid::new_v4().to_string();
1855        let created_at = Utc::now().to_rfc3339();
1856
1857        conn.execute(
1858            "INSERT INTO memory_cleanup_log 
1859             (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1860             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1861            params![
1862                id,
1863                cleanup_type,
1864                tier.to_string(),
1865                project_id,
1866                session_id,
1867                chunks_deleted,
1868                bytes_reclaimed,
1869                created_at
1870            ],
1871        )?;
1872
1873        Ok(())
1874    }
1875
1876    /// Vacuum the database to reclaim space
1877    pub async fn vacuum(&self) -> MemoryResult<()> {
1878        let conn = self.conn.lock().await;
1879        conn.execute("VACUUM", [])?;
1880        Ok(())
1881    }
1882
1883    // ---------------------------------------------------------------------
1884    // Project file indexing helpers
1885    // ---------------------------------------------------------------------
1886
1887    pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1888        let conn = self.conn.lock().await;
1889        let n: i64 = conn.query_row(
1890            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1891            params![project_id],
1892            |row| row.get(0),
1893        )?;
1894        Ok(n)
1895    }
1896
1897    pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1898        let conn = self.conn.lock().await;
1899        let exists: Option<i64> = conn
1900            .query_row(
1901                "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1902                params![project_id],
1903                |row| row.get(0),
1904            )
1905            .optional()?;
1906        Ok(exists.is_some())
1907    }
1908
1909    pub async fn get_file_index_entry(
1910        &self,
1911        project_id: &str,
1912        path: &str,
1913    ) -> MemoryResult<Option<(i64, i64, String)>> {
1914        let conn = self.conn.lock().await;
1915        let row: Option<(i64, i64, String)> = conn
1916            .query_row(
1917                "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1918                params![project_id, path],
1919                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1920            )
1921            .optional()?;
1922        Ok(row)
1923    }
1924
1925    pub async fn upsert_file_index_entry(
1926        &self,
1927        project_id: &str,
1928        path: &str,
1929        mtime: i64,
1930        size: i64,
1931        hash: &str,
1932    ) -> MemoryResult<()> {
1933        let conn = self.conn.lock().await;
1934        let indexed_at = Utc::now().to_rfc3339();
1935        conn.execute(
1936            "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1937             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1938             ON CONFLICT(project_id, path) DO UPDATE SET
1939                mtime = excluded.mtime,
1940                size = excluded.size,
1941                hash = excluded.hash,
1942                indexed_at = excluded.indexed_at",
1943            params![project_id, path, mtime, size, hash, indexed_at],
1944        )?;
1945        Ok(())
1946    }
1947
1948    pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1949        let conn = self.conn.lock().await;
1950        conn.execute(
1951            "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1952            params![project_id, path],
1953        )?;
1954        Ok(())
1955    }
1956
1957    pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1958        let conn = self.conn.lock().await;
1959        let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1960        let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1961        Ok(rows.collect::<Result<Vec<_>, _>>()?)
1962    }
1963
1964    pub async fn delete_project_file_chunks_by_path(
1965        &self,
1966        project_id: &str,
1967        source_path: &str,
1968    ) -> MemoryResult<(i64, i64)> {
1969        let conn = self.conn.lock().await;
1970
1971        let chunks_deleted: i64 = conn.query_row(
1972            "SELECT COUNT(*) FROM project_memory_chunks
1973             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1974            params![project_id, source_path],
1975            |row| row.get(0),
1976        )?;
1977
1978        let bytes_estimated: i64 = conn.query_row(
1979            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1980             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1981            params![project_id, source_path],
1982            |row| row.get(0),
1983        )?;
1984
1985        // Delete vectors first (keep order consistent with other clears)
1986        conn.execute(
1987            "DELETE FROM project_memory_vectors WHERE chunk_id IN
1988             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1989            params![project_id, source_path],
1990        )?;
1991
1992        conn.execute(
1993            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1994            params![project_id, source_path],
1995        )?;
1996
1997        Ok((chunks_deleted, bytes_estimated))
1998    }
1999
2000    pub async fn get_import_index_entry(
2001        &self,
2002        tier: MemoryTier,
2003        session_id: Option<&str>,
2004        project_id: Option<&str>,
2005        path: &str,
2006    ) -> MemoryResult<Option<(i64, i64, String)>> {
2007        let conn = self.conn.lock().await;
2008        let row = match tier {
2009            MemoryTier::Session => {
2010                let session_id = require_scope_id(tier, session_id)?;
2011                conn.query_row(
2012                    "SELECT mtime, size, hash FROM session_file_index WHERE session_id = ?1 AND path = ?2",
2013                    params![session_id, path],
2014                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2015                )
2016                .optional()?
2017            }
2018            MemoryTier::Project => {
2019                let project_id = require_scope_id(tier, project_id)?;
2020                conn.query_row(
2021                    "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
2022                    params![project_id, path],
2023                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2024                )
2025                .optional()?
2026            }
2027            MemoryTier::Global => conn
2028                .query_row(
2029                    "SELECT mtime, size, hash FROM global_file_index WHERE path = ?1",
2030                    params![path],
2031                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2032                )
2033                .optional()?,
2034        };
2035        Ok(row)
2036    }
2037
    /// Insert or refresh the `(mtime, size, hash, indexed_at)` row for an
    /// imported file in the index table matching `tier`.
    ///
    /// Session/project tiers require their corresponding scope id (validated
    /// by `require_scope_id`); the global tier is keyed on `path` alone.
    /// The `indexed_at` stamp is taken once, as RFC 3339 text, and shared by
    /// whichever upsert runs below.
    pub async fn upsert_import_index_entry(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        path: &str,
        mtime: i64,
        size: i64,
        hash: &str,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        let indexed_at = Utc::now().to_rfc3339();
        match tier {
            // Session-scoped index: keyed by (session_id, path).
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                conn.execute(
                    "INSERT INTO session_file_index (session_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(session_id, path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![session_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            // Project-scoped index: keyed by (project_id, path).
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                conn.execute(
                    "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(project_id, path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![project_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            // Global index: no scope id, keyed by path only.
            MemoryTier::Global => {
                conn.execute(
                    "INSERT INTO global_file_index (path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5)
                     ON CONFLICT(path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![path, mtime, size, hash, indexed_at],
                )?;
            }
        }
        Ok(())
    }
2092
2093    pub async fn list_import_index_paths(
2094        &self,
2095        tier: MemoryTier,
2096        session_id: Option<&str>,
2097        project_id: Option<&str>,
2098    ) -> MemoryResult<Vec<String>> {
2099        let conn = self.conn.lock().await;
2100        let rows = match tier {
2101            MemoryTier::Session => {
2102                let session_id = require_scope_id(tier, session_id)?;
2103                let mut stmt =
2104                    conn.prepare("SELECT path FROM session_file_index WHERE session_id = ?1")?;
2105                let rows = stmt.query_map(params![session_id], |row| row.get::<_, String>(0))?;
2106                rows.collect::<Result<Vec<_>, _>>()?
2107            }
2108            MemoryTier::Project => {
2109                let project_id = require_scope_id(tier, project_id)?;
2110                let mut stmt =
2111                    conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
2112                let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
2113                rows.collect::<Result<Vec<_>, _>>()?
2114            }
2115            MemoryTier::Global => {
2116                let mut stmt = conn.prepare("SELECT path FROM global_file_index")?;
2117                let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
2118                rows.collect::<Result<Vec<_>, _>>()?
2119            }
2120        };
2121        Ok(rows)
2122    }
2123
2124    pub async fn delete_import_index_entry(
2125        &self,
2126        tier: MemoryTier,
2127        session_id: Option<&str>,
2128        project_id: Option<&str>,
2129        path: &str,
2130    ) -> MemoryResult<()> {
2131        let conn = self.conn.lock().await;
2132        match tier {
2133            MemoryTier::Session => {
2134                let session_id = require_scope_id(tier, session_id)?;
2135                conn.execute(
2136                    "DELETE FROM session_file_index WHERE session_id = ?1 AND path = ?2",
2137                    params![session_id, path],
2138                )?;
2139            }
2140            MemoryTier::Project => {
2141                let project_id = require_scope_id(tier, project_id)?;
2142                conn.execute(
2143                    "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
2144                    params![project_id, path],
2145                )?;
2146            }
2147            MemoryTier::Global => {
2148                conn.execute(
2149                    "DELETE FROM global_file_index WHERE path = ?1",
2150                    params![path],
2151                )?;
2152            }
2153        }
2154        Ok(())
2155    }
2156
    /// Delete every file-derived chunk (and its vector row) for one source
    /// path within the given tier, returning `(chunks_deleted, bytes_estimated)`.
    ///
    /// Counts are measured before deletion so the caller can report what was
    /// removed; vectors are deleted before chunks so the `chunk_id IN
    /// (SELECT ...)` subquery still sees the chunk rows. Session/project tiers
    /// require their matching scope id (validated by `require_scope_id`).
    pub async fn delete_file_chunks_by_path(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;
        let result = match tier {
            // Session tier: chunks scoped by session_id.
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM session_memory_chunks WHERE session_id = ?1 AND source = 'file' AND source_path = ?2)",
                    params![session_id, source_path],
                )?;
                conn.execute(
                    "DELETE FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
            // Project tier: chunks scoped by project_id.
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
                    params![project_id, source_path],
                )?;
                conn.execute(
                    "DELETE FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
            // Global tier: no scope id, keyed by source_path only.
            MemoryTier::Global => {
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM global_memory_chunks WHERE source = 'file' AND source_path = ?1)",
                    params![source_path],
                )?;
                conn.execute(
                    "DELETE FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
        };
        Ok(result)
    }
2246
2247    pub async fn upsert_project_index_status(
2248        &self,
2249        project_id: &str,
2250        total_files: i64,
2251        processed_files: i64,
2252        indexed_files: i64,
2253        skipped_files: i64,
2254        errors: i64,
2255    ) -> MemoryResult<()> {
2256        let conn = self.conn.lock().await;
2257        let last_indexed_at = Utc::now().to_rfc3339();
2258        conn.execute(
2259            "INSERT INTO project_index_status (
2260                project_id, last_indexed_at, last_total_files, last_processed_files,
2261                last_indexed_files, last_skipped_files, last_errors
2262             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
2263             ON CONFLICT(project_id) DO UPDATE SET
2264                last_indexed_at = excluded.last_indexed_at,
2265                last_total_files = excluded.last_total_files,
2266                last_processed_files = excluded.last_processed_files,
2267                last_indexed_files = excluded.last_indexed_files,
2268                last_skipped_files = excluded.last_skipped_files,
2269                last_errors = excluded.last_errors",
2270            params![
2271                project_id,
2272                last_indexed_at,
2273                total_files,
2274                processed_files,
2275                indexed_files,
2276                skipped_files,
2277                errors
2278            ],
2279        )?;
2280        Ok(())
2281    }
2282
2283    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
2284        let conn = self.conn.lock().await;
2285
2286        let project_chunks: i64 = conn.query_row(
2287            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
2288            params![project_id],
2289            |row| row.get(0),
2290        )?;
2291
2292        let project_bytes: i64 = conn.query_row(
2293            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
2294            params![project_id],
2295            |row| row.get(0),
2296        )?;
2297
2298        let file_index_chunks: i64 = conn.query_row(
2299            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2300            params![project_id],
2301            |row| row.get(0),
2302        )?;
2303
2304        let file_index_bytes: i64 = conn.query_row(
2305            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2306            params![project_id],
2307            |row| row.get(0),
2308        )?;
2309
2310        let indexed_files: i64 = conn.query_row(
2311            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
2312            params![project_id],
2313            |row| row.get(0),
2314        )?;
2315
2316        let status_row: Option<ProjectIndexStatusRow> =
2317            conn
2318                .query_row(
2319                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
2320                     FROM project_index_status WHERE project_id = ?1",
2321                    params![project_id],
2322                    |row| {
2323                        Ok((
2324                            row.get(0)?,
2325                            row.get(1)?,
2326                            row.get(2)?,
2327                            row.get(3)?,
2328                            row.get(4)?,
2329                            row.get(5)?,
2330                        ))
2331                    },
2332                )
2333                .optional()?;
2334
2335        let (
2336            last_indexed_at,
2337            last_total_files,
2338            last_processed_files,
2339            last_indexed_files,
2340            last_skipped_files,
2341            last_errors,
2342        ) = status_row.unwrap_or((None, None, None, None, None, None));
2343
2344        let last_indexed_at = last_indexed_at.and_then(|s| {
2345            DateTime::parse_from_rfc3339(&s)
2346                .ok()
2347                .map(|dt| dt.with_timezone(&Utc))
2348        });
2349
2350        Ok(ProjectMemoryStats {
2351            project_id: project_id.to_string(),
2352            project_chunks,
2353            project_bytes,
2354            file_index_chunks,
2355            file_index_bytes,
2356            indexed_files,
2357            last_indexed_at,
2358            last_total_files,
2359            last_processed_files,
2360            last_indexed_files,
2361            last_skipped_files,
2362            last_errors,
2363        })
2364    }
2365
2366    pub async fn clear_project_file_index(
2367        &self,
2368        project_id: &str,
2369        vacuum: bool,
2370    ) -> MemoryResult<ClearFileIndexResult> {
2371        let conn = self.conn.lock().await;
2372
2373        let chunks_deleted: i64 = conn.query_row(
2374            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2375            params![project_id],
2376            |row| row.get(0),
2377        )?;
2378
2379        let bytes_estimated: i64 = conn.query_row(
2380            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2381            params![project_id],
2382            |row| row.get(0),
2383        )?;
2384
2385        // Delete vectors first
2386        conn.execute(
2387            "DELETE FROM project_memory_vectors WHERE chunk_id IN
2388             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
2389            params![project_id],
2390        )?;
2391
2392        // Delete file chunks
2393        conn.execute(
2394            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
2395            params![project_id],
2396        )?;
2397
2398        // Clear file index tracking + status
2399        conn.execute(
2400            "DELETE FROM project_file_index WHERE project_id = ?1",
2401            params![project_id],
2402        )?;
2403        conn.execute(
2404            "DELETE FROM project_index_status WHERE project_id = ?1",
2405            params![project_id],
2406        )?;
2407
2408        drop(conn); // release lock before VACUUM (which needs exclusive access)
2409
2410        if vacuum {
2411            self.vacuum().await?;
2412        }
2413
2414        Ok(ClearFileIndexResult {
2415            chunks_deleted,
2416            bytes_estimated,
2417            did_vacuum: vacuum,
2418        })
2419    }
2420
2421    // ------------------------------------------------------------------
2422    // Memory hygiene
2423    // ------------------------------------------------------------------
2424
2425    /// Delete session memory chunks older than `retention_days` days.
2426    ///
2427    /// Also removes orphaned vector entries for the deleted chunks so the
2428    /// sqlite-vec virtual table stays consistent.
2429    ///
2430    /// Returns the number of chunk rows deleted.
2431    /// If `retention_days` is 0 hygiene is disabled and this returns Ok(0).
2432    pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
2433        if retention_days == 0 {
2434            return Ok(0);
2435        }
2436
2437        let conn = self.conn.lock().await;
2438
2439        // WAL is already active (set in new()) — no need to set it again here.
2440        let cutoff =
2441            (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
2442
2443        // Remove orphaned vector entries first (chunk_id FK would dangle otherwise)
2444        conn.execute(
2445            "DELETE FROM session_memory_vectors
2446             WHERE chunk_id IN (
2447                 SELECT id FROM session_memory_chunks WHERE created_at < ?1
2448             )",
2449            params![cutoff],
2450        )?;
2451
2452        let deleted = conn.execute(
2453            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
2454            params![cutoff],
2455        )?;
2456
2457        if deleted > 0 {
2458            tracing::info!(
2459                retention_days,
2460                deleted,
2461                "memory hygiene: pruned old session chunks"
2462            );
2463        }
2464
2465        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2466        Ok(deleted as u64)
2467    }
2468
2469    /// Run scheduled hygiene: read `session_retention_days` from `memory_config`
2470    /// (falling back to `env_override` if provided) and prune stale session chunks.
2471    ///
2472    /// Returns `Ok(chunks_deleted)`. This method is intentionally best-effort —
2473    /// callers should log errors and continue.
2474    pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
2475        // Prefer the env override, fall back to the DB config for the null project.
2476        let retention_days = if env_override_days > 0 {
2477            env_override_days
2478        } else {
2479            // Try to read the global (project_id = '__global__') config if present.
2480            let conn = self.conn.lock().await;
2481            let days: Option<i64> = conn
2482                .query_row(
2483                    "SELECT session_retention_days FROM memory_config
2484                     WHERE project_id = '__global__' LIMIT 1",
2485                    [],
2486                    |row| row.get(0),
2487                )
2488                .ok();
2489            drop(conn);
2490            days.unwrap_or(30) as u32
2491        };
2492
2493        self.prune_old_session_chunks(retention_days).await
2494    }
2495
    /// Insert a global (cross-project) memory record, deduplicating on
    /// content identity.
    ///
    /// A record counts as a duplicate when an existing row matches the same
    /// user, source type, content hash, and run id, plus NULL-tolerant
    /// equality on session/message/tool ids (IFNULL maps NULL to '' on both
    /// sides). Duplicates are not re-inserted: the existing row's id is
    /// returned with `stored = false, deduped = true`. A fresh insert
    /// returns the record's own id with `stored = true`.
    pub async fn put_global_memory_record(
        &self,
        record: &GlobalMemoryRecord,
    ) -> MemoryResult<GlobalMemoryWriteResult> {
        let conn = self.conn.lock().await;

        // Dedup probe — `.optional()` turns the no-match case into None.
        let existing: Option<String> = conn
            .query_row(
                "SELECT id FROM memory_records
                 WHERE user_id = ?1
                   AND source_type = ?2
                   AND content_hash = ?3
                   AND run_id = ?4
                   AND IFNULL(session_id, '') = IFNULL(?5, '')
                   AND IFNULL(message_id, '') = IFNULL(?6, '')
                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
                 LIMIT 1",
                params![
                    record.user_id,
                    record.source_type,
                    record.content_hash,
                    record.run_id,
                    record.session_id,
                    record.message_id,
                    record.tool_name
                ],
                |row| row.get(0),
            )
            .optional()?;

        if let Some(id) = existing {
            return Ok(GlobalMemoryWriteResult {
                id,
                stored: false,
                deduped: true,
            });
        }

        // JSON columns are stored as text; an absent value becomes the
        // empty string (not NULL).
        let metadata = record
            .metadata
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        let provenance = record
            .provenance
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        // 22 positional parameters — keep this list in exactly the same
        // order as the column list above it.
        conn.execute(
            "INSERT INTO memory_records(
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
                project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
                visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
                ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                ?17, ?18, ?19, ?20, ?21, ?22
            )",
            params![
                record.id,
                record.user_id,
                record.source_type,
                record.content,
                record.content_hash,
                record.run_id,
                record.session_id,
                record.message_id,
                record.tool_name,
                record.project_tag,
                record.channel_tag,
                record.host_tag,
                metadata,
                provenance,
                record.redaction_status,
                i64::from(record.redaction_count),
                record.visibility,
                if record.demoted { 1i64 } else { 0i64 },
                record.score_boost,
                record.created_at_ms as i64,
                record.updated_at_ms as i64,
                record.expires_at_ms.map(|v| v as i64),
            ],
        )?;

        Ok(GlobalMemoryWriteResult {
            id: record.id.clone(),
            stored: true,
            deduped: false,
        })
    }
2586
    /// Two-stage search over a user's global memory records.
    ///
    /// Stage 1 runs an FTS match against `memory_records_fts`, ranked by
    /// bm25 ascending, with rank converted to a score via 1 / (1 + rank).
    /// Stage 2: when the FTS statement cannot be prepared, or it yields no
    /// hits, a plain substring LIKE scan runs instead, newest first, with a
    /// flat score of 0.25.
    ///
    /// Both stages exclude demoted and expired records, apply the tag
    /// filters only when provided, and clamp `limit` to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        // Prepare errors are deliberately swallowed (`if let Ok` below):
        // if the FTS table is missing or unusable, we fall straight
        // through to the LIKE fallback instead of failing the search.
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // rank is the 23rd selected column (index 22).
                    let rank = row.get::<_, f64>(22)?;
                    // NOTE(review): SQLite's FTS5 bm25() reports better
                    // matches as more-negative values, so `rank.max(0.0)`
                    // collapses most FTS hits to a score of exactly 1.0 —
                    // confirm that flattening is intended.
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback path: substring scan. An empty (trimmed) query matches
        // everything (`?6 = ''`), returning the newest records.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                // Fallback hits carry a fixed, low confidence score.
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
2693
2694    pub async fn list_global_memory(
2695        &self,
2696        user_id: &str,
2697        q: Option<&str>,
2698        project_tag: Option<&str>,
2699        channel_tag: Option<&str>,
2700        limit: i64,
2701        offset: i64,
2702    ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
2703        let conn = self.conn.lock().await;
2704        let query = q.unwrap_or("").trim();
2705        let like = format!("%{}%", query);
2706        let mut stmt = conn.prepare(
2707            "SELECT
2708                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2709                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2710                redaction_status, redaction_count, visibility, demoted, score_boost,
2711                created_at_ms, updated_at_ms, expires_at_ms
2712             FROM memory_records
2713             WHERE user_id = ?1
2714               AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
2715               AND (?4 IS NULL OR project_tag = ?4)
2716               AND (?5 IS NULL OR channel_tag = ?5)
2717             ORDER BY created_at_ms DESC
2718             LIMIT ?6 OFFSET ?7",
2719        )?;
2720        let rows = stmt.query_map(
2721            params![
2722                user_id,
2723                query,
2724                like,
2725                project_tag,
2726                channel_tag,
2727                limit.clamp(1, 1000),
2728                offset.max(0)
2729            ],
2730            row_to_global_record,
2731        )?;
2732        let mut out = Vec::new();
2733        for row in rows {
2734            out.push(row?);
2735        }
2736        Ok(out)
2737    }
2738
2739    pub async fn set_global_memory_visibility(
2740        &self,
2741        id: &str,
2742        visibility: &str,
2743        demoted: bool,
2744    ) -> MemoryResult<bool> {
2745        let conn = self.conn.lock().await;
2746        let now_ms = chrono::Utc::now().timestamp_millis();
2747        let changed = conn.execute(
2748            "UPDATE memory_records
2749             SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
2750             WHERE id = ?1",
2751            params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
2752        )?;
2753        Ok(changed > 0)
2754    }
2755
2756    pub async fn update_global_memory_context(
2757        &self,
2758        id: &str,
2759        visibility: &str,
2760        demoted: bool,
2761        metadata: Option<&serde_json::Value>,
2762        provenance: Option<&serde_json::Value>,
2763    ) -> MemoryResult<bool> {
2764        let conn = self.conn.lock().await;
2765        let now_ms = chrono::Utc::now().timestamp_millis();
2766        let metadata = metadata.map(ToString::to_string).unwrap_or_default();
2767        let provenance = provenance.map(ToString::to_string).unwrap_or_default();
2768        let changed = conn.execute(
2769            "UPDATE memory_records
2770             SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
2771             WHERE id = ?1",
2772            params![
2773                id,
2774                visibility,
2775                if demoted { 1i64 } else { 0i64 },
2776                metadata,
2777                provenance,
2778                now_ms,
2779            ],
2780        )?;
2781        Ok(changed > 0)
2782    }
2783
2784    pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
2785        let conn = self.conn.lock().await;
2786        let mut stmt = conn.prepare(
2787            "SELECT
2788                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2789                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2790                redaction_status, redaction_count, visibility, demoted, score_boost,
2791                created_at_ms, updated_at_ms, expires_at_ms
2792             FROM memory_records
2793             WHERE id = ?1
2794             LIMIT 1",
2795        )?;
2796        let record = stmt
2797            .query_row(params![id], row_to_global_record)
2798            .optional()?;
2799        Ok(record)
2800    }
2801
2802    pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
2803        let conn = self.conn.lock().await;
2804        let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
2805        Ok(changed > 0)
2806    }
2807}
2808
2809/// Convert a database row to a MemoryChunk
2810fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
2811    let id: String = row.get(0)?;
2812    let content: String = row.get(1)?;
2813    let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
2814        match tier {
2815            MemoryTier::Session => (
2816                Some(row.get(2)?),
2817                row.get(3)?,
2818                4usize,
2819                5usize,
2820                6usize,
2821                7usize,
2822            ),
2823            MemoryTier::Project => (
2824                row.get(2)?,
2825                Some(row.get(3)?),
2826                4usize,
2827                5usize,
2828                6usize,
2829                7usize,
2830            ),
2831            MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
2832        };
2833
2834    let source: String = row.get(source_idx)?;
2835    let created_at_str: String = row.get(created_at_idx)?;
2836    let token_count: i64 = row.get(token_count_idx)?;
2837    let metadata_str: Option<String> = row.get(metadata_idx)?;
2838
2839    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
2840        .map_err(|e| {
2841            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
2842        })?
2843        .with_timezone(&Utc);
2844
2845    let metadata = metadata_str
2846        .filter(|s| !s.is_empty())
2847        .and_then(|s| serde_json::from_str(&s).ok());
2848
2849    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
2850    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
2851    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
2852    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
2853
2854    Ok(MemoryChunk {
2855        id,
2856        content,
2857        tier,
2858        session_id,
2859        project_id,
2860        source,
2861        source_path,
2862        source_mtime,
2863        source_size,
2864        source_hash,
2865        created_at,
2866        token_count,
2867        metadata,
2868    })
2869}
2870
2871fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
2872    scope
2873        .filter(|value| !value.trim().is_empty())
2874        .ok_or_else(|| {
2875            crate::types::MemoryError::InvalidConfig(match tier {
2876                MemoryTier::Session => "tier=session requires session_id".to_string(),
2877                MemoryTier::Project => "tier=project requires project_id".to_string(),
2878                MemoryTier::Global => "tier=global does not require a scope id".to_string(),
2879            })
2880        })
2881}
2882
/// Convert a `memory_records` row into a `GlobalMemoryRecord`.
///
/// Expects the module's canonical 22-column SELECT order used by the
/// search/list/get queries: (0) id, (1) user_id, (2) source_type,
/// (3) content, (4) content_hash, (5) run_id, (6) session_id,
/// (7) message_id, (8) tool_name, (9) project_tag, (10) channel_tag,
/// (11) host_tag, (12) metadata, (13) provenance, (14) redaction_status,
/// (15) redaction_count, (16) visibility, (17) demoted, (18) score_boost,
/// (19) created_at_ms, (20) updated_at_ms, (21) expires_at_ms.
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
    let metadata_str: Option<String> = row.get(12)?;
    let provenance_str: Option<String> = row.get(13)?;
    Ok(GlobalMemoryRecord {
        id: row.get(0)?,
        user_id: row.get(1)?,
        source_type: row.get(2)?,
        content: row.get(3)?,
        content_hash: row.get(4)?,
        run_id: row.get(5)?,
        session_id: row.get(6)?,
        message_id: row.get(7)?,
        tool_name: row.get(8)?,
        project_tag: row.get(9)?,
        channel_tag: row.get(10)?,
        host_tag: row.get(11)?,
        // JSON columns are lenient: empty strings and malformed JSON both
        // collapse to None rather than failing the row.
        metadata: metadata_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        provenance: provenance_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        redaction_status: row.get(14)?,
        // NOTE(review): these `as` casts assume the stored integers are
        // non-negative; a negative i64 would wrap to a huge unsigned value.
        // Confirm the write path guarantees non-negative values.
        redaction_count: row.get::<_, i64>(15)? as u32,
        visibility: row.get(16)?,
        // Stored as a 0/1 integer flag.
        demoted: row.get::<_, i64>(17)? != 0,
        score_boost: row.get(18)?,
        created_at_ms: row.get::<_, i64>(19)? as u64,
        updated_at_ms: row.get::<_, i64>(20)? as u64,
        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
    })
}
2915
2916impl MemoryDatabase {
2917    pub async fn get_node_by_uri(
2918        &self,
2919        uri: &str,
2920    ) -> MemoryResult<Option<crate::types::MemoryNode>> {
2921        let conn = self.conn.lock().await;
2922        let mut stmt = conn.prepare(
2923            "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2924             FROM memory_nodes WHERE uri = ?1",
2925        )?;
2926
2927        let result = stmt.query_row(params![uri], |row| {
2928            let node_type_str: String = row.get(3)?;
2929            let node_type = node_type_str
2930                .parse()
2931                .unwrap_or(crate::types::NodeType::File);
2932            let metadata_str: Option<String> = row.get(6)?;
2933            Ok(crate::types::MemoryNode {
2934                id: row.get(0)?,
2935                uri: row.get(1)?,
2936                parent_uri: row.get(2)?,
2937                node_type,
2938                created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2939                updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2940                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2941            })
2942        });
2943
2944        match result {
2945            Ok(node) => Ok(Some(node)),
2946            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2947            Err(e) => Err(MemoryError::Database(e)),
2948        }
2949    }
2950
2951    pub async fn create_node(
2952        &self,
2953        uri: &str,
2954        parent_uri: Option<&str>,
2955        node_type: crate::types::NodeType,
2956        metadata: Option<&serde_json::Value>,
2957    ) -> MemoryResult<String> {
2958        let id = uuid::Uuid::new_v4().to_string();
2959        let now = Utc::now().to_rfc3339();
2960        let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
2961
2962        let conn = self.conn.lock().await;
2963        conn.execute(
2964            "INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
2965             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2966            params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
2967        )?;
2968
2969        Ok(id)
2970    }
2971
2972    pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
2973        let conn = self.conn.lock().await;
2974        let mut stmt = conn.prepare(
2975            "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2976             FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
2977        )?;
2978
2979        let rows = stmt.query_map(params![uri], |row| {
2980            let node_type_str: String = row.get(3)?;
2981            let node_type = node_type_str
2982                .parse()
2983                .unwrap_or(crate::types::NodeType::File);
2984            let metadata_str: Option<String> = row.get(6)?;
2985            Ok(crate::types::MemoryNode {
2986                id: row.get(0)?,
2987                uri: row.get(1)?,
2988                parent_uri: row.get(2)?,
2989                node_type,
2990                created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2991                updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2992                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2993            })
2994        })?;
2995
2996        rows.collect::<Result<Vec<_>, _>>()
2997            .map_err(MemoryError::Database)
2998    }
2999
3000    pub async fn get_layer(
3001        &self,
3002        node_id: &str,
3003        layer_type: crate::types::LayerType,
3004    ) -> MemoryResult<Option<crate::types::MemoryLayer>> {
3005        let conn = self.conn.lock().await;
3006        let mut stmt = conn.prepare(
3007            "SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
3008             FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
3009        )?;
3010
3011        let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
3012            let layer_type_str: String = row.get(2)?;
3013            let layer_type = layer_type_str
3014                .parse()
3015                .unwrap_or(crate::types::LayerType::L2);
3016            Ok(crate::types::MemoryLayer {
3017                id: row.get(0)?,
3018                node_id: row.get(1)?,
3019                layer_type,
3020                content: row.get(3)?,
3021                token_count: row.get(4)?,
3022                embedding_id: row.get(5)?,
3023                created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
3024                source_chunk_id: row.get(7)?,
3025            })
3026        });
3027
3028        match result {
3029            Ok(layer) => Ok(Some(layer)),
3030            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
3031            Err(e) => Err(MemoryError::Database(e)),
3032        }
3033    }
3034
3035    pub async fn create_layer(
3036        &self,
3037        node_id: &str,
3038        layer_type: crate::types::LayerType,
3039        content: &str,
3040        token_count: i64,
3041        source_chunk_id: Option<&str>,
3042    ) -> MemoryResult<String> {
3043        let id = uuid::Uuid::new_v4().to_string();
3044        let now = Utc::now().to_rfc3339();
3045
3046        let conn = self.conn.lock().await;
3047        conn.execute(
3048            "INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
3049             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
3050            params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
3051        )?;
3052
3053        Ok(id)
3054    }
3055
3056    pub async fn get_children_tree(
3057        &self,
3058        parent_uri: &str,
3059        max_depth: usize,
3060    ) -> MemoryResult<Vec<crate::types::TreeNode>> {
3061        if max_depth == 0 {
3062            return Ok(Vec::new());
3063        }
3064
3065        let children = self.list_directory(parent_uri).await?;
3066        let mut tree_nodes = Vec::new();
3067
3068        for child in children {
3069            let layer_summary = self.get_layer_summary(&child.id).await?;
3070
3071            let grandchildren = if child.node_type == crate::types::NodeType::Directory {
3072                Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
3073            } else {
3074                Vec::new()
3075            };
3076
3077            tree_nodes.push(crate::types::TreeNode {
3078                node: child,
3079                children: grandchildren,
3080                layer_summary,
3081            });
3082        }
3083
3084        Ok(tree_nodes)
3085    }
3086
3087    async fn get_layer_summary(
3088        &self,
3089        node_id: &str,
3090    ) -> MemoryResult<Option<crate::types::LayerSummary>> {
3091        let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
3092        let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
3093        let has_l2 = self
3094            .get_layer(node_id, crate::types::LayerType::L2)
3095            .await?
3096            .is_some();
3097
3098        if l0.is_none() && l1.is_none() && !has_l2 {
3099            return Ok(None);
3100        }
3101
3102        Ok(Some(crate::types::LayerSummary {
3103            l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
3104            l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
3105            has_l2,
3106        }))
3107    }
3108
3109    pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
3110        let conn = self.conn.lock().await;
3111        let count: i64 = conn.query_row(
3112            "SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
3113            params![uri],
3114            |row| row.get(0),
3115        )?;
3116        Ok(count > 0)
3117    }
3118}
3119
3120fn row_to_knowledge_space(row: &Row) -> Result<KnowledgeSpaceRecord, rusqlite::Error> {
3121    let scope = row
3122        .get::<_, String>(1)?
3123        .parse()
3124        .unwrap_or(tandem_orchestrator::KnowledgeScope::Project);
3125    let trust_level = row
3126        .get::<_, String>(6)?
3127        .parse()
3128        .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
3129    let metadata = row
3130        .get::<_, Option<String>>(7)?
3131        .and_then(|raw| serde_json::from_str(&raw).ok());
3132    Ok(KnowledgeSpaceRecord {
3133        id: row.get(0)?,
3134        scope,
3135        project_id: row.get(2)?,
3136        namespace: row.get(3)?,
3137        title: row.get(4)?,
3138        description: row.get(5)?,
3139        trust_level,
3140        metadata,
3141        created_at_ms: row.get::<_, i64>(8)? as u64,
3142        updated_at_ms: row.get::<_, i64>(9)? as u64,
3143    })
3144}
3145
3146fn row_to_knowledge_item(row: &Row) -> Result<KnowledgeItemRecord, rusqlite::Error> {
3147    let trust_level = row
3148        .get::<_, String>(8)?
3149        .parse()
3150        .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
3151    let status = row
3152        .get::<_, String>(9)?
3153        .parse()
3154        .unwrap_or(KnowledgeItemStatus::Working);
3155    let payload = row
3156        .get::<_, String>(7)
3157        .ok()
3158        .and_then(|raw| serde_json::from_str(&raw).ok())
3159        .unwrap_or(serde_json::Value::Null);
3160    let artifact_refs = row
3161        .get::<_, String>(11)
3162        .ok()
3163        .and_then(|raw| serde_json::from_str(&raw).ok())
3164        .unwrap_or_default();
3165    let source_memory_ids = row
3166        .get::<_, String>(12)
3167        .ok()
3168        .and_then(|raw| serde_json::from_str(&raw).ok())
3169        .unwrap_or_default();
3170    let metadata = row
3171        .get::<_, Option<String>>(14)?
3172        .and_then(|raw| serde_json::from_str(&raw).ok());
3173    Ok(KnowledgeItemRecord {
3174        id: row.get(0)?,
3175        space_id: row.get(1)?,
3176        coverage_key: row.get(2)?,
3177        dedupe_key: row.get(3)?,
3178        item_type: row.get(4)?,
3179        title: row.get(5)?,
3180        summary: row.get(6)?,
3181        payload,
3182        trust_level,
3183        status,
3184        run_id: row.get(10)?,
3185        artifact_refs,
3186        source_memory_ids,
3187        freshness_expires_at_ms: row.get::<_, Option<i64>>(13)?.map(|value| value as u64),
3188        metadata,
3189        created_at_ms: row.get::<_, i64>(15)? as u64,
3190        updated_at_ms: row.get::<_, i64>(16)? as u64,
3191    })
3192}
3193
3194fn row_to_knowledge_coverage(row: &Row) -> Result<KnowledgeCoverageRecord, rusqlite::Error> {
3195    let metadata = row
3196        .get::<_, Option<String>>(7)?
3197        .and_then(|raw| serde_json::from_str(&raw).ok());
3198    Ok(KnowledgeCoverageRecord {
3199        coverage_key: row.get(0)?,
3200        space_id: row.get(1)?,
3201        latest_item_id: row.get(2)?,
3202        latest_dedupe_key: row.get(3)?,
3203        last_seen_at_ms: row.get::<_, i64>(4)? as u64,
3204        last_promoted_at_ms: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
3205        freshness_expires_at_ms: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
3206        metadata,
3207    })
3208}
3209
/// Truncate `s` to at most `max_len` bytes, appending "..." when content is
/// cut. For `max_len <= 3` a truncated string collapses to just "...".
///
/// The cut point is moved back to the nearest UTF-8 character boundary so the
/// slice can never panic on multi-byte input (the previous raw byte-index
/// slice panicked when `max_len - 3` fell inside a multi-byte character).
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Reserve three bytes for the ellipsis, then back up to a char boundary.
    let mut cut = max_len.saturating_sub(3);
    while cut > 0 && !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &s[..cut])
}
3217
/// Build a lenient FTS5 MATCH expression from free-form user text.
///
/// Each whitespace-separated token is trimmed of surrounding punctuation,
/// double-quoted, and OR-joined so any single token may match. Interior `"`
/// characters (which survive the trim, since only the token's ends are
/// cleaned) are doubled per FTS5 quoted-string syntax — previously such a
/// token produced a malformed query. When no usable token remains, the empty
/// phrase `""` is returned, which matches nothing instead of erroring.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Escape embedded quotes: `""` is a literal `"` inside an
                // FTS5 quoted string.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
3237
3238#[cfg(test)]
3239mod tests {
3240    use super::*;
3241    use serde_json::Value;
3242    use tandem_orchestrator::{KnowledgeScope, KnowledgeTrustLevel};
3243    use tempfile::TempDir;
3244
3245    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
3246        let temp_dir = TempDir::new().unwrap();
3247        let db_path = temp_dir.path().join("test_memory.db");
3248        let db = MemoryDatabase::new(&db_path).await.unwrap();
3249        (db, temp_dir)
3250    }
3251
3252    #[tokio::test]
3253    async fn test_init_schema() {
3254        let (db, _temp) = setup_test_db().await;
3255        // If we get here, schema was initialized successfully
3256        let stats = db.get_stats().await.unwrap();
3257        assert_eq!(stats.total_chunks, 0);
3258    }
3259
    // Round-trips a knowledge space, one item, and a coverage record through
    // the registry tables and verifies each reads back intact.
    //
    // NOTE(review): near-duplicate of `test_knowledge_registry_round_trip`
    // below (different fixtures, same flow) — consider consolidating.
    #[tokio::test]
    async fn test_knowledge_registry_roundtrip() {
        let (db, _temp) = setup_test_db().await;

        // Parent space must exist before items/coverage reference it.
        let space = KnowledgeSpaceRecord {
            id: "space-1".to_string(),
            scope: tandem_orchestrator::KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("support".to_string()),
            title: Some("Support Knowledge".to_string()),
            description: Some("Reusable support guidance".to_string()),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            metadata: Some(serde_json::json!({"owner": "ops"})),
            created_at_ms: 1,
            updated_at_ms: 2,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-1".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-1/support/debugging/slow-start".to_string(),
            dedupe_key: "dedupe-1".to_string(),
            item_type: "decision".to_string(),
            title: "Restart service before retry".to_string(),
            summary: Some("When the service is stale, restart before retrying.".to_string()),
            payload: serde_json::json!({"action": "restart"}),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            status: KnowledgeItemStatus::Promoted,
            run_id: Some("run-1".to_string()),
            artifact_refs: vec!["artifact://run-1/report".to_string()],
            source_memory_ids: vec!["memory-1".to_string()],
            freshness_expires_at_ms: Some(10),
            metadata: Some(serde_json::json!({"source": "run"})),
            created_at_ms: 3,
            updated_at_ms: 4,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        // Coverage points at the latest item for this coverage key.
        let coverage = KnowledgeCoverageRecord {
            coverage_key: item.coverage_key.clone(),
            space_id: space.id.clone(),
            latest_item_id: Some(item.id.clone()),
            latest_dedupe_key: Some(item.dedupe_key.clone()),
            last_seen_at_ms: 5,
            last_promoted_at_ms: Some(6),
            freshness_expires_at_ms: Some(10),
            metadata: Some(serde_json::json!({"coverage": true})),
        };
        db.upsert_knowledge_coverage(&coverage).await.unwrap();

        let loaded_space = db.get_knowledge_space(&space.id).await.unwrap().unwrap();
        assert_eq!(loaded_space.namespace.as_deref(), Some("support"));

        // Listing by space + coverage key should surface exactly the one item.
        let loaded_items = db
            .list_knowledge_items(&space.id, Some(&item.coverage_key))
            .await
            .unwrap();
        assert_eq!(loaded_items.len(), 1);
        assert_eq!(loaded_items[0].title, item.title);

        let loaded_coverage = db
            .get_knowledge_coverage(&item.coverage_key, &space.id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
    }
3328
3329    #[tokio::test]
3330    async fn test_store_and_retrieve_chunk() {
3331        let (db, _temp) = setup_test_db().await;
3332
3333        let chunk = MemoryChunk {
3334            id: "test-1".to_string(),
3335            content: "Test content".to_string(),
3336            tier: MemoryTier::Session,
3337            session_id: Some("session-1".to_string()),
3338            project_id: Some("project-1".to_string()),
3339            source: "user_message".to_string(),
3340            source_path: None,
3341            source_mtime: None,
3342            source_size: None,
3343            source_hash: None,
3344            created_at: Utc::now(),
3345            token_count: 10,
3346            metadata: None,
3347        };
3348
3349        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
3350        db.store_chunk(&chunk, &embedding).await.unwrap();
3351
3352        let chunks = db.get_session_chunks("session-1").await.unwrap();
3353        assert_eq!(chunks.len(), 1);
3354        assert_eq!(chunks[0].content, "Test content");
3355    }
3356
3357    #[tokio::test]
3358    async fn test_store_and_retrieve_global_chunk() {
3359        let (db, _temp) = setup_test_db().await;
3360
3361        let chunk = MemoryChunk {
3362            id: "global-1".to_string(),
3363            content: "Global note".to_string(),
3364            tier: MemoryTier::Global,
3365            session_id: None,
3366            project_id: None,
3367            source: "agent_note".to_string(),
3368            source_path: None,
3369            source_mtime: None,
3370            source_size: None,
3371            source_hash: None,
3372            created_at: Utc::now(),
3373            token_count: 7,
3374            metadata: Some(serde_json::json!({"kind":"test"})),
3375        };
3376
3377        let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
3378        db.store_chunk(&chunk, &embedding).await.unwrap();
3379
3380        let chunks = db.get_global_chunks(10).await.unwrap();
3381        assert_eq!(chunks.len(), 1);
3382        assert_eq!(chunks[0].content, "Global note");
3383        assert_eq!(chunks[0].source, "agent_note");
3384        assert_eq!(chunks[0].token_count, 7);
3385        assert_eq!(chunks[0].tier, MemoryTier::Global);
3386    }
3387
3388    #[tokio::test]
3389    async fn test_global_chunk_exists_by_source_hash() {
3390        let (db, _temp) = setup_test_db().await;
3391
3392        let chunk = MemoryChunk {
3393            id: "global-hash".to_string(),
3394            content: "Global hash note".to_string(),
3395            tier: MemoryTier::Global,
3396            session_id: None,
3397            project_id: None,
3398            source: "chat_exchange".to_string(),
3399            source_path: None,
3400            source_mtime: None,
3401            source_size: None,
3402            source_hash: Some("hash-123".to_string()),
3403            created_at: Utc::now(),
3404            token_count: 5,
3405            metadata: None,
3406        };
3407
3408        let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
3409        db.store_chunk(&chunk, &embedding).await.unwrap();
3410
3411        assert!(db
3412            .global_chunk_exists_by_source_hash("hash-123")
3413            .await
3414            .unwrap());
3415        assert!(!db
3416            .global_chunk_exists_by_source_hash("missing-hash")
3417            .await
3418            .unwrap());
3419    }
3420
3421    #[tokio::test]
3422    async fn test_config_crud() {
3423        let (db, _temp) = setup_test_db().await;
3424
3425        let config = db.get_or_create_config("project-1").await.unwrap();
3426        assert_eq!(config.max_chunks, 10000);
3427
3428        let new_config = MemoryConfig {
3429            max_chunks: 5000,
3430            ..Default::default()
3431        };
3432        db.update_config("project-1", &new_config).await.unwrap();
3433
3434        let updated = db.get_or_create_config("project-1").await.unwrap();
3435        assert_eq!(updated.max_chunks, 5000);
3436    }
3437
    // Verifies that a global-memory record stores once, that re-putting the
    // identical record reports a dedupe instead of a second store, and that
    // keyword search scoped to user + project tag surfaces the record.
    #[tokio::test]
    async fn test_global_memory_put_search_and_dedup() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;
        let record = GlobalMemoryRecord {
            id: "gm-1".to_string(),
            user_id: "user-a".to_string(),
            source_type: "user_message".to_string(),
            content: "remember rust workspace layout".to_string(),
            content_hash: "h1".to_string(),
            run_id: "run-1".to_string(),
            session_id: Some("s1".to_string()),
            message_id: Some("m1".to_string()),
            tool_name: None,
            project_tag: Some("proj-x".to_string()),
            channel_tag: None,
            host_tag: None,
            metadata: None,
            provenance: None,
            redaction_status: "passed".to_string(),
            redaction_count: 0,
            visibility: "private".to_string(),
            demoted: false,
            score_boost: 0.0,
            created_at_ms: now,
            updated_at_ms: now,
            expires_at_ms: None,
        };
        // First write stores; the identical second write must dedupe.
        let first = db.put_global_memory_record(&record).await.unwrap();
        assert!(first.stored);
        let second = db.put_global_memory_record(&record).await.unwrap();
        assert!(second.deduped);

        let hits = db
            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());
        assert_eq!(hits[0].record.id, "gm-1");
    }
3478
    // Full round-trip of the knowledge registry: space, item, and coverage are
    // each upserted and then read back field-by-field.
    //
    // NOTE(review): largely overlaps `test_knowledge_registry_roundtrip`
    // above; the two could be merged into one parameterized fixture.
    #[tokio::test]
    async fn test_knowledge_registry_round_trip() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-1".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("marketing/positioning".to_string()),
            title: Some("Marketing positioning".to_string()),
            description: Some("Reusable positioning guidance".to_string()),
            trust_level: KnowledgeTrustLevel::ApprovedDefault,
            metadata: Some(serde_json::json!({"owner":"marketing"})),
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        let loaded_space = db.get_knowledge_space("space-1").await.unwrap().unwrap();
        assert_eq!(loaded_space.id, "space-1");
        assert_eq!(loaded_space.scope, KnowledgeScope::Project);
        assert_eq!(loaded_space.project_id.as_deref(), Some("project-1"));
        assert_eq!(
            loaded_space.namespace.as_deref(),
            Some("marketing/positioning")
        );

        let item = KnowledgeItemRecord {
            id: "item-1".to_string(),
            space_id: "space-1".to_string(),
            coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
            dedupe_key: "item-1-dedupe".to_string(),
            item_type: "evidence".to_string(),
            title: "Pricing sensitivity observation".to_string(),
            summary: Some("Customers reacted to annual pricing changes".to_string()),
            payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: KnowledgeItemStatus::Promoted,
            run_id: Some("run-1".to_string()),
            artifact_refs: vec!["artifact://run-1/research-sources".to_string()],
            source_memory_ids: vec!["memory-1".to_string()],
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: Some(serde_json::json!({"source_kind":"web"})),
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        let loaded_item = db.get_knowledge_item("item-1").await.unwrap().unwrap();
        assert_eq!(loaded_item.id, "item-1");
        assert_eq!(loaded_item.space_id, "space-1");
        assert_eq!(
            loaded_item.coverage_key,
            "project-1::marketing/positioning::strategy::pricing"
        );
        assert_eq!(loaded_item.status, KnowledgeItemStatus::Promoted);
        assert_eq!(
            loaded_item.artifact_refs,
            vec!["artifact://run-1/research-sources".to_string()]
        );

        // Listing works both unfiltered and filtered by coverage key.
        let by_space = db.list_knowledge_items("space-1", None).await.unwrap();
        assert_eq!(by_space.len(), 1);
        let by_coverage = db
            .list_knowledge_items(
                "space-1",
                Some("project-1::marketing/positioning::strategy::pricing"),
            )
            .await
            .unwrap();
        assert_eq!(by_coverage.len(), 1);

        let coverage = KnowledgeCoverageRecord {
            coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
            space_id: "space-1".to_string(),
            latest_item_id: Some("item-1".to_string()),
            latest_dedupe_key: Some("item-1-dedupe".to_string()),
            last_seen_at_ms: now,
            last_promoted_at_ms: Some(now),
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: Some(serde_json::json!({"reuse_reason":"same topic"})),
        };
        db.upsert_knowledge_coverage(&coverage).await.unwrap();

        let loaded_coverage = db
            .get_knowledge_coverage(
                "project-1::marketing/positioning::strategy::pricing",
                "space-1",
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded_coverage.space_id, "space-1");
        assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
        assert_eq!(
            loaded_coverage.latest_dedupe_key.as_deref(),
            Some("item-1-dedupe")
        );
    }
3579
    // Promoting a Working item to Promoted needs no reviewer/approval; the
    // promotion must also lift the trust level and update the coverage
    // pointer (latest item, dedupe key, promotion timestamp).
    #[tokio::test]
    async fn test_knowledge_promotion_working_to_promoted_updates_coverage() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-promote-1".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("engineering/debugging".to_string()),
            title: Some("Engineering debugging".to_string()),
            description: Some("Reusable debugging guidance".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        // Item starts in Working state with Working trust.
        let item = KnowledgeItemRecord {
            id: "item-promote-1".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
            dedupe_key: "dedupe-promote-1".to_string(),
            item_type: "decision".to_string(),
            title: "Delay startup-dependent retries".to_string(),
            summary: Some("Retry only after startup completed.".to_string()),
            payload: serde_json::json!({"action":"delay_retry"}),
            trust_level: KnowledgeTrustLevel::Working,
            status: KnowledgeItemStatus::Working,
            run_id: Some("run-1".to_string()),
            artifact_refs: vec!["artifact://run-1/debug".to_string()],
            source_memory_ids: vec!["memory-1".to_string()],
            freshness_expires_at_ms: None,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        // No reviewer_id/approval_id: Working -> Promoted is self-service.
        let promote = KnowledgePromotionRequest {
            item_id: item.id.clone(),
            target_status: KnowledgeItemStatus::Promoted,
            promoted_at_ms: now + 10,
            freshness_expires_at_ms: Some(now + 86_400_000),
            reviewer_id: None,
            approval_id: None,
            reason: Some("validated in workflow".to_string()),
        };

        let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
        assert!(result.promoted);
        assert_eq!(result.item.status, KnowledgeItemStatus::Promoted);
        assert_eq!(result.item.trust_level, KnowledgeTrustLevel::Promoted);
        assert_eq!(
            result.coverage.latest_item_id.as_deref(),
            Some("item-promote-1")
        );
        assert_eq!(
            result.coverage.latest_dedupe_key.as_deref(),
            Some("dedupe-promote-1")
        );
        assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 10));
    }
3644
    // Elevating Promoted -> ApprovedDefault without a reviewer/approval must
    // be rejected with an InvalidConfig error.
    #[tokio::test]
    async fn test_knowledge_promotion_promoted_to_approved_default_requires_review() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-promote-2".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("marketing/positioning".to_string()),
            title: Some("Marketing positioning".to_string()),
            description: Some("Reusable positioning guidance".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-promote-2".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
            dedupe_key: "dedupe-promote-2".to_string(),
            item_type: "evidence".to_string(),
            title: "Pricing observation".to_string(),
            summary: Some("Annual pricing changes created friction".to_string()),
            payload: serde_json::json!({"claim":"pricing friction"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: KnowledgeItemStatus::Promoted,
            run_id: Some("run-2".to_string()),
            artifact_refs: vec!["artifact://run-2/research".to_string()],
            source_memory_ids: vec!["memory-2".to_string()],
            freshness_expires_at_ms: None,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        // Deliberately missing reviewer_id and approval_id.
        let promote = KnowledgePromotionRequest {
            item_id: item.id.clone(),
            target_status: KnowledgeItemStatus::ApprovedDefault,
            promoted_at_ms: now + 5,
            freshness_expires_at_ms: None,
            reviewer_id: None,
            approval_id: None,
            reason: Some("should require review".to_string()),
        };

        let err = db.promote_knowledge_item(&promote).await.unwrap_err();
        match err {
            MemoryError::InvalidConfig(_) => {}
            other => panic!("unexpected error: {}", other),
        }
    }
3701
    // With reviewer_id and approval_id supplied, Promoted -> ApprovedDefault
    // succeeds, lifts the trust level, and refreshes the coverage record.
    #[tokio::test]
    async fn test_knowledge_promotion_promoted_to_approved_default_updates_coverage() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-promote-3".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("support/runbooks".to_string()),
            title: Some("Support runbooks".to_string()),
            description: Some("Reusable runbook guidance".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-promote-3".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-1::support/runbooks::oncall::restart".to_string(),
            dedupe_key: "dedupe-promote-3".to_string(),
            item_type: "runbook".to_string(),
            title: "Restart service and verify".to_string(),
            summary: Some("Restart then validate health endpoint.".to_string()),
            payload: serde_json::json!({"steps":["restart","healthcheck"]}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: KnowledgeItemStatus::Promoted,
            run_id: Some("run-3".to_string()),
            artifact_refs: vec!["artifact://run-3/runbook".to_string()],
            source_memory_ids: vec!["memory-3".to_string()],
            freshness_expires_at_ms: None,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        // Reviewer and approval ids present -> elevation is allowed.
        let promote = KnowledgePromotionRequest {
            item_id: item.id.clone(),
            target_status: KnowledgeItemStatus::ApprovedDefault,
            promoted_at_ms: now + 12,
            freshness_expires_at_ms: Some(now + 172_800_000),
            reviewer_id: Some("reviewer-1".to_string()),
            approval_id: Some("approval-1".to_string()),
            reason: Some("reviewed by ops".to_string()),
        };

        let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
        assert!(result.promoted);
        assert_eq!(result.item.status, KnowledgeItemStatus::ApprovedDefault);
        assert_eq!(
            result.item.trust_level,
            KnowledgeTrustLevel::ApprovedDefault
        );
        assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 12));
        assert_eq!(
            result.coverage.latest_item_id.as_deref(),
            Some("item-promote-3")
        );
    }
3765
    // A Deprecated item is terminal: attempting to promote it must fail with
    // an InvalidConfig error.
    #[tokio::test]
    async fn test_knowledge_promotion_rejects_deprecated() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-promote-4".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("ops".to_string()),
            title: Some("Ops knowledge".to_string()),
            description: Some("Reusable ops knowledge".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_space(&space).await.unwrap();

        // Fixture item is already Deprecated.
        let item = KnowledgeItemRecord {
            id: "item-promote-4".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-1::ops::incident::latency".to_string(),
            dedupe_key: "dedupe-promote-4".to_string(),
            item_type: "decision".to_string(),
            title: "Ignore deprecated item".to_string(),
            summary: None,
            payload: serde_json::json!({"decision":"deprecated"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: KnowledgeItemStatus::Deprecated,
            run_id: Some("run-4".to_string()),
            artifact_refs: vec![],
            source_memory_ids: vec![],
            freshness_expires_at_ms: None,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        db.upsert_knowledge_item(&item).await.unwrap();

        let promote = KnowledgePromotionRequest {
            item_id: item.id.clone(),
            target_status: KnowledgeItemStatus::Promoted,
            promoted_at_ms: now + 1,
            freshness_expires_at_ms: None,
            reviewer_id: None,
            approval_id: None,
            reason: None,
        };

        let err = db.promote_knowledge_item(&promote).await.unwrap_err();
        match err {
            MemoryError::InvalidConfig(_) => {}
            other => panic!("unexpected error: {}", other),
        }
    }
3822
3823    #[tokio::test]
3824    async fn test_knowledge_promotion_idempotent_updates_coverage() {
3825        let (db, _temp) = setup_test_db().await;
3826        let now = chrono::Utc::now().timestamp_millis() as u64;
3827
3828        let space = KnowledgeSpaceRecord {
3829            id: "space-promote-5".to_string(),
3830            scope: KnowledgeScope::Project,
3831            project_id: Some("project-1".to_string()),
3832            namespace: Some("engineering/ops".to_string()),
3833            title: Some("Engineering ops".to_string()),
3834            description: None,
3835            trust_level: KnowledgeTrustLevel::Promoted,
3836            metadata: None,
3837            created_at_ms: now,
3838            updated_at_ms: now,
3839        };
3840        db.upsert_knowledge_space(&space).await.unwrap();
3841
3842        let item = KnowledgeItemRecord {
3843            id: "item-promote-5".to_string(),
3844            space_id: space.id.clone(),
3845            coverage_key: "project-1::engineering/ops::deploy::guardrails".to_string(),
3846            dedupe_key: "dedupe-promote-5".to_string(),
3847            item_type: "pattern".to_string(),
3848            title: "Deploy guardrails".to_string(),
3849            summary: None,
3850            payload: serde_json::json!({"pattern":"guardrails"}),
3851            trust_level: KnowledgeTrustLevel::Promoted,
3852            status: KnowledgeItemStatus::Promoted,
3853            run_id: Some("run-5".to_string()),
3854            artifact_refs: vec![],
3855            source_memory_ids: vec![],
3856            freshness_expires_at_ms: None,
3857            metadata: None,
3858            created_at_ms: now,
3859            updated_at_ms: now,
3860        };
3861        db.upsert_knowledge_item(&item).await.unwrap();
3862
3863        let promote = KnowledgePromotionRequest {
3864            item_id: item.id.clone(),
3865            target_status: KnowledgeItemStatus::Promoted,
3866            promoted_at_ms: now + 20,
3867            freshness_expires_at_ms: None,
3868            reviewer_id: None,
3869            approval_id: None,
3870            reason: None,
3871        };
3872
3873        let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
3874        assert!(!result.promoted);
3875        assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 20));
3876        assert_eq!(
3877            result.coverage.latest_item_id.as_deref(),
3878            Some("item-promote-5")
3879        );
3880    }
3881
3882    #[tokio::test]
3883    async fn test_knowledge_item_promotion_updates_coverage() {
3884        let (db, _temp) = setup_test_db().await;
3885        let now = chrono::Utc::now().timestamp_millis() as u64;
3886
3887        let space = KnowledgeSpaceRecord {
3888            id: "space-promote".to_string(),
3889            scope: KnowledgeScope::Project,
3890            project_id: Some("project-1".to_string()),
3891            namespace: Some("engineering/debugging".to_string()),
3892            title: Some("Engineering debugging".to_string()),
3893            description: Some("Reusable debugging guidance".to_string()),
3894            trust_level: KnowledgeTrustLevel::Promoted,
3895            metadata: None,
3896            created_at_ms: now,
3897            updated_at_ms: now,
3898        };
3899        db.upsert_knowledge_space(&space).await.unwrap();
3900
3901        let item = KnowledgeItemRecord {
3902            id: "item-promote".to_string(),
3903            space_id: space.id.clone(),
3904            coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
3905            dedupe_key: "dedupe-promote".to_string(),
3906            item_type: "decision".to_string(),
3907            title: "Delay startup-dependent retries".to_string(),
3908            summary: Some("Retry only after startup completes.".to_string()),
3909            payload: serde_json::json!({"action": "delay_retry"}),
3910            trust_level: KnowledgeTrustLevel::Working,
3911            status: KnowledgeItemStatus::Working,
3912            run_id: Some("run-promote".to_string()),
3913            artifact_refs: vec!["artifact://run-promote/report".to_string()],
3914            source_memory_ids: vec!["memory-promote".to_string()],
3915            freshness_expires_at_ms: None,
3916            metadata: Some(serde_json::json!({"source_kind":"run"})),
3917            created_at_ms: now,
3918            updated_at_ms: now,
3919        };
3920        db.upsert_knowledge_item(&item).await.unwrap();
3921
3922        let request = KnowledgePromotionRequest {
3923            item_id: item.id.clone(),
3924            target_status: KnowledgeItemStatus::Promoted,
3925            promoted_at_ms: now + 10,
3926            freshness_expires_at_ms: Some(now + 86_400_000),
3927            reviewer_id: None,
3928            approval_id: None,
3929            reason: Some("validated".to_string()),
3930        };
3931        let promoted = db
3932            .promote_knowledge_item(&request)
3933            .await
3934            .unwrap()
3935            .expect("promotion result");
3936        assert_eq!(promoted.previous_status, KnowledgeItemStatus::Working);
3937        assert!(promoted.promoted);
3938        assert_eq!(promoted.item.status, KnowledgeItemStatus::Promoted);
3939        assert_eq!(promoted.item.trust_level, KnowledgeTrustLevel::Promoted);
3940        assert_eq!(
3941            promoted.item.freshness_expires_at_ms,
3942            Some(now + 86_400_000)
3943        );
3944        assert_eq!(
3945            promoted
3946                .item
3947                .metadata
3948                .as_ref()
3949                .and_then(|value| value.get("promotion"))
3950                .and_then(|value| value.get("to_status"))
3951                .and_then(Value::as_str),
3952            Some("promoted")
3953        );
3954        assert_eq!(
3955            promoted.coverage.latest_item_id.as_deref(),
3956            Some("item-promote")
3957        );
3958        assert_eq!(
3959            promoted.coverage.latest_dedupe_key.as_deref(),
3960            Some("dedupe-promote")
3961        );
3962        assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 10));
3963        assert_eq!(
3964            promoted.coverage.freshness_expires_at_ms,
3965            Some(now + 86_400_000)
3966        );
3967
3968        let loaded = db
3969            .get_knowledge_item("item-promote")
3970            .await
3971            .unwrap()
3972            .unwrap();
3973        assert_eq!(loaded.status, KnowledgeItemStatus::Promoted);
3974        assert_eq!(
3975            loaded
3976                .metadata
3977                .as_ref()
3978                .and_then(|value| value.get("promotion"))
3979                .and_then(|value| value.get("from_status"))
3980                .and_then(Value::as_str),
3981            Some("working")
3982        );
3983    }
3984
3985    #[tokio::test]
3986    async fn test_knowledge_item_approved_default_requires_review() {
3987        let (db, _temp) = setup_test_db().await;
3988        let now = chrono::Utc::now().timestamp_millis() as u64;
3989
3990        let space = KnowledgeSpaceRecord {
3991            id: "space-approved".to_string(),
3992            scope: KnowledgeScope::Project,
3993            project_id: Some("project-1".to_string()),
3994            namespace: Some("marketing/positioning".to_string()),
3995            title: Some("Marketing positioning".to_string()),
3996            description: Some("Reusable positioning guidance".to_string()),
3997            trust_level: KnowledgeTrustLevel::Promoted,
3998            metadata: None,
3999            created_at_ms: now,
4000            updated_at_ms: now,
4001        };
4002        db.upsert_knowledge_space(&space).await.unwrap();
4003
4004        let item = KnowledgeItemRecord {
4005            id: "item-approved".to_string(),
4006            space_id: space.id.clone(),
4007            coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
4008            dedupe_key: "dedupe-approved".to_string(),
4009            item_type: "evidence".to_string(),
4010            title: "Pricing sensitivity observation".to_string(),
4011            summary: Some("Customers reacted to annual pricing changes".to_string()),
4012            payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
4013            trust_level: KnowledgeTrustLevel::Promoted,
4014            status: KnowledgeItemStatus::Promoted,
4015            run_id: Some("run-approved".to_string()),
4016            artifact_refs: vec!["artifact://run-approved/research".to_string()],
4017            source_memory_ids: vec!["memory-approved".to_string()],
4018            freshness_expires_at_ms: Some(now + 1234),
4019            metadata: None,
4020            created_at_ms: now,
4021            updated_at_ms: now,
4022        };
4023        db.upsert_knowledge_item(&item).await.unwrap();
4024
4025        let request = KnowledgePromotionRequest {
4026            item_id: item.id.clone(),
4027            target_status: KnowledgeItemStatus::ApprovedDefault,
4028            promoted_at_ms: now + 20,
4029            freshness_expires_at_ms: Some(now + 90_000_000),
4030            reviewer_id: Some("reviewer-1".to_string()),
4031            approval_id: Some("approval-1".to_string()),
4032            reason: Some("approved as default guidance".to_string()),
4033        };
4034        let promoted = db
4035            .promote_knowledge_item(&request)
4036            .await
4037            .unwrap()
4038            .expect("promotion result");
4039        assert_eq!(promoted.previous_status, KnowledgeItemStatus::Promoted);
4040        assert_eq!(promoted.item.status, KnowledgeItemStatus::ApprovedDefault);
4041        assert_eq!(
4042            promoted.item.trust_level,
4043            KnowledgeTrustLevel::ApprovedDefault
4044        );
4045        assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 20));
4046        assert_eq!(
4047            promoted
4048                .item
4049                .metadata
4050                .as_ref()
4051                .and_then(|value| value.get("promotion"))
4052                .and_then(|value| value.get("approval_id"))
4053                .and_then(Value::as_str),
4054            Some("approval-1")
4055        );
4056    }
4057
4058    #[tokio::test]
4059    async fn test_knowledge_item_promotion_rejects_invalid_transition() {
4060        let (db, _temp) = setup_test_db().await;
4061        let now = chrono::Utc::now().timestamp_millis() as u64;
4062
4063        let space = KnowledgeSpaceRecord {
4064            id: "space-invalid".to_string(),
4065            scope: KnowledgeScope::Project,
4066            project_id: Some("project-1".to_string()),
4067            namespace: Some("support".to_string()),
4068            title: Some("Support".to_string()),
4069            description: Some("Support guidance".to_string()),
4070            trust_level: KnowledgeTrustLevel::Promoted,
4071            metadata: None,
4072            created_at_ms: now,
4073            updated_at_ms: now,
4074        };
4075        db.upsert_knowledge_space(&space).await.unwrap();
4076
4077        let item = KnowledgeItemRecord {
4078            id: "item-invalid".to_string(),
4079            space_id: space.id.clone(),
4080            coverage_key: "project-1::support::workflow::triage".to_string(),
4081            dedupe_key: "dedupe-invalid".to_string(),
4082            item_type: "decision".to_string(),
4083            title: "Triage first".to_string(),
4084            summary: None,
4085            payload: serde_json::json!({"action":"triage"}),
4086            trust_level: KnowledgeTrustLevel::Working,
4087            status: KnowledgeItemStatus::Working,
4088            run_id: Some("run-invalid".to_string()),
4089            artifact_refs: vec![],
4090            source_memory_ids: vec![],
4091            freshness_expires_at_ms: None,
4092            metadata: None,
4093            created_at_ms: now,
4094            updated_at_ms: now,
4095        };
4096        db.upsert_knowledge_item(&item).await.unwrap();
4097
4098        let request = KnowledgePromotionRequest {
4099            item_id: item.id.clone(),
4100            target_status: KnowledgeItemStatus::ApprovedDefault,
4101            promoted_at_ms: now + 1,
4102            freshness_expires_at_ms: None,
4103            reviewer_id: Some("reviewer-1".to_string()),
4104            approval_id: Some("approval-1".to_string()),
4105            reason: Some("should fail".to_string()),
4106        };
4107        let err = db.promote_knowledge_item(&request).await.unwrap_err();
4108        assert!(matches!(err, MemoryError::InvalidConfig(_)));
4109        let loaded = db.get_knowledge_item(&item.id).await.unwrap().unwrap();
4110        assert_eq!(loaded.status, KnowledgeItemStatus::Working);
4111    }
4112}