// tandem_memory/db.rs
1// Database Layer Module
2// SQLite + sqlite-vec for vector storage
3
4use crate::types::{
5    ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6    MemoryChunk, MemoryConfig, MemoryError, MemoryResult, MemoryStats, MemoryTier,
7    ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
8};
9use chrono::{DateTime, Utc};
10use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
11use sqlite_vec::sqlite3_vec_init;
12use std::collections::HashSet;
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
/// Row shape read back from the `project_index_status` table.
/// NOTE(review): field order presumed to mirror the table's columns
/// (last_indexed_at, last_total_files, last_processed_files,
/// last_indexed_files, last_skipped_files, last_errors) — confirm at the
/// query site, which is outside this chunk.
type ProjectIndexStatusRow = (
    Option<String>, // last_indexed_at (stored as TEXT)
    Option<i64>,    // last_total_files
    Option<i64>,    // last_processed_files
    Option<i64>,    // last_indexed_files
    Option<i64>,    // last_skipped_files
    Option<i64>,    // last_errors
);
26
/// Database connection manager.
///
/// Owns a single SQLite connection behind an async mutex so concurrent
/// tasks serialize their access, plus the on-disk path the database was
/// opened from.
pub struct MemoryDatabase {
    // Shared, async-locked SQLite connection (sqlite-vec is registered
    // process-wide in `new`, so vec0 virtual tables are usable here).
    conn: Arc<Mutex<Connection>>,
    // Path the connection was opened from.
    db_path: std::path::PathBuf,
}
32
33impl MemoryDatabase {
    /// Initialize or open the memory database at `db_path`.
    ///
    /// Registers the sqlite-vec extension process-wide, opens the connection
    /// with a 10s busy timeout and WAL journaling, creates/migrates the
    /// schema, attempts automatic recovery of corrupted vector tables, and
    /// finally runs a base integrity check.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // Register sqlite-vec extension so every connection opened from now
        // on (including this one) has the vec0 module available.
        // SAFETY: sqlite3_vec_init is an sqlite extension entry point; the
        // transmute only recasts the function pointer to the exact
        // extern "C" signature sqlite3_auto_extension expects. No data is
        // reinterpreted, only the pointer's declared type.
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;
        // Wait up to 10s instead of failing immediately with SQLITE_BUSY
        // when another handle holds the write lock.
        conn.busy_timeout(Duration::from_secs(10))?;

        // Enable WAL mode for better concurrency
        // PRAGMA journal_mode returns a row, so we use query_row to ignore it
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        // Initialize schema
        db.init_schema().await?;
        // Vector-table corruption is recoverable: drop and recreate the vec0
        // tables instead of failing startup. Any other validation error is
        // fatal and propagated to the caller.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
81
82    /// Validate base SQLite integrity early so startup recovery can heal corrupt DB files.
83    async fn validate_integrity(&self) -> MemoryResult<()> {
84        let conn = self.conn.lock().await;
85        let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
86        {
87            Ok(value) => value,
88            Err(err) => {
89                // sqlite-vec virtual tables can intermittently return generic SQL logic errors
90                // during integrity probing even when runtime reads/writes still work.
91                // Do not block startup on this probe failure.
92                tracing::warn!(
93                    "Skipping strict PRAGMA quick_check due to probe error: {}",
94                    err
95                );
96                return Ok(());
97            }
98        };
99        if check.trim().eq_ignore_ascii_case("ok") {
100            return Ok(());
101        }
102
103        let lowered = check.to_lowercase();
104        if lowered.contains("malformed")
105            || lowered.contains("corrupt")
106            || lowered.contains("database disk image is malformed")
107        {
108            return Err(crate::types::MemoryError::InvalidConfig(format!(
109                "malformed database integrity check: {}",
110                check
111            )));
112        }
113
114        tracing::warn!(
115            "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
116            check
117        );
118        Ok(())
119    }
120
121    /// Initialize database schema
122    async fn init_schema(&self) -> MemoryResult<()> {
123        let conn = self.conn.lock().await;
124
125        // Extension is already registered globally in new()
126
127        // Session memory chunks table
128        conn.execute(
129            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
130                id TEXT PRIMARY KEY,
131                content TEXT NOT NULL,
132                session_id TEXT NOT NULL,
133                project_id TEXT,
134                source TEXT NOT NULL,
135                created_at TEXT NOT NULL,
136                token_count INTEGER NOT NULL DEFAULT 0,
137                metadata TEXT
138            )",
139            [],
140        )?;
141        let session_existing_cols: HashSet<String> = {
142            let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
143            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
144            rows.collect::<Result<HashSet<_>, _>>()?
145        };
146        if !session_existing_cols.contains("source_path") {
147            conn.execute(
148                "ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
149                [],
150            )?;
151        }
152        if !session_existing_cols.contains("source_mtime") {
153            conn.execute(
154                "ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
155                [],
156            )?;
157        }
158        if !session_existing_cols.contains("source_size") {
159            conn.execute(
160                "ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
161                [],
162            )?;
163        }
164        if !session_existing_cols.contains("source_hash") {
165            conn.execute(
166                "ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
167                [],
168            )?;
169        }
170
171        // Session memory vectors (virtual table)
172        conn.execute(
173            &format!(
174                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
175                    chunk_id TEXT PRIMARY KEY,
176                    embedding float[{}]
177                )",
178                DEFAULT_EMBEDDING_DIMENSION
179            ),
180            [],
181        )?;
182
183        // Project memory chunks table
184        conn.execute(
185            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
186                id TEXT PRIMARY KEY,
187                content TEXT NOT NULL,
188                project_id TEXT NOT NULL,
189                session_id TEXT,
190                source TEXT NOT NULL,
191                created_at TEXT NOT NULL,
192                token_count INTEGER NOT NULL DEFAULT 0,
193                metadata TEXT
194            )",
195            [],
196        )?;
197
198        // Migrations: file-derived columns on project_memory_chunks
199        // (SQLite doesn't support IF NOT EXISTS for columns, so we inspect table_info)
200        let existing_cols: HashSet<String> = {
201            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
202            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
203            rows.collect::<Result<HashSet<_>, _>>()?
204        };
205
206        if !existing_cols.contains("source_path") {
207            conn.execute(
208                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
209                [],
210            )?;
211        }
212        if !existing_cols.contains("source_mtime") {
213            conn.execute(
214                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
215                [],
216            )?;
217        }
218        if !existing_cols.contains("source_size") {
219            conn.execute(
220                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
221                [],
222            )?;
223        }
224        if !existing_cols.contains("source_hash") {
225            conn.execute(
226                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
227                [],
228            )?;
229        }
230
231        // Project memory vectors (virtual table)
232        conn.execute(
233            &format!(
234                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
235                    chunk_id TEXT PRIMARY KEY,
236                    embedding float[{}]
237                )",
238                DEFAULT_EMBEDDING_DIMENSION
239            ),
240            [],
241        )?;
242
243        // File indexing tables (project-scoped)
244        conn.execute(
245            "CREATE TABLE IF NOT EXISTS project_file_index (
246                project_id TEXT NOT NULL,
247                path TEXT NOT NULL,
248                mtime INTEGER NOT NULL,
249                size INTEGER NOT NULL,
250                hash TEXT NOT NULL,
251                indexed_at TEXT NOT NULL,
252                PRIMARY KEY(project_id, path)
253            )",
254            [],
255        )?;
256        conn.execute(
257            "CREATE TABLE IF NOT EXISTS session_file_index (
258                session_id TEXT NOT NULL,
259                path TEXT NOT NULL,
260                mtime INTEGER NOT NULL,
261                size INTEGER NOT NULL,
262                hash TEXT NOT NULL,
263                indexed_at TEXT NOT NULL,
264                PRIMARY KEY(session_id, path)
265            )",
266            [],
267        )?;
268
269        conn.execute(
270            "CREATE TABLE IF NOT EXISTS project_index_status (
271                project_id TEXT PRIMARY KEY,
272                last_indexed_at TEXT,
273                last_total_files INTEGER,
274                last_processed_files INTEGER,
275                last_indexed_files INTEGER,
276                last_skipped_files INTEGER,
277                last_errors INTEGER
278            )",
279            [],
280        )?;
281
282        // Global memory chunks table
283        conn.execute(
284            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
285                id TEXT PRIMARY KEY,
286                content TEXT NOT NULL,
287                source TEXT NOT NULL,
288                created_at TEXT NOT NULL,
289                token_count INTEGER NOT NULL DEFAULT 0,
290                metadata TEXT
291            )",
292            [],
293        )?;
294        let global_existing_cols: HashSet<String> = {
295            let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
296            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
297            rows.collect::<Result<HashSet<_>, _>>()?
298        };
299        if !global_existing_cols.contains("source_path") {
300            conn.execute(
301                "ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
302                [],
303            )?;
304        }
305        if !global_existing_cols.contains("source_mtime") {
306            conn.execute(
307                "ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
308                [],
309            )?;
310        }
311        if !global_existing_cols.contains("source_size") {
312            conn.execute(
313                "ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
314                [],
315            )?;
316        }
317        if !global_existing_cols.contains("source_hash") {
318            conn.execute(
319                "ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
320                [],
321            )?;
322        }
323
324        // Global memory vectors (virtual table)
325        conn.execute(
326            &format!(
327                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
328                    chunk_id TEXT PRIMARY KEY,
329                    embedding float[{}]
330                )",
331                DEFAULT_EMBEDDING_DIMENSION
332            ),
333            [],
334        )?;
335
336        // Memory configuration table
337        conn.execute(
338            "CREATE TABLE IF NOT EXISTS memory_config (
339                project_id TEXT PRIMARY KEY,
340                max_chunks INTEGER NOT NULL DEFAULT 10000,
341                chunk_size INTEGER NOT NULL DEFAULT 512,
342                retrieval_k INTEGER NOT NULL DEFAULT 5,
343                auto_cleanup INTEGER NOT NULL DEFAULT 1,
344                session_retention_days INTEGER NOT NULL DEFAULT 30,
345                token_budget INTEGER NOT NULL DEFAULT 5000,
346                chunk_overlap INTEGER NOT NULL DEFAULT 64,
347                updated_at TEXT NOT NULL
348            )",
349            [],
350        )?;
351
352        // Cleanup log table
353        conn.execute(
354            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
355                id TEXT PRIMARY KEY,
356                cleanup_type TEXT NOT NULL,
357                tier TEXT NOT NULL,
358                project_id TEXT,
359                session_id TEXT,
360                chunks_deleted INTEGER NOT NULL DEFAULT 0,
361                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
362                created_at TEXT NOT NULL
363            )",
364            [],
365        )?;
366
367        // Create indexes for better query performance
368        conn.execute(
369            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
370            [],
371        )?;
372        conn.execute(
373            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
374            [],
375        )?;
376        conn.execute(
377            "CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
378            [],
379        )?;
380        conn.execute(
381            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
382            [],
383        )?;
384        conn.execute(
385            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
386            [],
387        )?;
388        conn.execute(
389            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
390            [],
391        )?;
392        conn.execute(
393            "CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
394            [],
395        )?;
396        conn.execute(
397            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
398            [],
399        )?;
400        conn.execute(
401            "CREATE TABLE IF NOT EXISTS global_file_index (
402                path TEXT PRIMARY KEY,
403                mtime INTEGER NOT NULL,
404                size INTEGER NOT NULL,
405                hash TEXT NOT NULL,
406                indexed_at TEXT NOT NULL
407            )",
408            [],
409        )?;
410
411        // Global user memory records (FTS-backed baseline retrieval path)
412        conn.execute(
413            "CREATE TABLE IF NOT EXISTS memory_records (
414                id TEXT PRIMARY KEY,
415                user_id TEXT NOT NULL,
416                source_type TEXT NOT NULL,
417                content TEXT NOT NULL,
418                content_hash TEXT NOT NULL,
419                run_id TEXT NOT NULL,
420                session_id TEXT,
421                message_id TEXT,
422                tool_name TEXT,
423                project_tag TEXT,
424                channel_tag TEXT,
425                host_tag TEXT,
426                metadata TEXT,
427                provenance TEXT,
428                redaction_status TEXT NOT NULL,
429                redaction_count INTEGER NOT NULL DEFAULT 0,
430                visibility TEXT NOT NULL DEFAULT 'private',
431                demoted INTEGER NOT NULL DEFAULT 0,
432                score_boost REAL NOT NULL DEFAULT 0.0,
433                created_at_ms INTEGER NOT NULL,
434                updated_at_ms INTEGER NOT NULL,
435                expires_at_ms INTEGER
436            )",
437            [],
438        )?;
439        conn.execute(
440            "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
441                ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
442            [],
443        )?;
444        conn.execute(
445            "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
446                ON memory_records(user_id, created_at_ms DESC)",
447            [],
448        )?;
449        conn.execute(
450            "CREATE INDEX IF NOT EXISTS idx_memory_records_run
451                ON memory_records(run_id)",
452            [],
453        )?;
454        conn.execute(
455            "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
456                id UNINDEXED,
457                user_id UNINDEXED,
458                content
459            )",
460            [],
461        )?;
462        conn.execute(
463            "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
464                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
465            END",
466            [],
467        )?;
468        conn.execute(
469            "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
470                DELETE FROM memory_records_fts WHERE id = old.id;
471            END",
472            [],
473        )?;
474        conn.execute(
475            "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
476                DELETE FROM memory_records_fts WHERE id = old.id;
477                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
478            END",
479            [],
480        )?;
481
482        conn.execute(
483            "CREATE TABLE IF NOT EXISTS memory_nodes (
484                id TEXT PRIMARY KEY,
485                uri TEXT NOT NULL UNIQUE,
486                parent_uri TEXT,
487                node_type TEXT NOT NULL,
488                created_at TEXT NOT NULL,
489                updated_at TEXT NOT NULL,
490                metadata TEXT
491            )",
492            [],
493        )?;
494        conn.execute(
495            "CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
496            [],
497        )?;
498        conn.execute(
499            "CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
500            [],
501        )?;
502
503        conn.execute(
504            "CREATE TABLE IF NOT EXISTS memory_layers (
505                id TEXT PRIMARY KEY,
506                node_id TEXT NOT NULL,
507                layer_type TEXT NOT NULL,
508                content TEXT NOT NULL,
509                token_count INTEGER NOT NULL,
510                embedding_id TEXT,
511                created_at TEXT NOT NULL,
512                source_chunk_id TEXT,
513                FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
514            )",
515            [],
516        )?;
517        conn.execute(
518            "CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
519            [],
520        )?;
521        conn.execute(
522            "CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
523            [],
524        )?;
525
526        conn.execute(
527            "CREATE TABLE IF NOT EXISTS memory_retrieval_state (
528                node_id TEXT PRIMARY KEY,
529                active_layer TEXT NOT NULL DEFAULT 'L0',
530                last_accessed TEXT,
531                access_count INTEGER DEFAULT 0,
532                FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
533            )",
534            [],
535        )?;
536
537        Ok(())
538    }
539
540    /// Validate that sqlite-vec tables are readable.
541    /// This catches legacy/corrupted vector blobs early so startup can recover.
542    pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
543        let conn = self.conn.lock().await;
544        let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
545
546        for table in [
547            "session_memory_vectors",
548            "project_memory_vectors",
549            "global_memory_vectors",
550        ] {
551            let sql = format!("SELECT COUNT(*) FROM {}", table);
552            let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
553
554            // COUNT(*) can pass even when vector chunk blobs are unreadable.
555            // Probe sqlite-vec MATCH execution to surface latent blob corruption.
556            if row_count > 0 {
557                let probe_sql = format!(
558                    "SELECT chunk_id, distance
559                     FROM {}
560                     WHERE embedding MATCH ?1 AND k = 1",
561                    table
562                );
563                let mut stmt = conn.prepare(&probe_sql)?;
564                let mut rows = stmt.query(params![probe_embedding.as_str()])?;
565                let _ = rows.next()?;
566            }
567        }
568        Ok(())
569    }
570
571    fn is_vector_table_error(err: &rusqlite::Error) -> bool {
572        let text = err.to_string().to_lowercase();
573        text.contains("vector blob")
574            || text.contains("chunks iter error")
575            || text.contains("chunks iter")
576            || text.contains("internal sqlite-vec error")
577            || text.contains("insert rowids id")
578            || text.contains("sql logic error")
579            || text.contains("database disk image is malformed")
580            || text.contains("session_memory_vectors")
581            || text.contains("project_memory_vectors")
582            || text.contains("global_memory_vectors")
583            || text.contains("vec0")
584    }
585
586    async fn recreate_vector_tables(&self) -> MemoryResult<()> {
587        let conn = self.conn.lock().await;
588
589        for base in [
590            "session_memory_vectors",
591            "project_memory_vectors",
592            "global_memory_vectors",
593        ] {
594            // Drop vec virtual table and common sqlite-vec shadow tables first.
595            for name in [
596                base.to_string(),
597                format!("{}_chunks", base),
598                format!("{}_info", base),
599                format!("{}_rowids", base),
600                format!("{}_vector_chunks00", base),
601            ] {
602                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
603                conn.execute(&sql, [])?;
604            }
605
606            // Drop any additional shadow tables (e.g. *_vector_chunks01).
607            let like_pattern = format!("{base}_%");
608            let mut stmt = conn.prepare(
609                "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
610            )?;
611            let table_names = stmt
612                .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
613                .collect::<Result<Vec<_>, _>>()?;
614            drop(stmt);
615            for name in table_names {
616                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
617                conn.execute(&sql, [])?;
618            }
619        }
620
621        conn.execute(
622            &format!(
623                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
624                    chunk_id TEXT PRIMARY KEY,
625                    embedding float[{}]
626                )",
627                DEFAULT_EMBEDDING_DIMENSION
628            ),
629            [],
630        )?;
631
632        conn.execute(
633            &format!(
634                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
635                    chunk_id TEXT PRIMARY KEY,
636                    embedding float[{}]
637                )",
638                DEFAULT_EMBEDDING_DIMENSION
639            ),
640            [],
641        )?;
642
643        conn.execute(
644            &format!(
645                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
646                    chunk_id TEXT PRIMARY KEY,
647                    embedding float[{}]
648                )",
649                DEFAULT_EMBEDDING_DIMENSION
650            ),
651            [],
652        )?;
653
654        Ok(())
655    }
656
657    /// Ensure vector tables are readable and recreate them if corruption is detected.
658    /// Returns true when a repair was performed.
659    pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
660        match self.validate_vector_tables().await {
661            Ok(()) => Ok(false),
662            Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
663                tracing::warn!(
664                    "Memory vector tables appear corrupted ({}). Recreating vector tables.",
665                    err
666                );
667                self.recreate_vector_tables().await?;
668                Ok(true)
669            }
670            Err(err) => Err(err),
671        }
672    }
673
674    /// Last-resort runtime repair for malformed DB states: drop user memory tables
675    /// and recreate the schema in-place so new writes can proceed.
676    /// This intentionally clears memory content for the active DB file.
677    pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
678        let table_names = {
679            let conn = self.conn.lock().await;
680            let mut stmt = conn.prepare(
681                "SELECT name FROM sqlite_master
682                 WHERE type='table'
683                   AND name NOT LIKE 'sqlite_%'
684                 ORDER BY name",
685            )?;
686            let names = stmt
687                .query_map([], |row| row.get::<_, String>(0))?
688                .collect::<Result<Vec<_>, _>>()?;
689            names
690        };
691
692        {
693            let conn = self.conn.lock().await;
694            for table in table_names {
695                let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
696                let _ = conn.execute(&sql, []);
697            }
698        }
699
700        self.init_schema().await
701    }
702
703    /// Attempt an immediate vector-table repair when a concrete DB error indicates
704    /// sqlite-vec internals are failing at statement/rowid level.
705    pub async fn try_repair_after_error(
706        &self,
707        err: &crate::types::MemoryError,
708    ) -> MemoryResult<bool> {
709        match err {
710            crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
711                tracing::warn!(
712                    "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
713                    db_err
714                );
715                self.recreate_vector_tables().await?;
716                Ok(true)
717            }
718            _ => Ok(false),
719        }
720    }
721
722    /// Store a chunk with its embedding
723    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
724        let conn = self.conn.lock().await;
725
726        let (chunks_table, vectors_table) = match chunk.tier {
727            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
728            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
729            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
730        };
731
732        let created_at_str = chunk.created_at.to_rfc3339();
733        let metadata_str = chunk
734            .metadata
735            .as_ref()
736            .map(|m| m.to_string())
737            .unwrap_or_default();
738
739        // Insert chunk
740        match chunk.tier {
741            MemoryTier::Session => {
742                conn.execute(
743                    &format!(
744                        "INSERT INTO {} (
745                            id, content, session_id, project_id, source, created_at, token_count, metadata,
746                            source_path, source_mtime, source_size, source_hash
747                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
748                        chunks_table
749                    ),
750                    params![
751                        chunk.id,
752                        chunk.content,
753                        chunk.session_id.as_ref().unwrap_or(&String::new()),
754                        chunk.project_id,
755                        chunk.source,
756                        created_at_str,
757                        chunk.token_count,
758                        metadata_str,
759                        chunk.source_path.clone(),
760                        chunk.source_mtime,
761                        chunk.source_size,
762                        chunk.source_hash.clone()
763                    ],
764                )?;
765            }
766            MemoryTier::Project => {
767                conn.execute(
768                    &format!(
769                        "INSERT INTO {} (
770                            id, content, project_id, session_id, source, created_at, token_count, metadata,
771                            source_path, source_mtime, source_size, source_hash
772                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
773                        chunks_table
774                    ),
775                    params![
776                        chunk.id,
777                        chunk.content,
778                        chunk.project_id.as_ref().unwrap_or(&String::new()),
779                        chunk.session_id,
780                        chunk.source,
781                        created_at_str,
782                        chunk.token_count,
783                        metadata_str,
784                        chunk.source_path.clone(),
785                        chunk.source_mtime,
786                        chunk.source_size,
787                        chunk.source_hash.clone()
788                    ],
789                )?;
790            }
791            MemoryTier::Global => {
792                conn.execute(
793                    &format!(
794                        "INSERT INTO {} (
795                            id, content, source, created_at, token_count, metadata,
796                            source_path, source_mtime, source_size, source_hash
797                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
798                        chunks_table
799                    ),
800                    params![
801                        chunk.id,
802                        chunk.content,
803                        chunk.source,
804                        created_at_str,
805                        chunk.token_count,
806                        metadata_str,
807                        chunk.source_path.clone(),
808                        chunk.source_mtime,
809                        chunk.source_size,
810                        chunk.source_hash.clone()
811                    ],
812                )?;
813            }
814        }
815
816        // Insert embedding
817        let embedding_json = format!(
818            "[{}]",
819            embedding
820                .iter()
821                .map(|f| f.to_string())
822                .collect::<Vec<_>>()
823                .join(",")
824        );
825        conn.execute(
826            &format!(
827                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
828                vectors_table
829            ),
830            params![chunk.id, embedding_json],
831        )?;
832
833        Ok(())
834    }
835
836    /// Search for similar chunks
837    pub async fn search_similar(
838        &self,
839        query_embedding: &[f32],
840        tier: MemoryTier,
841        project_id: Option<&str>,
842        session_id: Option<&str>,
843        limit: i64,
844    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
845        let conn = self.conn.lock().await;
846
847        let (chunks_table, vectors_table) = match tier {
848            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
849            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
850            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
851        };
852
853        let embedding_json = format!(
854            "[{}]",
855            query_embedding
856                .iter()
857                .map(|f| f.to_string())
858                .collect::<Vec<_>>()
859                .join(",")
860        );
861
862        // Build query based on tier and filters
863        let results = match tier {
864            MemoryTier::Session => {
865                if let Some(sid) = session_id {
866                    let sql = format!(
867                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
868                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
869                                v.distance
870                         FROM {} AS v
871                         JOIN {} AS c ON v.chunk_id = c.id
872                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
873                         ORDER BY v.distance",
874                        vectors_table, chunks_table
875                    );
876                    let mut stmt = conn.prepare(&sql)?;
877                    let results = stmt
878                        .query_map(params![sid, embedding_json, limit], |row| {
879                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
880                        })?
881                        .collect::<Result<Vec<_>, _>>()?;
882                    results
883                } else if let Some(pid) = project_id {
884                    let sql = format!(
885                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
886                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
887                                v.distance
888                         FROM {} AS v
889                         JOIN {} AS c ON v.chunk_id = c.id
890                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
891                         ORDER BY v.distance",
892                        vectors_table, chunks_table
893                    );
894                    let mut stmt = conn.prepare(&sql)?;
895                    let results = stmt
896                        .query_map(params![pid, embedding_json, limit], |row| {
897                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
898                        })?
899                        .collect::<Result<Vec<_>, _>>()?;
900                    results
901                } else {
902                    let sql = format!(
903                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
904                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
905                                v.distance
906                         FROM {} AS v
907                         JOIN {} AS c ON v.chunk_id = c.id
908                         WHERE v.embedding MATCH ?1 AND k = ?2
909                         ORDER BY v.distance",
910                        vectors_table, chunks_table
911                    );
912                    let mut stmt = conn.prepare(&sql)?;
913                    let results = stmt
914                        .query_map(params![embedding_json, limit], |row| {
915                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
916                        })?
917                        .collect::<Result<Vec<_>, _>>()?;
918                    results
919                }
920            }
921            MemoryTier::Project => {
922                if let Some(pid) = project_id {
923                    let sql = format!(
924                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
925                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
926                                v.distance
927                         FROM {} AS v
928                         JOIN {} AS c ON v.chunk_id = c.id
929                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
930                         ORDER BY v.distance",
931                        vectors_table, chunks_table
932                    );
933                    let mut stmt = conn.prepare(&sql)?;
934                    let results = stmt
935                        .query_map(params![pid, embedding_json, limit], |row| {
936                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
937                        })?
938                        .collect::<Result<Vec<_>, _>>()?;
939                    results
940                } else {
941                    let sql = format!(
942                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
943                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
944                                v.distance
945                         FROM {} AS v
946                         JOIN {} AS c ON v.chunk_id = c.id
947                         WHERE v.embedding MATCH ?1 AND k = ?2
948                         ORDER BY v.distance",
949                        vectors_table, chunks_table
950                    );
951                    let mut stmt = conn.prepare(&sql)?;
952                    let results = stmt
953                        .query_map(params![embedding_json, limit], |row| {
954                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
955                        })?
956                        .collect::<Result<Vec<_>, _>>()?;
957                    results
958                }
959            }
960            MemoryTier::Global => {
961                let sql = format!(
962                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
963                            c.source_path, c.source_mtime, c.source_size, c.source_hash,
964                            v.distance
965                     FROM {} AS v
966                     JOIN {} AS c ON v.chunk_id = c.id
967                     WHERE v.embedding MATCH ?1 AND k = ?2
968                     ORDER BY v.distance",
969                    vectors_table, chunks_table
970                );
971                let mut stmt = conn.prepare(&sql)?;
972                let results = stmt
973                    .query_map(params![embedding_json, limit], |row| {
974                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
975                    })?
976                    .collect::<Result<Vec<_>, _>>()?;
977                results
978            }
979        };
980
981        Ok(results)
982    }
983
984    /// Get chunks by session ID
985    pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
986        let conn = self.conn.lock().await;
987
988        let mut stmt = conn.prepare(
989            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
990                    source_path, source_mtime, source_size, source_hash
991             FROM session_memory_chunks
992             WHERE session_id = ?1
993             ORDER BY created_at DESC",
994        )?;
995
996        let chunks = stmt
997            .query_map(params![session_id], |row| {
998                row_to_chunk(row, MemoryTier::Session)
999            })?
1000            .collect::<Result<Vec<_>, _>>()?;
1001
1002        Ok(chunks)
1003    }
1004
1005    /// Get chunks by project ID
1006    pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1007        let conn = self.conn.lock().await;
1008
1009        let mut stmt = conn.prepare(
1010            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1011                    source_path, source_mtime, source_size, source_hash
1012             FROM project_memory_chunks
1013             WHERE project_id = ?1
1014             ORDER BY created_at DESC",
1015        )?;
1016
1017        let chunks = stmt
1018            .query_map(params![project_id], |row| {
1019                row_to_chunk(row, MemoryTier::Project)
1020            })?
1021            .collect::<Result<Vec<_>, _>>()?;
1022
1023        Ok(chunks)
1024    }
1025
1026    /// Get global chunks
1027    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
1028        let conn = self.conn.lock().await;
1029
1030        let mut stmt = conn.prepare(
1031            "SELECT id, content, source, created_at, token_count, metadata,
1032                    source_path, source_mtime, source_size, source_hash
1033             FROM global_memory_chunks
1034             ORDER BY created_at DESC
1035             LIMIT ?1",
1036        )?;
1037
1038        let chunks = stmt
1039            .query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
1040            .collect::<Result<Vec<_>, _>>()?;
1041
1042        Ok(chunks)
1043    }
1044
1045    pub async fn global_chunk_exists_by_source_hash(
1046        &self,
1047        source_hash: &str,
1048    ) -> MemoryResult<bool> {
1049        let conn = self.conn.lock().await;
1050        let exists = conn
1051            .query_row(
1052                "SELECT 1 FROM global_memory_chunks WHERE source_hash = ?1 LIMIT 1",
1053                params![source_hash],
1054                |_row| Ok(()),
1055            )
1056            .optional()?
1057            .is_some();
1058        Ok(exists)
1059    }
1060
1061    /// Clear session memory
1062    pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
1063        let conn = self.conn.lock().await;
1064
1065        // Get count before deletion
1066        let count: i64 = conn.query_row(
1067            "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
1068            params![session_id],
1069            |row| row.get(0),
1070        )?;
1071
1072        // Delete vectors first (foreign key constraint)
1073        conn.execute(
1074            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
1075             (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
1076            params![session_id],
1077        )?;
1078
1079        // Delete chunks
1080        conn.execute(
1081            "DELETE FROM session_memory_chunks WHERE session_id = ?1",
1082            params![session_id],
1083        )?;
1084
1085        Ok(count as u64)
1086    }
1087
1088    /// Clear project memory
1089    pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
1090        let conn = self.conn.lock().await;
1091
1092        // Get count before deletion
1093        let count: i64 = conn.query_row(
1094            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1095            params![project_id],
1096            |row| row.get(0),
1097        )?;
1098
1099        // Delete vectors first
1100        conn.execute(
1101            "DELETE FROM project_memory_vectors WHERE chunk_id IN 
1102             (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
1103            params![project_id],
1104        )?;
1105
1106        // Delete chunks
1107        conn.execute(
1108            "DELETE FROM project_memory_chunks WHERE project_id = ?1",
1109            params![project_id],
1110        )?;
1111
1112        Ok(count as u64)
1113    }
1114
1115    /// Clear global memory chunks by source prefix (and matching vectors).
1116    pub async fn clear_global_memory_by_source_prefix(
1117        &self,
1118        source_prefix: &str,
1119    ) -> MemoryResult<u64> {
1120        let conn = self.conn.lock().await;
1121        let like = format!("{}%", source_prefix);
1122
1123        let count: i64 = conn.query_row(
1124            "SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
1125            params![like],
1126            |row| row.get(0),
1127        )?;
1128
1129        conn.execute(
1130            "DELETE FROM global_memory_vectors WHERE chunk_id IN
1131             (SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
1132            params![like],
1133        )?;
1134
1135        conn.execute(
1136            "DELETE FROM global_memory_chunks WHERE source LIKE ?1",
1137            params![like],
1138        )?;
1139
1140        Ok(count as u64)
1141    }
1142
    /// Delete a single memory chunk by id within the requested scope.
    ///
    /// Scope rules:
    /// - `Session` requires `session_id`; `Project` requires `project_id`
    ///   (a missing id yields `MemoryError::InvalidConfig`).
    /// - `Global` needs no scope id; `project_id`/`session_id` are ignored.
    ///
    /// Returns the number of chunk rows deleted (0 if no match).
    pub async fn delete_chunk(
        &self,
        tier: MemoryTier,
        chunk_id: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
    ) -> MemoryResult<u64> {
        let conn = self.conn.lock().await;

        let deleted = match tier {
            MemoryTier::Session => {
                let Some(session_id) = session_id else {
                    return Err(MemoryError::InvalidConfig(
                        "session_id is required to delete session memory chunks".to_string(),
                    ));
                };
                // Remove the vector row first so it never outlives its chunk.
                conn.execute(
                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2)",
                    params![chunk_id, session_id],
                )?;
                // execute() yields the affected-row count for the chunk delete.
                conn.execute(
                    "DELETE FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2",
                    params![chunk_id, session_id],
                )?
            }
            MemoryTier::Project => {
                let Some(project_id) = project_id else {
                    return Err(MemoryError::InvalidConfig(
                        "project_id is required to delete project memory chunks".to_string(),
                    ));
                };
                // Vectors first, then the chunk row (same ordering as above).
                conn.execute(
                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2)",
                    params![chunk_id, project_id],
                )?;
                conn.execute(
                    "DELETE FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2",
                    params![chunk_id, project_id],
                )?
            }
            MemoryTier::Global => {
                // Global chunks are unscoped: only the id is matched.
                conn.execute(
                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM global_memory_chunks WHERE id = ?1)",
                    params![chunk_id],
                )?;
                conn.execute(
                    "DELETE FROM global_memory_chunks WHERE id = ?1",
                    params![chunk_id],
                )?
            }
        };

        Ok(deleted as u64)
    }
1201
1202    /// Clear old session memory based on retention policy
1203    pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
1204        let conn = self.conn.lock().await;
1205
1206        let cutoff = Utc::now() - chrono::Duration::days(retention_days);
1207        let cutoff_str = cutoff.to_rfc3339();
1208
1209        // Get count before deletion
1210        let count: i64 = conn.query_row(
1211            "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
1212            params![cutoff_str],
1213            |row| row.get(0),
1214        )?;
1215
1216        // Delete vectors first
1217        conn.execute(
1218            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
1219             (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
1220            params![cutoff_str],
1221        )?;
1222
1223        // Delete chunks
1224        conn.execute(
1225            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1226            params![cutoff_str],
1227        )?;
1228
1229        Ok(count as u64)
1230    }
1231
    /// Get or create memory config for a project.
    ///
    /// Reads the per-project row from `memory_config`; if none exists yet,
    /// inserts `MemoryConfig::default()` and returns it. The read-then-insert
    /// is safe from races here because the connection sits behind a mutex.
    pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        let conn = self.conn.lock().await;

        let result: Option<MemoryConfig> = conn
            .query_row(
                "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup, 
                        session_retention_days, token_budget, chunk_overlap
                 FROM memory_config WHERE project_id = ?1",
                params![project_id],
                |row| {
                    Ok(MemoryConfig {
                        max_chunks: row.get(0)?,
                        chunk_size: row.get(1)?,
                        retrieval_k: row.get(2)?,
                        // Stored as INTEGER 0/1; map to bool.
                        auto_cleanup: row.get::<_, i64>(3)? != 0,
                        session_retention_days: row.get(4)?,
                        token_budget: row.get(5)?,
                        chunk_overlap: row.get(6)?,
                    })
                },
            )
            .optional()?;

        match result {
            Some(config) => Ok(config),
            None => {
                // Create default config
                let config = MemoryConfig::default();
                let updated_at = Utc::now().to_rfc3339();

                conn.execute(
                    "INSERT INTO memory_config 
                     (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
                      session_retention_days, token_budget, chunk_overlap, updated_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
                    params![
                        project_id,
                        config.max_chunks,
                        config.chunk_size,
                        config.retrieval_k,
                        // bool persisted as 0/1 to match the SELECT mapping above.
                        config.auto_cleanup as i64,
                        config.session_retention_days,
                        config.token_budget,
                        config.chunk_overlap,
                        updated_at
                    ],
                )?;

                Ok(config)
            }
        }
    }
1285
1286    /// Update memory config for a project
1287    pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1288        let conn = self.conn.lock().await;
1289
1290        let updated_at = Utc::now().to_rfc3339();
1291
1292        conn.execute(
1293            "INSERT OR REPLACE INTO memory_config 
1294             (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1295              session_retention_days, token_budget, chunk_overlap, updated_at)
1296             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1297            params![
1298                project_id,
1299                config.max_chunks,
1300                config.chunk_size,
1301                config.retrieval_k,
1302                config.auto_cleanup as i64,
1303                config.session_retention_days,
1304                config.token_budget,
1305                config.chunk_overlap,
1306                updated_at
1307            ],
1308        )?;
1309
1310        Ok(())
1311    }
1312
    /// Get memory statistics.
    ///
    /// Aggregates per-tier chunk counts and content byte totals, the
    /// timestamp of the last cleanup (if any), and the on-disk database
    /// file size into a single `MemoryStats` snapshot.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        let conn = self.conn.lock().await;

        // Count chunks
        let session_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
                row.get(0)
            })?;

        let project_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
                row.get(0)
            })?;

        let global_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
                row.get(0)
            })?;

        // Calculate sizes (content text length only; COALESCE covers the
        // empty-table case where SUM would be NULL)
        let session_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        let project_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        let global_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        // Get last cleanup (most recent entry in the cleanup log, if any)
        let last_cleanup: Option<String> = conn
            .query_row(
                "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .optional()?;

        // Parse the stored RFC 3339 string; unparseable values become None.
        let last_cleanup = last_cleanup.and_then(|s| {
            DateTime::parse_from_rfc3339(&s)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        // Get file size (main database file only; WAL/SHM not included)
        let file_size = std::fs::metadata(&self.db_path)?.len() as i64;

        Ok(MemoryStats {
            total_chunks: session_chunks + project_chunks + global_chunks,
            session_chunks,
            project_chunks,
            global_chunks,
            total_bytes: session_bytes + project_bytes + global_bytes,
            session_bytes,
            project_bytes,
            global_bytes,
            file_size,
            last_cleanup,
        })
    }
1383
1384    /// Log cleanup operation
1385    pub async fn log_cleanup(
1386        &self,
1387        cleanup_type: &str,
1388        tier: MemoryTier,
1389        project_id: Option<&str>,
1390        session_id: Option<&str>,
1391        chunks_deleted: i64,
1392        bytes_reclaimed: i64,
1393    ) -> MemoryResult<()> {
1394        let conn = self.conn.lock().await;
1395
1396        let id = uuid::Uuid::new_v4().to_string();
1397        let created_at = Utc::now().to_rfc3339();
1398
1399        conn.execute(
1400            "INSERT INTO memory_cleanup_log 
1401             (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1402             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1403            params![
1404                id,
1405                cleanup_type,
1406                tier.to_string(),
1407                project_id,
1408                session_id,
1409                chunks_deleted,
1410                bytes_reclaimed,
1411                created_at
1412            ],
1413        )?;
1414
1415        Ok(())
1416    }
1417
1418    /// Vacuum the database to reclaim space
1419    pub async fn vacuum(&self) -> MemoryResult<()> {
1420        let conn = self.conn.lock().await;
1421        conn.execute("VACUUM", [])?;
1422        Ok(())
1423    }
1424
1425    // ---------------------------------------------------------------------
1426    // Project file indexing helpers
1427    // ---------------------------------------------------------------------
1428
1429    pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1430        let conn = self.conn.lock().await;
1431        let n: i64 = conn.query_row(
1432            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1433            params![project_id],
1434            |row| row.get(0),
1435        )?;
1436        Ok(n)
1437    }
1438
1439    pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1440        let conn = self.conn.lock().await;
1441        let exists: Option<i64> = conn
1442            .query_row(
1443                "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1444                params![project_id],
1445                |row| row.get(0),
1446            )
1447            .optional()?;
1448        Ok(exists.is_some())
1449    }
1450
1451    pub async fn get_file_index_entry(
1452        &self,
1453        project_id: &str,
1454        path: &str,
1455    ) -> MemoryResult<Option<(i64, i64, String)>> {
1456        let conn = self.conn.lock().await;
1457        let row: Option<(i64, i64, String)> = conn
1458            .query_row(
1459                "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1460                params![project_id, path],
1461                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1462            )
1463            .optional()?;
1464        Ok(row)
1465    }
1466
    /// Insert or refresh a project file-index entry.
    ///
    /// On conflict of (project_id, path), updates the stored mtime, size,
    /// hash, and indexed-at timestamp in place.
    pub async fn upsert_file_index_entry(
        &self,
        project_id: &str,
        path: &str,
        mtime: i64,
        size: i64,
        hash: &str,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // Timestamp recorded as RFC 3339, consistent with the other tables.
        let indexed_at = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             ON CONFLICT(project_id, path) DO UPDATE SET
                mtime = excluded.mtime,
                size = excluded.size,
                hash = excluded.hash,
                indexed_at = excluded.indexed_at",
            params![project_id, path, mtime, size, hash, indexed_at],
        )?;
        Ok(())
    }
1489
1490    pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1491        let conn = self.conn.lock().await;
1492        conn.execute(
1493            "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1494            params![project_id, path],
1495        )?;
1496        Ok(())
1497    }
1498
1499    pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1500        let conn = self.conn.lock().await;
1501        let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1502        let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1503        Ok(rows.collect::<Result<Vec<_>, _>>()?)
1504    }
1505
    /// Delete all 'file'-sourced chunks for one source path in a project.
    ///
    /// Returns `(chunks_deleted, bytes_estimated)`, where the byte figure is
    /// the summed content length of the removed chunks. Both figures are
    /// measured before deleting; the connection mutex keeps the sequence
    /// consistent.
    pub async fn delete_project_file_chunks_by_path(
        &self,
        project_id: &str,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        // COALESCE covers the no-match case where SUM would be NULL.
        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        // Delete vectors first (keep order consistent with other clears)
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
            params![project_id, source_path],
        )?;

        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
        )?;

        Ok((chunks_deleted, bytes_estimated))
    }
1541
    /// Look up an import-index entry `(mtime, size, hash)` for a path in the
    /// tier-appropriate file-index table.
    ///
    /// `Session` reads `session_file_index` (requires `session_id`),
    /// `Project` reads `project_file_index` (requires `project_id`), and
    /// `Global` reads `global_file_index` keyed by path alone. Missing scope
    /// ids are rejected by `require_scope_id`.
    pub async fn get_import_index_entry(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        path: &str,
    ) -> MemoryResult<Option<(i64, i64, String)>> {
        let conn = self.conn.lock().await;
        let row = match tier {
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                conn.query_row(
                    "SELECT mtime, size, hash FROM session_file_index WHERE session_id = ?1 AND path = ?2",
                    params![session_id, path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?
            }
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                conn.query_row(
                    "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
                    params![project_id, path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?
            }
            MemoryTier::Global => conn
                .query_row(
                    "SELECT mtime, size, hash FROM global_file_index WHERE path = ?1",
                    params![path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?,
        };
        Ok(row)
    }
1579
    /// Insert or refresh an import-index entry in the tier-appropriate table.
    ///
    /// Mirrors `get_import_index_entry`'s tier routing: `Session` writes
    /// `session_file_index` (requires `session_id`), `Project` writes
    /// `project_file_index` (requires `project_id`), `Global` writes
    /// `global_file_index` keyed by path alone. On key conflict, the stored
    /// mtime, size, hash, and indexed-at timestamp are updated in place.
    pub async fn upsert_import_index_entry(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        path: &str,
        mtime: i64,
        size: i64,
        hash: &str,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // Timestamp recorded as RFC 3339, consistent with the other tables.
        let indexed_at = Utc::now().to_rfc3339();
        match tier {
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                conn.execute(
                    "INSERT INTO session_file_index (session_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(session_id, path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![session_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                conn.execute(
                    "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(project_id, path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![project_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            MemoryTier::Global => {
                conn.execute(
                    "INSERT INTO global_file_index (path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5)
                     ON CONFLICT(path) DO UPDATE SET
                        mtime = excluded.mtime,
                        size = excluded.size,
                        hash = excluded.hash,
                        indexed_at = excluded.indexed_at",
                    params![path, mtime, size, hash, indexed_at],
                )?;
            }
        }
        Ok(())
    }
1634
1635    pub async fn list_import_index_paths(
1636        &self,
1637        tier: MemoryTier,
1638        session_id: Option<&str>,
1639        project_id: Option<&str>,
1640    ) -> MemoryResult<Vec<String>> {
1641        let conn = self.conn.lock().await;
1642        let rows = match tier {
1643            MemoryTier::Session => {
1644                let session_id = require_scope_id(tier, session_id)?;
1645                let mut stmt =
1646                    conn.prepare("SELECT path FROM session_file_index WHERE session_id = ?1")?;
1647                let rows = stmt.query_map(params![session_id], |row| row.get::<_, String>(0))?;
1648                rows.collect::<Result<Vec<_>, _>>()?
1649            }
1650            MemoryTier::Project => {
1651                let project_id = require_scope_id(tier, project_id)?;
1652                let mut stmt =
1653                    conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1654                let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1655                rows.collect::<Result<Vec<_>, _>>()?
1656            }
1657            MemoryTier::Global => {
1658                let mut stmt = conn.prepare("SELECT path FROM global_file_index")?;
1659                let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
1660                rows.collect::<Result<Vec<_>, _>>()?
1661            }
1662        };
1663        Ok(rows)
1664    }
1665
1666    pub async fn delete_import_index_entry(
1667        &self,
1668        tier: MemoryTier,
1669        session_id: Option<&str>,
1670        project_id: Option<&str>,
1671        path: &str,
1672    ) -> MemoryResult<()> {
1673        let conn = self.conn.lock().await;
1674        match tier {
1675            MemoryTier::Session => {
1676                let session_id = require_scope_id(tier, session_id)?;
1677                conn.execute(
1678                    "DELETE FROM session_file_index WHERE session_id = ?1 AND path = ?2",
1679                    params![session_id, path],
1680                )?;
1681            }
1682            MemoryTier::Project => {
1683                let project_id = require_scope_id(tier, project_id)?;
1684                conn.execute(
1685                    "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1686                    params![project_id, path],
1687                )?;
1688            }
1689            MemoryTier::Global => {
1690                conn.execute(
1691                    "DELETE FROM global_file_index WHERE path = ?1",
1692                    params![path],
1693                )?;
1694            }
1695        }
1696        Ok(())
1697    }
1698
1699    pub async fn delete_file_chunks_by_path(
1700        &self,
1701        tier: MemoryTier,
1702        session_id: Option<&str>,
1703        project_id: Option<&str>,
1704        source_path: &str,
1705    ) -> MemoryResult<(i64, i64)> {
1706        let conn = self.conn.lock().await;
1707        let result = match tier {
1708            MemoryTier::Session => {
1709                let session_id = require_scope_id(tier, session_id)?;
1710                let chunks_deleted: i64 = conn.query_row(
1711                    "SELECT COUNT(*) FROM session_memory_chunks
1712                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
1713                    params![session_id, source_path],
1714                    |row| row.get(0),
1715                )?;
1716                let bytes_estimated: i64 = conn.query_row(
1717                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
1718                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
1719                    params![session_id, source_path],
1720                    |row| row.get(0),
1721                )?;
1722                conn.execute(
1723                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
1724                     (SELECT id FROM session_memory_chunks WHERE session_id = ?1 AND source = 'file' AND source_path = ?2)",
1725                    params![session_id, source_path],
1726                )?;
1727                conn.execute(
1728                    "DELETE FROM session_memory_chunks
1729                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
1730                    params![session_id, source_path],
1731                )?;
1732                (chunks_deleted, bytes_estimated)
1733            }
1734            MemoryTier::Project => {
1735                let project_id = require_scope_id(tier, project_id)?;
1736                let chunks_deleted: i64 = conn.query_row(
1737                    "SELECT COUNT(*) FROM project_memory_chunks
1738                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1739                    params![project_id, source_path],
1740                    |row| row.get(0),
1741                )?;
1742                let bytes_estimated: i64 = conn.query_row(
1743                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1744                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1745                    params![project_id, source_path],
1746                    |row| row.get(0),
1747                )?;
1748                conn.execute(
1749                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
1750                     (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1751                    params![project_id, source_path],
1752                )?;
1753                conn.execute(
1754                    "DELETE FROM project_memory_chunks
1755                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1756                    params![project_id, source_path],
1757                )?;
1758                (chunks_deleted, bytes_estimated)
1759            }
1760            MemoryTier::Global => {
1761                let chunks_deleted: i64 = conn.query_row(
1762                    "SELECT COUNT(*) FROM global_memory_chunks
1763                     WHERE source = 'file' AND source_path = ?1",
1764                    params![source_path],
1765                    |row| row.get(0),
1766                )?;
1767                let bytes_estimated: i64 = conn.query_row(
1768                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
1769                     WHERE source = 'file' AND source_path = ?1",
1770                    params![source_path],
1771                    |row| row.get(0),
1772                )?;
1773                conn.execute(
1774                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
1775                     (SELECT id FROM global_memory_chunks WHERE source = 'file' AND source_path = ?1)",
1776                    params![source_path],
1777                )?;
1778                conn.execute(
1779                    "DELETE FROM global_memory_chunks
1780                     WHERE source = 'file' AND source_path = ?1",
1781                    params![source_path],
1782                )?;
1783                (chunks_deleted, bytes_estimated)
1784            }
1785        };
1786        Ok(result)
1787    }
1788
1789    pub async fn upsert_project_index_status(
1790        &self,
1791        project_id: &str,
1792        total_files: i64,
1793        processed_files: i64,
1794        indexed_files: i64,
1795        skipped_files: i64,
1796        errors: i64,
1797    ) -> MemoryResult<()> {
1798        let conn = self.conn.lock().await;
1799        let last_indexed_at = Utc::now().to_rfc3339();
1800        conn.execute(
1801            "INSERT INTO project_index_status (
1802                project_id, last_indexed_at, last_total_files, last_processed_files,
1803                last_indexed_files, last_skipped_files, last_errors
1804             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1805             ON CONFLICT(project_id) DO UPDATE SET
1806                last_indexed_at = excluded.last_indexed_at,
1807                last_total_files = excluded.last_total_files,
1808                last_processed_files = excluded.last_processed_files,
1809                last_indexed_files = excluded.last_indexed_files,
1810                last_skipped_files = excluded.last_skipped_files,
1811                last_errors = excluded.last_errors",
1812            params![
1813                project_id,
1814                last_indexed_at,
1815                total_files,
1816                processed_files,
1817                indexed_files,
1818                skipped_files,
1819                errors
1820            ],
1821        )?;
1822        Ok(())
1823    }
1824
1825    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
1826        let conn = self.conn.lock().await;
1827
1828        let project_chunks: i64 = conn.query_row(
1829            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1830            params![project_id],
1831            |row| row.get(0),
1832        )?;
1833
1834        let project_bytes: i64 = conn.query_row(
1835            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
1836            params![project_id],
1837            |row| row.get(0),
1838        )?;
1839
1840        let file_index_chunks: i64 = conn.query_row(
1841            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1842            params![project_id],
1843            |row| row.get(0),
1844        )?;
1845
1846        let file_index_bytes: i64 = conn.query_row(
1847            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1848            params![project_id],
1849            |row| row.get(0),
1850        )?;
1851
1852        let indexed_files: i64 = conn.query_row(
1853            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1854            params![project_id],
1855            |row| row.get(0),
1856        )?;
1857
1858        let status_row: Option<ProjectIndexStatusRow> =
1859            conn
1860                .query_row(
1861                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
1862                     FROM project_index_status WHERE project_id = ?1",
1863                    params![project_id],
1864                    |row| {
1865                        Ok((
1866                            row.get(0)?,
1867                            row.get(1)?,
1868                            row.get(2)?,
1869                            row.get(3)?,
1870                            row.get(4)?,
1871                            row.get(5)?,
1872                        ))
1873                    },
1874                )
1875                .optional()?;
1876
1877        let (
1878            last_indexed_at,
1879            last_total_files,
1880            last_processed_files,
1881            last_indexed_files,
1882            last_skipped_files,
1883            last_errors,
1884        ) = status_row.unwrap_or((None, None, None, None, None, None));
1885
1886        let last_indexed_at = last_indexed_at.and_then(|s| {
1887            DateTime::parse_from_rfc3339(&s)
1888                .ok()
1889                .map(|dt| dt.with_timezone(&Utc))
1890        });
1891
1892        Ok(ProjectMemoryStats {
1893            project_id: project_id.to_string(),
1894            project_chunks,
1895            project_bytes,
1896            file_index_chunks,
1897            file_index_bytes,
1898            indexed_files,
1899            last_indexed_at,
1900            last_total_files,
1901            last_processed_files,
1902            last_indexed_files,
1903            last_skipped_files,
1904            last_errors,
1905        })
1906    }
1907
1908    pub async fn clear_project_file_index(
1909        &self,
1910        project_id: &str,
1911        vacuum: bool,
1912    ) -> MemoryResult<ClearFileIndexResult> {
1913        let conn = self.conn.lock().await;
1914
1915        let chunks_deleted: i64 = conn.query_row(
1916            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1917            params![project_id],
1918            |row| row.get(0),
1919        )?;
1920
1921        let bytes_estimated: i64 = conn.query_row(
1922            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1923            params![project_id],
1924            |row| row.get(0),
1925        )?;
1926
1927        // Delete vectors first
1928        conn.execute(
1929            "DELETE FROM project_memory_vectors WHERE chunk_id IN
1930             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1931            params![project_id],
1932        )?;
1933
1934        // Delete file chunks
1935        conn.execute(
1936            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1937            params![project_id],
1938        )?;
1939
1940        // Clear file index tracking + status
1941        conn.execute(
1942            "DELETE FROM project_file_index WHERE project_id = ?1",
1943            params![project_id],
1944        )?;
1945        conn.execute(
1946            "DELETE FROM project_index_status WHERE project_id = ?1",
1947            params![project_id],
1948        )?;
1949
1950        drop(conn); // release lock before VACUUM (which needs exclusive access)
1951
1952        if vacuum {
1953            self.vacuum().await?;
1954        }
1955
1956        Ok(ClearFileIndexResult {
1957            chunks_deleted,
1958            bytes_estimated,
1959            did_vacuum: vacuum,
1960        })
1961    }
1962
1963    // ------------------------------------------------------------------
1964    // Memory hygiene
1965    // ------------------------------------------------------------------
1966
1967    /// Delete session memory chunks older than `retention_days` days.
1968    ///
1969    /// Also removes orphaned vector entries for the deleted chunks so the
1970    /// sqlite-vec virtual table stays consistent.
1971    ///
1972    /// Returns the number of chunk rows deleted.
1973    /// If `retention_days` is 0 hygiene is disabled and this returns Ok(0).
1974    pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1975        if retention_days == 0 {
1976            return Ok(0);
1977        }
1978
1979        let conn = self.conn.lock().await;
1980
1981        // WAL is already active (set in new()) — no need to set it again here.
1982        let cutoff =
1983            (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1984
1985        // Remove orphaned vector entries first (chunk_id FK would dangle otherwise)
1986        conn.execute(
1987            "DELETE FROM session_memory_vectors
1988             WHERE chunk_id IN (
1989                 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1990             )",
1991            params![cutoff],
1992        )?;
1993
1994        let deleted = conn.execute(
1995            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1996            params![cutoff],
1997        )?;
1998
1999        if deleted > 0 {
2000            tracing::info!(
2001                retention_days,
2002                deleted,
2003                "memory hygiene: pruned old session chunks"
2004            );
2005        }
2006
2007        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2008        Ok(deleted as u64)
2009    }
2010
2011    /// Run scheduled hygiene: read `session_retention_days` from `memory_config`
2012    /// (falling back to `env_override` if provided) and prune stale session chunks.
2013    ///
2014    /// Returns `Ok(chunks_deleted)`. This method is intentionally best-effort —
2015    /// callers should log errors and continue.
2016    pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
2017        // Prefer the env override, fall back to the DB config for the null project.
2018        let retention_days = if env_override_days > 0 {
2019            env_override_days
2020        } else {
2021            // Try to read the global (project_id = '__global__') config if present.
2022            let conn = self.conn.lock().await;
2023            let days: Option<i64> = conn
2024                .query_row(
2025                    "SELECT session_retention_days FROM memory_config
2026                     WHERE project_id = '__global__' LIMIT 1",
2027                    [],
2028                    |row| row.get(0),
2029                )
2030                .ok();
2031            drop(conn);
2032            days.unwrap_or(30) as u32
2033        };
2034
2035        self.prune_old_session_chunks(retention_days).await
2036    }
2037
2038    pub async fn put_global_memory_record(
2039        &self,
2040        record: &GlobalMemoryRecord,
2041    ) -> MemoryResult<GlobalMemoryWriteResult> {
2042        let conn = self.conn.lock().await;
2043
2044        let existing: Option<String> = conn
2045            .query_row(
2046                "SELECT id FROM memory_records
2047                 WHERE user_id = ?1
2048                   AND source_type = ?2
2049                   AND content_hash = ?3
2050                   AND run_id = ?4
2051                   AND IFNULL(session_id, '') = IFNULL(?5, '')
2052                   AND IFNULL(message_id, '') = IFNULL(?6, '')
2053                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
2054                 LIMIT 1",
2055                params![
2056                    record.user_id,
2057                    record.source_type,
2058                    record.content_hash,
2059                    record.run_id,
2060                    record.session_id,
2061                    record.message_id,
2062                    record.tool_name
2063                ],
2064                |row| row.get(0),
2065            )
2066            .optional()?;
2067
2068        if let Some(id) = existing {
2069            return Ok(GlobalMemoryWriteResult {
2070                id,
2071                stored: false,
2072                deduped: true,
2073            });
2074        }
2075
2076        let metadata = record
2077            .metadata
2078            .as_ref()
2079            .map(ToString::to_string)
2080            .unwrap_or_default();
2081        let provenance = record
2082            .provenance
2083            .as_ref()
2084            .map(ToString::to_string)
2085            .unwrap_or_default();
2086        conn.execute(
2087            "INSERT INTO memory_records(
2088                id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
2089                project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
2090                visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
2091            ) VALUES (
2092                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
2093                ?10, ?11, ?12, ?13, ?14, ?15, ?16,
2094                ?17, ?18, ?19, ?20, ?21, ?22
2095            )",
2096            params![
2097                record.id,
2098                record.user_id,
2099                record.source_type,
2100                record.content,
2101                record.content_hash,
2102                record.run_id,
2103                record.session_id,
2104                record.message_id,
2105                record.tool_name,
2106                record.project_tag,
2107                record.channel_tag,
2108                record.host_tag,
2109                metadata,
2110                provenance,
2111                record.redaction_status,
2112                i64::from(record.redaction_count),
2113                record.visibility,
2114                if record.demoted { 1i64 } else { 0i64 },
2115                record.score_boost,
2116                record.created_at_ms as i64,
2117                record.updated_at_ms as i64,
2118                record.expires_at_ms.map(|v| v as i64),
2119            ],
2120        )?;
2121
2122        Ok(GlobalMemoryWriteResult {
2123            id: record.id.clone(),
2124            stored: true,
2125            deduped: false,
2126        })
2127    }
2128
    /// Full-text search over a user's global memory records.
    ///
    /// Two-stage strategy:
    /// 1. FTS5 `MATCH` ranked by `bm25` (lower rank = better match), with the
    ///    rank converted to a score in (0, 1] via `1 / (1 + rank)`.
    /// 2. If the FTS statement cannot be prepared (e.g. the FTS table does
    ///    not exist) or produced no hits, fall back to a plain `LIKE` scan
    ///    ordered by recency, with a flat score of 0.25.
    ///
    /// Both stages restrict results to `user_id`, non-demoted, non-expired
    /// records, and to the optional project/channel/host tag filters.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        // Clamp the caller-supplied limit to a sane window (1..=100).
        let search_limit = limit.clamp(1, 100);
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        // Deliberate best-effort: a failed prepare (FTS table absent) is not
        // treated as an error — we silently fall through to the LIKE path.
        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // Column 22 is the bm25 rank appended after the 22 record
                    // columns (indices 0..=21); smaller rank = better match.
                    let rank = row.get::<_, f64>(22)?;
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback: substring scan by recency. An empty trimmed query
        // (?6 = '') matches every record for the user.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                // LIKE results carry no relevance ranking: flat score.
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
2235
2236    pub async fn list_global_memory(
2237        &self,
2238        user_id: &str,
2239        q: Option<&str>,
2240        project_tag: Option<&str>,
2241        channel_tag: Option<&str>,
2242        limit: i64,
2243        offset: i64,
2244    ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
2245        let conn = self.conn.lock().await;
2246        let query = q.unwrap_or("").trim();
2247        let like = format!("%{}%", query);
2248        let mut stmt = conn.prepare(
2249            "SELECT
2250                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2251                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2252                redaction_status, redaction_count, visibility, demoted, score_boost,
2253                created_at_ms, updated_at_ms, expires_at_ms
2254             FROM memory_records
2255             WHERE user_id = ?1
2256               AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
2257               AND (?4 IS NULL OR project_tag = ?4)
2258               AND (?5 IS NULL OR channel_tag = ?5)
2259             ORDER BY created_at_ms DESC
2260             LIMIT ?6 OFFSET ?7",
2261        )?;
2262        let rows = stmt.query_map(
2263            params![
2264                user_id,
2265                query,
2266                like,
2267                project_tag,
2268                channel_tag,
2269                limit.clamp(1, 1000),
2270                offset.max(0)
2271            ],
2272            row_to_global_record,
2273        )?;
2274        let mut out = Vec::new();
2275        for row in rows {
2276            out.push(row?);
2277        }
2278        Ok(out)
2279    }
2280
2281    pub async fn set_global_memory_visibility(
2282        &self,
2283        id: &str,
2284        visibility: &str,
2285        demoted: bool,
2286    ) -> MemoryResult<bool> {
2287        let conn = self.conn.lock().await;
2288        let now_ms = chrono::Utc::now().timestamp_millis();
2289        let changed = conn.execute(
2290            "UPDATE memory_records
2291             SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
2292             WHERE id = ?1",
2293            params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
2294        )?;
2295        Ok(changed > 0)
2296    }
2297
2298    pub async fn update_global_memory_context(
2299        &self,
2300        id: &str,
2301        visibility: &str,
2302        demoted: bool,
2303        metadata: Option<&serde_json::Value>,
2304        provenance: Option<&serde_json::Value>,
2305    ) -> MemoryResult<bool> {
2306        let conn = self.conn.lock().await;
2307        let now_ms = chrono::Utc::now().timestamp_millis();
2308        let metadata = metadata.map(ToString::to_string).unwrap_or_default();
2309        let provenance = provenance.map(ToString::to_string).unwrap_or_default();
2310        let changed = conn.execute(
2311            "UPDATE memory_records
2312             SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
2313             WHERE id = ?1",
2314            params![
2315                id,
2316                visibility,
2317                if demoted { 1i64 } else { 0i64 },
2318                metadata,
2319                provenance,
2320                now_ms,
2321            ],
2322        )?;
2323        Ok(changed > 0)
2324    }
2325
2326    pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
2327        let conn = self.conn.lock().await;
2328        let mut stmt = conn.prepare(
2329            "SELECT
2330                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2331                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2332                redaction_status, redaction_count, visibility, demoted, score_boost,
2333                created_at_ms, updated_at_ms, expires_at_ms
2334             FROM memory_records
2335             WHERE id = ?1
2336             LIMIT 1",
2337        )?;
2338        let record = stmt
2339            .query_row(params![id], row_to_global_record)
2340            .optional()?;
2341        Ok(record)
2342    }
2343
2344    pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
2345        let conn = self.conn.lock().await;
2346        let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
2347        Ok(changed > 0)
2348    }
2349}
2350
2351/// Convert a database row to a MemoryChunk
2352fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
2353    let id: String = row.get(0)?;
2354    let content: String = row.get(1)?;
2355    let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
2356        match tier {
2357            MemoryTier::Session => (
2358                Some(row.get(2)?),
2359                row.get(3)?,
2360                4usize,
2361                5usize,
2362                6usize,
2363                7usize,
2364            ),
2365            MemoryTier::Project => (
2366                row.get(2)?,
2367                Some(row.get(3)?),
2368                4usize,
2369                5usize,
2370                6usize,
2371                7usize,
2372            ),
2373            MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
2374        };
2375
2376    let source: String = row.get(source_idx)?;
2377    let created_at_str: String = row.get(created_at_idx)?;
2378    let token_count: i64 = row.get(token_count_idx)?;
2379    let metadata_str: Option<String> = row.get(metadata_idx)?;
2380
2381    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
2382        .map_err(|e| {
2383            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
2384        })?
2385        .with_timezone(&Utc);
2386
2387    let metadata = metadata_str
2388        .filter(|s| !s.is_empty())
2389        .and_then(|s| serde_json::from_str(&s).ok());
2390
2391    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
2392    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
2393    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
2394    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
2395
2396    Ok(MemoryChunk {
2397        id,
2398        content,
2399        tier,
2400        session_id,
2401        project_id,
2402        source,
2403        source_path,
2404        source_mtime,
2405        source_size,
2406        source_hash,
2407        created_at,
2408        token_count,
2409        metadata,
2410    })
2411}
2412
2413fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
2414    scope
2415        .filter(|value| !value.trim().is_empty())
2416        .ok_or_else(|| {
2417            crate::types::MemoryError::InvalidConfig(match tier {
2418                MemoryTier::Session => "tier=session requires session_id".to_string(),
2419                MemoryTier::Project => "tier=project requires project_id".to_string(),
2420                MemoryTier::Global => "tier=global does not require a scope id".to_string(),
2421            })
2422        })
2423}
2424
2425fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
2426    let metadata_str: Option<String> = row.get(12)?;
2427    let provenance_str: Option<String> = row.get(13)?;
2428    Ok(GlobalMemoryRecord {
2429        id: row.get(0)?,
2430        user_id: row.get(1)?,
2431        source_type: row.get(2)?,
2432        content: row.get(3)?,
2433        content_hash: row.get(4)?,
2434        run_id: row.get(5)?,
2435        session_id: row.get(6)?,
2436        message_id: row.get(7)?,
2437        tool_name: row.get(8)?,
2438        project_tag: row.get(9)?,
2439        channel_tag: row.get(10)?,
2440        host_tag: row.get(11)?,
2441        metadata: metadata_str
2442            .filter(|s| !s.is_empty())
2443            .and_then(|s| serde_json::from_str(&s).ok()),
2444        provenance: provenance_str
2445            .filter(|s| !s.is_empty())
2446            .and_then(|s| serde_json::from_str(&s).ok()),
2447        redaction_status: row.get(14)?,
2448        redaction_count: row.get::<_, i64>(15)? as u32,
2449        visibility: row.get(16)?,
2450        demoted: row.get::<_, i64>(17)? != 0,
2451        score_boost: row.get(18)?,
2452        created_at_ms: row.get::<_, i64>(19)? as u64,
2453        updated_at_ms: row.get::<_, i64>(20)? as u64,
2454        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
2455    })
2456}
2457
2458impl MemoryDatabase {
2459    pub async fn get_node_by_uri(
2460        &self,
2461        uri: &str,
2462    ) -> MemoryResult<Option<crate::types::MemoryNode>> {
2463        let conn = self.conn.lock().await;
2464        let mut stmt = conn.prepare(
2465            "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2466             FROM memory_nodes WHERE uri = ?1",
2467        )?;
2468
2469        let result = stmt.query_row(params![uri], |row| {
2470            let node_type_str: String = row.get(3)?;
2471            let node_type = node_type_str
2472                .parse()
2473                .unwrap_or(crate::types::NodeType::File);
2474            let metadata_str: Option<String> = row.get(6)?;
2475            Ok(crate::types::MemoryNode {
2476                id: row.get(0)?,
2477                uri: row.get(1)?,
2478                parent_uri: row.get(2)?,
2479                node_type,
2480                created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2481                updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2482                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2483            })
2484        });
2485
2486        match result {
2487            Ok(node) => Ok(Some(node)),
2488            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2489            Err(e) => Err(MemoryError::Database(e)),
2490        }
2491    }
2492
2493    pub async fn create_node(
2494        &self,
2495        uri: &str,
2496        parent_uri: Option<&str>,
2497        node_type: crate::types::NodeType,
2498        metadata: Option<&serde_json::Value>,
2499    ) -> MemoryResult<String> {
2500        let id = uuid::Uuid::new_v4().to_string();
2501        let now = Utc::now().to_rfc3339();
2502        let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
2503
2504        let conn = self.conn.lock().await;
2505        conn.execute(
2506            "INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
2507             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2508            params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
2509        )?;
2510
2511        Ok(id)
2512    }
2513
2514    pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
2515        let conn = self.conn.lock().await;
2516        let mut stmt = conn.prepare(
2517            "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2518             FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
2519        )?;
2520
2521        let rows = stmt.query_map(params![uri], |row| {
2522            let node_type_str: String = row.get(3)?;
2523            let node_type = node_type_str
2524                .parse()
2525                .unwrap_or(crate::types::NodeType::File);
2526            let metadata_str: Option<String> = row.get(6)?;
2527            Ok(crate::types::MemoryNode {
2528                id: row.get(0)?,
2529                uri: row.get(1)?,
2530                parent_uri: row.get(2)?,
2531                node_type,
2532                created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2533                updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2534                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2535            })
2536        })?;
2537
2538        rows.collect::<Result<Vec<_>, _>>()
2539            .map_err(MemoryError::Database)
2540    }
2541
2542    pub async fn get_layer(
2543        &self,
2544        node_id: &str,
2545        layer_type: crate::types::LayerType,
2546    ) -> MemoryResult<Option<crate::types::MemoryLayer>> {
2547        let conn = self.conn.lock().await;
2548        let mut stmt = conn.prepare(
2549            "SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
2550             FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
2551        )?;
2552
2553        let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
2554            let layer_type_str: String = row.get(2)?;
2555            let layer_type = layer_type_str
2556                .parse()
2557                .unwrap_or(crate::types::LayerType::L2);
2558            Ok(crate::types::MemoryLayer {
2559                id: row.get(0)?,
2560                node_id: row.get(1)?,
2561                layer_type,
2562                content: row.get(3)?,
2563                token_count: row.get(4)?,
2564                embedding_id: row.get(5)?,
2565                created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
2566                source_chunk_id: row.get(7)?,
2567            })
2568        });
2569
2570        match result {
2571            Ok(layer) => Ok(Some(layer)),
2572            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2573            Err(e) => Err(MemoryError::Database(e)),
2574        }
2575    }
2576
2577    pub async fn create_layer(
2578        &self,
2579        node_id: &str,
2580        layer_type: crate::types::LayerType,
2581        content: &str,
2582        token_count: i64,
2583        source_chunk_id: Option<&str>,
2584    ) -> MemoryResult<String> {
2585        let id = uuid::Uuid::new_v4().to_string();
2586        let now = Utc::now().to_rfc3339();
2587
2588        let conn = self.conn.lock().await;
2589        conn.execute(
2590            "INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
2591             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2592            params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
2593        )?;
2594
2595        Ok(id)
2596    }
2597
2598    pub async fn get_children_tree(
2599        &self,
2600        parent_uri: &str,
2601        max_depth: usize,
2602    ) -> MemoryResult<Vec<crate::types::TreeNode>> {
2603        if max_depth == 0 {
2604            return Ok(Vec::new());
2605        }
2606
2607        let children = self.list_directory(parent_uri).await?;
2608        let mut tree_nodes = Vec::new();
2609
2610        for child in children {
2611            let layer_summary = self.get_layer_summary(&child.id).await?;
2612
2613            let grandchildren = if child.node_type == crate::types::NodeType::Directory {
2614                Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
2615            } else {
2616                Vec::new()
2617            };
2618
2619            tree_nodes.push(crate::types::TreeNode {
2620                node: child,
2621                children: grandchildren,
2622                layer_summary,
2623            });
2624        }
2625
2626        Ok(tree_nodes)
2627    }
2628
2629    async fn get_layer_summary(
2630        &self,
2631        node_id: &str,
2632    ) -> MemoryResult<Option<crate::types::LayerSummary>> {
2633        let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
2634        let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
2635        let has_l2 = self
2636            .get_layer(node_id, crate::types::LayerType::L2)
2637            .await?
2638            .is_some();
2639
2640        if l0.is_none() && l1.is_none() && !has_l2 {
2641            return Ok(None);
2642        }
2643
2644        Ok(Some(crate::types::LayerSummary {
2645            l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
2646            l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
2647            has_l2,
2648        }))
2649    }
2650
2651    pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
2652        let conn = self.conn.lock().await;
2653        let count: i64 = conn.query_row(
2654            "SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
2655            params![uri],
2656            |row| row.get(0),
2657        )?;
2658        Ok(count > 0)
2659    }
2660}
2661
/// Truncate `s` to at most `max_len` bytes, appending "..." when truncated.
///
/// The cut point is moved back to the nearest UTF-8 char boundary, because
/// byte-slicing a `&str` at a non-boundary index panics — the previous
/// implementation would panic on multi-byte content such as accented text or
/// emoji. For `max_len < 3` the result degrades to just "...".
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Reserve 3 bytes for the ellipsis, then back up to a char boundary.
    let mut end = max_len.saturating_sub(3);
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
2669
/// Build a defensive FTS5 MATCH expression from free-form user text.
///
/// Each whitespace-separated token is trimmed of surrounding punctuation and
/// wrapped in double quotes so FTS operators are treated as literals, then
/// the tokens are OR-ed together. Interior double quotes are doubled per the
/// FTS5 string-escaping rule, so a stray `"` inside a token (which
/// `trim_matches` only strips from the edges) can no longer produce a
/// malformed query. An empty or punctuation-only query yields the
/// match-nothing expression `""`.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Double interior quotes so the quoted token remains one
                // FTS5 string literal.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
2689
2690#[cfg(test)]
2691mod tests {
2692    use super::*;
2693    use tempfile::TempDir;
2694
2695    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
2696        let temp_dir = TempDir::new().unwrap();
2697        let db_path = temp_dir.path().join("test_memory.db");
2698        let db = MemoryDatabase::new(&db_path).await.unwrap();
2699        (db, temp_dir)
2700    }
2701
2702    #[tokio::test]
2703    async fn test_init_schema() {
2704        let (db, _temp) = setup_test_db().await;
2705        // If we get here, schema was initialized successfully
2706        let stats = db.get_stats().await.unwrap();
2707        assert_eq!(stats.total_chunks, 0);
2708    }
2709
2710    #[tokio::test]
2711    async fn test_store_and_retrieve_chunk() {
2712        let (db, _temp) = setup_test_db().await;
2713
2714        let chunk = MemoryChunk {
2715            id: "test-1".to_string(),
2716            content: "Test content".to_string(),
2717            tier: MemoryTier::Session,
2718            session_id: Some("session-1".to_string()),
2719            project_id: Some("project-1".to_string()),
2720            source: "user_message".to_string(),
2721            source_path: None,
2722            source_mtime: None,
2723            source_size: None,
2724            source_hash: None,
2725            created_at: Utc::now(),
2726            token_count: 10,
2727            metadata: None,
2728        };
2729
2730        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
2731        db.store_chunk(&chunk, &embedding).await.unwrap();
2732
2733        let chunks = db.get_session_chunks("session-1").await.unwrap();
2734        assert_eq!(chunks.len(), 1);
2735        assert_eq!(chunks[0].content, "Test content");
2736    }
2737
2738    #[tokio::test]
2739    async fn test_store_and_retrieve_global_chunk() {
2740        let (db, _temp) = setup_test_db().await;
2741
2742        let chunk = MemoryChunk {
2743            id: "global-1".to_string(),
2744            content: "Global note".to_string(),
2745            tier: MemoryTier::Global,
2746            session_id: None,
2747            project_id: None,
2748            source: "agent_note".to_string(),
2749            source_path: None,
2750            source_mtime: None,
2751            source_size: None,
2752            source_hash: None,
2753            created_at: Utc::now(),
2754            token_count: 7,
2755            metadata: Some(serde_json::json!({"kind":"test"})),
2756        };
2757
2758        let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
2759        db.store_chunk(&chunk, &embedding).await.unwrap();
2760
2761        let chunks = db.get_global_chunks(10).await.unwrap();
2762        assert_eq!(chunks.len(), 1);
2763        assert_eq!(chunks[0].content, "Global note");
2764        assert_eq!(chunks[0].source, "agent_note");
2765        assert_eq!(chunks[0].token_count, 7);
2766        assert_eq!(chunks[0].tier, MemoryTier::Global);
2767    }
2768
2769    #[tokio::test]
2770    async fn test_global_chunk_exists_by_source_hash() {
2771        let (db, _temp) = setup_test_db().await;
2772
2773        let chunk = MemoryChunk {
2774            id: "global-hash".to_string(),
2775            content: "Global hash note".to_string(),
2776            tier: MemoryTier::Global,
2777            session_id: None,
2778            project_id: None,
2779            source: "chat_exchange".to_string(),
2780            source_path: None,
2781            source_mtime: None,
2782            source_size: None,
2783            source_hash: Some("hash-123".to_string()),
2784            created_at: Utc::now(),
2785            token_count: 5,
2786            metadata: None,
2787        };
2788
2789        let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
2790        db.store_chunk(&chunk, &embedding).await.unwrap();
2791
2792        assert!(db
2793            .global_chunk_exists_by_source_hash("hash-123")
2794            .await
2795            .unwrap());
2796        assert!(!db
2797            .global_chunk_exists_by_source_hash("missing-hash")
2798            .await
2799            .unwrap());
2800    }
2801
2802    #[tokio::test]
2803    async fn test_config_crud() {
2804        let (db, _temp) = setup_test_db().await;
2805
2806        let config = db.get_or_create_config("project-1").await.unwrap();
2807        assert_eq!(config.max_chunks, 10000);
2808
2809        let new_config = MemoryConfig {
2810            max_chunks: 5000,
2811            ..Default::default()
2812        };
2813        db.update_config("project-1", &new_config).await.unwrap();
2814
2815        let updated = db.get_or_create_config("project-1").await.unwrap();
2816        assert_eq!(updated.max_chunks, 5000);
2817    }
2818
2819    #[tokio::test]
2820    async fn test_global_memory_put_search_and_dedup() {
2821        let (db, _temp) = setup_test_db().await;
2822        let now = chrono::Utc::now().timestamp_millis() as u64;
2823        let record = GlobalMemoryRecord {
2824            id: "gm-1".to_string(),
2825            user_id: "user-a".to_string(),
2826            source_type: "user_message".to_string(),
2827            content: "remember rust workspace layout".to_string(),
2828            content_hash: "h1".to_string(),
2829            run_id: "run-1".to_string(),
2830            session_id: Some("s1".to_string()),
2831            message_id: Some("m1".to_string()),
2832            tool_name: None,
2833            project_tag: Some("proj-x".to_string()),
2834            channel_tag: None,
2835            host_tag: None,
2836            metadata: None,
2837            provenance: None,
2838            redaction_status: "passed".to_string(),
2839            redaction_count: 0,
2840            visibility: "private".to_string(),
2841            demoted: false,
2842            score_boost: 0.0,
2843            created_at_ms: now,
2844            updated_at_ms: now,
2845            expires_at_ms: None,
2846        };
2847        let first = db.put_global_memory_record(&record).await.unwrap();
2848        assert!(first.stored);
2849        let second = db.put_global_memory_record(&record).await.unwrap();
2850        assert!(second.deduped);
2851
2852        let hits = db
2853            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
2854            .await
2855            .unwrap();
2856        assert!(!hits.is_empty());
2857        assert_eq!(hits[0].record.id, "gm-1");
2858    }
2859}