// tandem_memory/db.rs
1// Database Layer Module
2// SQLite + sqlite-vec for vector storage
3
4use crate::types::{
5    ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6    MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier, ProjectMemoryStats,
7    DEFAULT_EMBEDDING_DIMENSION,
8};
9use chrono::{DateTime, Utc};
10use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
11use sqlite_vec::sqlite3_vec_init;
12use std::collections::HashSet;
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
/// Tuple row for a single `project_index_status` record, in column order:
/// (last_indexed_at, last_total_files, last_processed_files,
///  last_indexed_files, last_skipped_files, last_errors).
/// Every column is nullable in the schema (see `init_schema`), hence
/// every field is an `Option`.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
26
/// Database connection manager.
///
/// Owns a single SQLite connection behind an async mutex so the async
/// methods on this type can share it safely across tasks.
pub struct MemoryDatabase {
    // Single shared connection; every query serializes through this lock.
    conn: Arc<Mutex<Connection>>,
    // Filesystem location the connection was opened from in `new`.
    db_path: std::path::PathBuf,
}
32
33impl MemoryDatabase {
34    /// Initialize or open the memory database
35    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
36        // Register sqlite-vec extension
37        unsafe {
38            sqlite3_auto_extension(Some(std::mem::transmute::<
39                *const (),
40                unsafe extern "C" fn(
41                    *mut rusqlite::ffi::sqlite3,
42                    *mut *mut i8,
43                    *const rusqlite::ffi::sqlite3_api_routines,
44                ) -> i32,
45            >(sqlite3_vec_init as *const ())));
46        }
47
48        let conn = Connection::open(db_path)?;
49        conn.busy_timeout(Duration::from_secs(10))?;
50
51        // Enable WAL mode for better concurrency
52        // PRAGMA journal_mode returns a row, so we use query_row to ignore it
53        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
54        conn.execute("PRAGMA synchronous = NORMAL", [])?;
55
56        let db = Self {
57            conn: Arc::new(Mutex::new(conn)),
58            db_path: db_path.to_path_buf(),
59        };
60
61        // Initialize schema
62        db.init_schema().await?;
63        if let Err(err) = db.validate_vector_tables().await {
64            match &err {
65                crate::types::MemoryError::Database(db_err)
66                    if Self::is_vector_table_error(db_err) =>
67                {
68                    tracing::warn!(
69                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
70                        db_err
71                    );
72                    db.recreate_vector_tables().await?;
73                }
74                _ => return Err(err),
75            }
76        }
77        db.validate_integrity().await?;
78
79        Ok(db)
80    }
81
82    /// Validate base SQLite integrity early so startup recovery can heal corrupt DB files.
83    async fn validate_integrity(&self) -> MemoryResult<()> {
84        let conn = self.conn.lock().await;
85        let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
86        {
87            Ok(value) => value,
88            Err(err) => {
89                // sqlite-vec virtual tables can intermittently return generic SQL logic errors
90                // during integrity probing even when runtime reads/writes still work.
91                // Do not block startup on this probe failure.
92                tracing::warn!(
93                    "Skipping strict PRAGMA quick_check due to probe error: {}",
94                    err
95                );
96                return Ok(());
97            }
98        };
99        if check.trim().eq_ignore_ascii_case("ok") {
100            return Ok(());
101        }
102
103        let lowered = check.to_lowercase();
104        if lowered.contains("malformed")
105            || lowered.contains("corrupt")
106            || lowered.contains("database disk image is malformed")
107        {
108            return Err(crate::types::MemoryError::InvalidConfig(format!(
109                "malformed database integrity check: {}",
110                check
111            )));
112        }
113
114        tracing::warn!(
115            "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
116            check
117        );
118        Ok(())
119    }
120
    /// Initialize database schema.
    ///
    /// Creates (idempotently) every table, virtual table, index, and trigger
    /// the memory subsystem uses, and applies in-place column migrations on
    /// `project_memory_chunks`. Safe to call repeatedly: all DDL uses
    /// `IF NOT EXISTS`, and column migrations inspect `PRAGMA table_info`
    /// before issuing `ALTER TABLE`.
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // Extension is already registered globally in new()

        // Session memory chunks table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Session memory vectors (sqlite-vec vec0 virtual table; one row per
        // chunk, embedding dimensionality fixed at DEFAULT_EMBEDDING_DIMENSION)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Project memory chunks table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Migrations: file-derived columns on project_memory_chunks
        // (SQLite doesn't support IF NOT EXISTS for columns, so we inspect table_info)
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            // Column index 1 of table_info is the column name.
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        // Project memory vectors (virtual table)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // File indexing tables (project-scoped): per-file fingerprints used to
        // decide whether a file needs re-indexing.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        // Summary row per project for the most recent indexing run; all
        // columns nullable (see ProjectIndexStatusRow).
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // Global memory chunks table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Global memory vectors (virtual table)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Memory configuration table (one row per project)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        // Cleanup log table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // Create indexes for better query performance
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        // Global user memory records (FTS-backed baseline retrieval path)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_records (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                source_type TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                run_id TEXT NOT NULL,
                session_id TEXT,
                message_id TEXT,
                tool_name TEXT,
                project_tag TEXT,
                channel_tag TEXT,
                host_tag TEXT,
                metadata TEXT,
                provenance TEXT,
                redaction_status TEXT NOT NULL,
                redaction_count INTEGER NOT NULL DEFAULT 0,
                visibility TEXT NOT NULL DEFAULT 'private',
                demoted INTEGER NOT NULL DEFAULT 0,
                score_boost REAL NOT NULL DEFAULT 0.0,
                created_at_ms INTEGER NOT NULL,
                updated_at_ms INTEGER NOT NULL,
                expires_at_ms INTEGER
            )",
            [],
        )?;
        // Dedup key: IFNULL folds NULLs to '' so records differing only in a
        // missing session/message/tool still collide as intended.
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
                ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
                ON memory_records(user_id, created_at_ms DESC)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_run
                ON memory_records(run_id)",
            [],
        )?;
        // External-content-free FTS5 mirror of memory_records.content, kept in
        // sync by the three triggers below (insert / delete / update).
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
                id UNINDEXED,
                user_id UNINDEXED,
                content
            )",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;

        Ok(())
    }
396
397    /// Validate that sqlite-vec tables are readable.
398    /// This catches legacy/corrupted vector blobs early so startup can recover.
399    pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
400        let conn = self.conn.lock().await;
401        let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
402
403        for table in [
404            "session_memory_vectors",
405            "project_memory_vectors",
406            "global_memory_vectors",
407        ] {
408            let sql = format!("SELECT COUNT(*) FROM {}", table);
409            let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
410
411            // COUNT(*) can pass even when vector chunk blobs are unreadable.
412            // Probe sqlite-vec MATCH execution to surface latent blob corruption.
413            if row_count > 0 {
414                let probe_sql = format!(
415                    "SELECT chunk_id, distance
416                     FROM {}
417                     WHERE embedding MATCH ?1 AND k = 1",
418                    table
419                );
420                let mut stmt = conn.prepare(&probe_sql)?;
421                let mut rows = stmt.query(params![probe_embedding.as_str()])?;
422                let _ = rows.next()?;
423            }
424        }
425        Ok(())
426    }
427
428    fn is_vector_table_error(err: &rusqlite::Error) -> bool {
429        let text = err.to_string().to_lowercase();
430        text.contains("vector blob")
431            || text.contains("chunks iter error")
432            || text.contains("chunks iter")
433            || text.contains("internal sqlite-vec error")
434            || text.contains("insert rowids id")
435            || text.contains("sql logic error")
436            || text.contains("database disk image is malformed")
437            || text.contains("session_memory_vectors")
438            || text.contains("project_memory_vectors")
439            || text.contains("global_memory_vectors")
440            || text.contains("vec0")
441    }
442
443    async fn recreate_vector_tables(&self) -> MemoryResult<()> {
444        let conn = self.conn.lock().await;
445
446        for base in [
447            "session_memory_vectors",
448            "project_memory_vectors",
449            "global_memory_vectors",
450        ] {
451            // Drop vec virtual table and common sqlite-vec shadow tables first.
452            for name in [
453                base.to_string(),
454                format!("{}_chunks", base),
455                format!("{}_info", base),
456                format!("{}_rowids", base),
457                format!("{}_vector_chunks00", base),
458            ] {
459                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
460                conn.execute(&sql, [])?;
461            }
462
463            // Drop any additional shadow tables (e.g. *_vector_chunks01).
464            let like_pattern = format!("{base}_%");
465            let mut stmt = conn.prepare(
466                "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
467            )?;
468            let table_names = stmt
469                .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
470                .collect::<Result<Vec<_>, _>>()?;
471            drop(stmt);
472            for name in table_names {
473                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
474                conn.execute(&sql, [])?;
475            }
476        }
477
478        conn.execute(
479            &format!(
480                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
481                    chunk_id TEXT PRIMARY KEY,
482                    embedding float[{}]
483                )",
484                DEFAULT_EMBEDDING_DIMENSION
485            ),
486            [],
487        )?;
488
489        conn.execute(
490            &format!(
491                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
492                    chunk_id TEXT PRIMARY KEY,
493                    embedding float[{}]
494                )",
495                DEFAULT_EMBEDDING_DIMENSION
496            ),
497            [],
498        )?;
499
500        conn.execute(
501            &format!(
502                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
503                    chunk_id TEXT PRIMARY KEY,
504                    embedding float[{}]
505                )",
506                DEFAULT_EMBEDDING_DIMENSION
507            ),
508            [],
509        )?;
510
511        Ok(())
512    }
513
514    /// Ensure vector tables are readable and recreate them if corruption is detected.
515    /// Returns true when a repair was performed.
516    pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
517        match self.validate_vector_tables().await {
518            Ok(()) => Ok(false),
519            Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
520                tracing::warn!(
521                    "Memory vector tables appear corrupted ({}). Recreating vector tables.",
522                    err
523                );
524                self.recreate_vector_tables().await?;
525                Ok(true)
526            }
527            Err(err) => Err(err),
528        }
529    }
530
531    /// Last-resort runtime repair for malformed DB states: drop user memory tables
532    /// and recreate the schema in-place so new writes can proceed.
533    /// This intentionally clears memory content for the active DB file.
534    pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
535        let table_names = {
536            let conn = self.conn.lock().await;
537            let mut stmt = conn.prepare(
538                "SELECT name FROM sqlite_master
539                 WHERE type='table'
540                   AND name NOT LIKE 'sqlite_%'
541                 ORDER BY name",
542            )?;
543            let names = stmt
544                .query_map([], |row| row.get::<_, String>(0))?
545                .collect::<Result<Vec<_>, _>>()?;
546            names
547        };
548
549        {
550            let conn = self.conn.lock().await;
551            for table in table_names {
552                let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
553                let _ = conn.execute(&sql, []);
554            }
555        }
556
557        self.init_schema().await
558    }
559
560    /// Attempt an immediate vector-table repair when a concrete DB error indicates
561    /// sqlite-vec internals are failing at statement/rowid level.
562    pub async fn try_repair_after_error(
563        &self,
564        err: &crate::types::MemoryError,
565    ) -> MemoryResult<bool> {
566        match err {
567            crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
568                tracing::warn!(
569                    "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
570                    db_err
571                );
572                self.recreate_vector_tables().await?;
573                Ok(true)
574            }
575            _ => Ok(false),
576        }
577    }
578
579    /// Store a chunk with its embedding
580    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
581        let conn = self.conn.lock().await;
582
583        let (chunks_table, vectors_table) = match chunk.tier {
584            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
585            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
586            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
587        };
588
589        let created_at_str = chunk.created_at.to_rfc3339();
590        let metadata_str = chunk
591            .metadata
592            .as_ref()
593            .map(|m| m.to_string())
594            .unwrap_or_default();
595
596        // Insert chunk
597        match chunk.tier {
598            MemoryTier::Session => {
599                conn.execute(
600                    &format!(
601                        "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata) 
602                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
603                        chunks_table
604                    ),
605                    params![
606                        chunk.id,
607                        chunk.content,
608                        chunk.session_id.as_ref().unwrap_or(&String::new()),
609                        chunk.project_id,
610                        chunk.source,
611                        created_at_str,
612                        chunk.token_count,
613                        metadata_str
614                    ],
615                )?;
616            }
617            MemoryTier::Project => {
618                conn.execute(
619                    &format!(
620                        "INSERT INTO {} (
621                            id, content, project_id, session_id, source, created_at, token_count, metadata,
622                            source_path, source_mtime, source_size, source_hash
623                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
624                        chunks_table
625                    ),
626                    params![
627                        chunk.id,
628                        chunk.content,
629                        chunk.project_id.as_ref().unwrap_or(&String::new()),
630                        chunk.session_id,
631                        chunk.source,
632                        created_at_str,
633                        chunk.token_count,
634                        metadata_str,
635                        chunk.source_path.clone(),
636                        chunk.source_mtime,
637                        chunk.source_size,
638                        chunk.source_hash.clone()
639                    ],
640                )?;
641            }
642            MemoryTier::Global => {
643                conn.execute(
644                    &format!(
645                        "INSERT INTO {} (id, content, source, created_at, token_count, metadata) 
646                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
647                        chunks_table
648                    ),
649                    params![
650                        chunk.id,
651                        chunk.content,
652                        chunk.source,
653                        created_at_str,
654                        chunk.token_count,
655                        metadata_str
656                    ],
657                )?;
658            }
659        }
660
661        // Insert embedding
662        let embedding_json = format!(
663            "[{}]",
664            embedding
665                .iter()
666                .map(|f| f.to_string())
667                .collect::<Vec<_>>()
668                .join(",")
669        );
670        conn.execute(
671            &format!(
672                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
673                vectors_table
674            ),
675            params![chunk.id, embedding_json],
676        )?;
677
678        Ok(())
679    }
680
    /// Search for similar chunks.
    ///
    /// Runs a sqlite-vec KNN query (`embedding MATCH ?` with `k = limit`)
    /// against the tier's vector table, joined to its chunks table, returning
    /// `(chunk, distance)` pairs ordered by ascending distance.
    ///
    /// Filter precedence:
    /// * Session tier: `session_id` first, else `project_id`, else unfiltered.
    /// * Project tier: `project_id` when provided, else unfiltered.
    /// * Global tier: both filters are ignored.
    ///
    /// NOTE(review): the WHERE filter applies to the rows of the k nearest
    /// vectors after the KNN scan, so a filtered query may return fewer than
    /// `limit` hits even when more matching chunks exist — confirm intended.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // Serialize the query vector to the JSON array text sqlite-vec parses.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        // Build query based on tier and filters.
        // Session/Global branches select 8 columns + distance at index 8;
        // Project branches select 4 extra file-derived columns, so distance
        // sits at index 12. Keep indices in sync with the SELECT lists.
        let results = match tier {
            MemoryTier::Session => {
                if let Some(sid) = session_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            // distance is column 8 (after the 8 chunk columns)
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            // distance is column 12 (after 8 base + 4 file columns)
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global chunks have no session/project columns; NULL aliases
                // keep the row shape identical for row_to_chunk.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
824
825    /// Get chunks by session ID
826    pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
827        let conn = self.conn.lock().await;
828
829        let mut stmt = conn.prepare(
830            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
831             FROM session_memory_chunks
832             WHERE session_id = ?1
833             ORDER BY created_at DESC",
834        )?;
835
836        let chunks = stmt
837            .query_map(params![session_id], |row| {
838                row_to_chunk(row, MemoryTier::Session)
839            })?
840            .collect::<Result<Vec<_>, _>>()?;
841
842        Ok(chunks)
843    }
844
845    /// Get chunks by project ID
846    pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
847        let conn = self.conn.lock().await;
848
849        let mut stmt = conn.prepare(
850            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
851                    source_path, source_mtime, source_size, source_hash
852             FROM project_memory_chunks
853             WHERE project_id = ?1
854             ORDER BY created_at DESC",
855        )?;
856
857        let chunks = stmt
858            .query_map(params![project_id], |row| {
859                row_to_chunk(row, MemoryTier::Project)
860            })?
861            .collect::<Result<Vec<_>, _>>()?;
862
863        Ok(chunks)
864    }
865
    /// Get global chunks
    ///
    /// Returns up to `limit` global-tier chunks, newest first. Global rows
    /// have no session/project association and no file provenance, so those
    /// fields are filled with `None` here rather than read from the table.
    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
        let conn = self.conn.lock().await;

        let mut stmt = conn.prepare(
            "SELECT id, content, source, created_at, token_count, metadata
             FROM global_memory_chunks
             ORDER BY created_at DESC
             LIMIT ?1",
        )?;

        let chunks = stmt
            .query_map(params![limit], |row| {
                let id: String = row.get(0)?;
                let content: String = row.get(1)?;
                let source: String = row.get(2)?;
                let created_at_str: String = row.get(3)?;
                let token_count: i64 = row.get(4)?;
                let metadata_str: Option<String> = row.get(5)?;

                // created_at is stored as RFC 3339 text; a parse failure is
                // surfaced as a conversion error on column 3.
                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
                    .map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            3,
                            rusqlite::types::Type::Text,
                            Box::new(e),
                        )
                    })?
                    .with_timezone(&Utc);

                // Empty metadata strings count as absent; invalid JSON is
                // silently dropped rather than failing the whole query.
                let metadata = metadata_str
                    .filter(|s| !s.is_empty())
                    .and_then(|s| serde_json::from_str(&s).ok());

                Ok(MemoryChunk {
                    id,
                    content,
                    tier: MemoryTier::Global,
                    session_id: None,
                    project_id: None,
                    source,
                    source_path: None,
                    source_mtime: None,
                    source_size: None,
                    source_hash: None,
                    created_at,
                    token_count,
                    metadata,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(chunks)
    }
920
921    /// Clear session memory
922    pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
923        let conn = self.conn.lock().await;
924
925        // Get count before deletion
926        let count: i64 = conn.query_row(
927            "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
928            params![session_id],
929            |row| row.get(0),
930        )?;
931
932        // Delete vectors first (foreign key constraint)
933        conn.execute(
934            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
935             (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
936            params![session_id],
937        )?;
938
939        // Delete chunks
940        conn.execute(
941            "DELETE FROM session_memory_chunks WHERE session_id = ?1",
942            params![session_id],
943        )?;
944
945        Ok(count as u64)
946    }
947
948    /// Clear project memory
949    pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
950        let conn = self.conn.lock().await;
951
952        // Get count before deletion
953        let count: i64 = conn.query_row(
954            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
955            params![project_id],
956            |row| row.get(0),
957        )?;
958
959        // Delete vectors first
960        conn.execute(
961            "DELETE FROM project_memory_vectors WHERE chunk_id IN 
962             (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
963            params![project_id],
964        )?;
965
966        // Delete chunks
967        conn.execute(
968            "DELETE FROM project_memory_chunks WHERE project_id = ?1",
969            params![project_id],
970        )?;
971
972        Ok(count as u64)
973    }
974
975    /// Clear old session memory based on retention policy
976    pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
977        let conn = self.conn.lock().await;
978
979        let cutoff = Utc::now() - chrono::Duration::days(retention_days);
980        let cutoff_str = cutoff.to_rfc3339();
981
982        // Get count before deletion
983        let count: i64 = conn.query_row(
984            "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
985            params![cutoff_str],
986            |row| row.get(0),
987        )?;
988
989        // Delete vectors first
990        conn.execute(
991            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
992             (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
993            params![cutoff_str],
994        )?;
995
996        // Delete chunks
997        conn.execute(
998            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
999            params![cutoff_str],
1000        )?;
1001
1002        Ok(count as u64)
1003    }
1004
1005    /// Get or create memory config for a project
1006    pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1007        let conn = self.conn.lock().await;
1008
1009        let result: Option<MemoryConfig> = conn
1010            .query_row(
1011                "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1012                        session_retention_days, token_budget, chunk_overlap
1013                 FROM memory_config WHERE project_id = ?1",
1014                params![project_id],
1015                |row| {
1016                    Ok(MemoryConfig {
1017                        max_chunks: row.get(0)?,
1018                        chunk_size: row.get(1)?,
1019                        retrieval_k: row.get(2)?,
1020                        auto_cleanup: row.get::<_, i64>(3)? != 0,
1021                        session_retention_days: row.get(4)?,
1022                        token_budget: row.get(5)?,
1023                        chunk_overlap: row.get(6)?,
1024                    })
1025                },
1026            )
1027            .optional()?;
1028
1029        match result {
1030            Some(config) => Ok(config),
1031            None => {
1032                // Create default config
1033                let config = MemoryConfig::default();
1034                let updated_at = Utc::now().to_rfc3339();
1035
1036                conn.execute(
1037                    "INSERT INTO memory_config 
1038                     (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1039                      session_retention_days, token_budget, chunk_overlap, updated_at)
1040                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1041                    params![
1042                        project_id,
1043                        config.max_chunks,
1044                        config.chunk_size,
1045                        config.retrieval_k,
1046                        config.auto_cleanup as i64,
1047                        config.session_retention_days,
1048                        config.token_budget,
1049                        config.chunk_overlap,
1050                        updated_at
1051                    ],
1052                )?;
1053
1054                Ok(config)
1055            }
1056        }
1057    }
1058
1059    /// Update memory config for a project
1060    pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1061        let conn = self.conn.lock().await;
1062
1063        let updated_at = Utc::now().to_rfc3339();
1064
1065        conn.execute(
1066            "INSERT OR REPLACE INTO memory_config 
1067             (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
1068              session_retention_days, token_budget, chunk_overlap, updated_at)
1069             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1070            params![
1071                project_id,
1072                config.max_chunks,
1073                config.chunk_size,
1074                config.retrieval_k,
1075                config.auto_cleanup as i64,
1076                config.session_retention_days,
1077                config.token_budget,
1078                config.chunk_overlap,
1079                updated_at
1080            ],
1081        )?;
1082
1083        Ok(())
1084    }
1085
1086    /// Get memory statistics
1087    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1088        let conn = self.conn.lock().await;
1089
1090        // Count chunks
1091        let session_chunks: i64 =
1092            conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1093                row.get(0)
1094            })?;
1095
1096        let project_chunks: i64 =
1097            conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1098                row.get(0)
1099            })?;
1100
1101        let global_chunks: i64 =
1102            conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1103                row.get(0)
1104            })?;
1105
1106        // Calculate sizes
1107        let session_bytes: i64 = conn.query_row(
1108            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1109            [],
1110            |row| row.get(0),
1111        )?;
1112
1113        let project_bytes: i64 = conn.query_row(
1114            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1115            [],
1116            |row| row.get(0),
1117        )?;
1118
1119        let global_bytes: i64 = conn.query_row(
1120            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1121            [],
1122            |row| row.get(0),
1123        )?;
1124
1125        // Get last cleanup
1126        let last_cleanup: Option<String> = conn
1127            .query_row(
1128                "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1129                [],
1130                |row| row.get(0),
1131            )
1132            .optional()?;
1133
1134        let last_cleanup = last_cleanup.and_then(|s| {
1135            DateTime::parse_from_rfc3339(&s)
1136                .ok()
1137                .map(|dt| dt.with_timezone(&Utc))
1138        });
1139
1140        // Get file size
1141        let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1142
1143        Ok(MemoryStats {
1144            total_chunks: session_chunks + project_chunks + global_chunks,
1145            session_chunks,
1146            project_chunks,
1147            global_chunks,
1148            total_bytes: session_bytes + project_bytes + global_bytes,
1149            session_bytes,
1150            project_bytes,
1151            global_bytes,
1152            file_size,
1153            last_cleanup,
1154        })
1155    }
1156
1157    /// Log cleanup operation
1158    pub async fn log_cleanup(
1159        &self,
1160        cleanup_type: &str,
1161        tier: MemoryTier,
1162        project_id: Option<&str>,
1163        session_id: Option<&str>,
1164        chunks_deleted: i64,
1165        bytes_reclaimed: i64,
1166    ) -> MemoryResult<()> {
1167        let conn = self.conn.lock().await;
1168
1169        let id = uuid::Uuid::new_v4().to_string();
1170        let created_at = Utc::now().to_rfc3339();
1171
1172        conn.execute(
1173            "INSERT INTO memory_cleanup_log 
1174             (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1175             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1176            params![
1177                id,
1178                cleanup_type,
1179                tier.to_string(),
1180                project_id,
1181                session_id,
1182                chunks_deleted,
1183                bytes_reclaimed,
1184                created_at
1185            ],
1186        )?;
1187
1188        Ok(())
1189    }
1190
1191    /// Vacuum the database to reclaim space
1192    pub async fn vacuum(&self) -> MemoryResult<()> {
1193        let conn = self.conn.lock().await;
1194        conn.execute("VACUUM", [])?;
1195        Ok(())
1196    }
1197
1198    // ---------------------------------------------------------------------
1199    // Project file indexing helpers
1200    // ---------------------------------------------------------------------
1201
1202    pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1203        let conn = self.conn.lock().await;
1204        let n: i64 = conn.query_row(
1205            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1206            params![project_id],
1207            |row| row.get(0),
1208        )?;
1209        Ok(n)
1210    }
1211
1212    pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1213        let conn = self.conn.lock().await;
1214        let exists: Option<i64> = conn
1215            .query_row(
1216                "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1217                params![project_id],
1218                |row| row.get(0),
1219            )
1220            .optional()?;
1221        Ok(exists.is_some())
1222    }
1223
1224    pub async fn get_file_index_entry(
1225        &self,
1226        project_id: &str,
1227        path: &str,
1228    ) -> MemoryResult<Option<(i64, i64, String)>> {
1229        let conn = self.conn.lock().await;
1230        let row: Option<(i64, i64, String)> = conn
1231            .query_row(
1232                "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1233                params![project_id, path],
1234                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1235            )
1236            .optional()?;
1237        Ok(row)
1238    }
1239
1240    pub async fn upsert_file_index_entry(
1241        &self,
1242        project_id: &str,
1243        path: &str,
1244        mtime: i64,
1245        size: i64,
1246        hash: &str,
1247    ) -> MemoryResult<()> {
1248        let conn = self.conn.lock().await;
1249        let indexed_at = Utc::now().to_rfc3339();
1250        conn.execute(
1251            "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1252             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1253             ON CONFLICT(project_id, path) DO UPDATE SET
1254                mtime = excluded.mtime,
1255                size = excluded.size,
1256                hash = excluded.hash,
1257                indexed_at = excluded.indexed_at",
1258            params![project_id, path, mtime, size, hash, indexed_at],
1259        )?;
1260        Ok(())
1261    }
1262
1263    pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1264        let conn = self.conn.lock().await;
1265        conn.execute(
1266            "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1267            params![project_id, path],
1268        )?;
1269        Ok(())
1270    }
1271
1272    pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1273        let conn = self.conn.lock().await;
1274        let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1275        let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1276        Ok(rows.collect::<Result<Vec<_>, _>>()?)
1277    }
1278
    /// Delete all chunks (and their vector rows) indexed from a single file
    /// path within a project.
    ///
    /// Returns `(chunks_deleted, bytes_estimated)`, where `bytes_estimated`
    /// is the summed `LENGTH(content)` of the removed chunks. Both values
    /// are measured before the DELETEs; the connection is held for the whole
    /// call, so no other writer can change them in between.
    pub async fn delete_project_file_chunks_by_path(
        &self,
        project_id: &str,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        // Delete vectors first (keep order consistent with other clears)
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
            params![project_id, source_path],
        )?;

        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
        )?;

        Ok((chunks_deleted, bytes_estimated))
    }
1314
    /// Record the outcome of an indexing run for a project.
    ///
    /// Upserts the single `project_index_status` row for `project_id`:
    /// an existing row has all of its `last_*` counters and the
    /// `last_indexed_at` timestamp overwritten with this run's values.
    pub async fn upsert_project_index_status(
        &self,
        project_id: &str,
        total_files: i64,
        processed_files: i64,
        indexed_files: i64,
        skipped_files: i64,
        errors: i64,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // Timestamp of this run, stored as RFC 3339 text.
        let last_indexed_at = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO project_index_status (
                project_id, last_indexed_at, last_total_files, last_processed_files,
                last_indexed_files, last_skipped_files, last_errors
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
             ON CONFLICT(project_id) DO UPDATE SET
                last_indexed_at = excluded.last_indexed_at,
                last_total_files = excluded.last_total_files,
                last_processed_files = excluded.last_processed_files,
                last_indexed_files = excluded.last_indexed_files,
                last_skipped_files = excluded.last_skipped_files,
                last_errors = excluded.last_errors",
            params![
                project_id,
                last_indexed_at,
                total_files,
                processed_files,
                indexed_files,
                skipped_files,
                errors
            ],
        )?;
        Ok(())
    }
1350
    /// Aggregate per-project memory statistics: chunk/byte totals, the
    /// file-indexed subset, file-index size, and the last indexing run's
    /// summary (all `None` if no run has been recorded).
    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
        let conn = self.conn.lock().await;

        // All chunks for the project, regardless of source.
        let project_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        let project_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // Subset of chunks produced by file indexing (source = 'file').
        let file_index_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let file_index_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let indexed_files: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // Last indexing run summary; absent until the first run is recorded.
        let status_row: Option<ProjectIndexStatusRow> =
            conn
                .query_row(
                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
                     FROM project_index_status WHERE project_id = ?1",
                    params![project_id],
                    |row| {
                        Ok((
                            row.get(0)?,
                            row.get(1)?,
                            row.get(2)?,
                            row.get(3)?,
                            row.get(4)?,
                            row.get(5)?,
                        ))
                    },
                )
                .optional()?;

        // A missing status row collapses to all-None fields.
        let (
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        ) = status_row.unwrap_or((None, None, None, None, None, None));

        // Stored as RFC 3339 text; unparseable values quietly become None.
        let last_indexed_at = last_indexed_at.and_then(|s| {
            DateTime::parse_from_rfc3339(&s)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        Ok(ProjectMemoryStats {
            project_id: project_id.to_string(),
            project_chunks,
            project_bytes,
            file_index_chunks,
            file_index_bytes,
            indexed_files,
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        })
    }
1433
    /// Wipe everything produced by file indexing for a project: file-sourced
    /// chunks, their vector rows, the per-file index records, and the index
    /// status row. Optionally VACUUMs afterwards to reclaim disk space.
    ///
    /// Returns the number of chunks removed and their estimated content
    /// bytes (both measured before deletion, while the lock is held).
    pub async fn clear_project_file_index(
        &self,
        project_id: &str,
        vacuum: bool,
    ) -> MemoryResult<ClearFileIndexResult> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        // Delete vectors first
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
            params![project_id],
        )?;

        // Delete file chunks
        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
        )?;

        // Clear file index tracking + status
        conn.execute(
            "DELETE FROM project_file_index WHERE project_id = ?1",
            params![project_id],
        )?;
        conn.execute(
            "DELETE FROM project_index_status WHERE project_id = ?1",
            params![project_id],
        )?;

        drop(conn); // release lock before VACUUM (which needs exclusive access)

        if vacuum {
            self.vacuum().await?;
        }

        Ok(ClearFileIndexResult {
            chunks_deleted,
            bytes_estimated,
            did_vacuum: vacuum,
        })
    }
1488
1489    // ------------------------------------------------------------------
1490    // Memory hygiene
1491    // ------------------------------------------------------------------
1492
1493    /// Delete session memory chunks older than `retention_days` days.
1494    ///
1495    /// Also removes orphaned vector entries for the deleted chunks so the
1496    /// sqlite-vec virtual table stays consistent.
1497    ///
1498    /// Returns the number of chunk rows deleted.
1499    /// If `retention_days` is 0 hygiene is disabled and this returns Ok(0).
1500    pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1501        if retention_days == 0 {
1502            return Ok(0);
1503        }
1504
1505        let conn = self.conn.lock().await;
1506
1507        // WAL is already active (set in new()) — no need to set it again here.
1508        let cutoff =
1509            (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1510
1511        // Remove orphaned vector entries first (chunk_id FK would dangle otherwise)
1512        conn.execute(
1513            "DELETE FROM session_memory_vectors
1514             WHERE chunk_id IN (
1515                 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1516             )",
1517            params![cutoff],
1518        )?;
1519
1520        let deleted = conn.execute(
1521            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1522            params![cutoff],
1523        )?;
1524
1525        if deleted > 0 {
1526            tracing::info!(
1527                retention_days,
1528                deleted,
1529                "memory hygiene: pruned old session chunks"
1530            );
1531        }
1532
1533        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1534        Ok(deleted as u64)
1535    }
1536
1537    /// Run scheduled hygiene: read `session_retention_days` from `memory_config`
1538    /// (falling back to `env_override` if provided) and prune stale session chunks.
1539    ///
1540    /// Returns `Ok(chunks_deleted)`. This method is intentionally best-effort —
1541    /// callers should log errors and continue.
1542    pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
1543        // Prefer the env override, fall back to the DB config for the null project.
1544        let retention_days = if env_override_days > 0 {
1545            env_override_days
1546        } else {
1547            // Try to read the global (project_id = '__global__') config if present.
1548            let conn = self.conn.lock().await;
1549            let days: Option<i64> = conn
1550                .query_row(
1551                    "SELECT session_retention_days FROM memory_config
1552                     WHERE project_id = '__global__' LIMIT 1",
1553                    [],
1554                    |row| row.get(0),
1555                )
1556                .ok();
1557            drop(conn);
1558            days.unwrap_or(30) as u32
1559        };
1560
1561        self.prune_old_session_chunks(retention_days).await
1562    }
1563
    /// Insert a global memory record, deduplicating on content identity.
    ///
    /// A record is a duplicate when (user_id, source_type, content_hash,
    /// run_id, session_id, message_id, tool_name) all match an existing row;
    /// nullable fields are compared NULL-safely via IFNULL(…, ''). For a
    /// duplicate, nothing is written and the existing row's id is returned
    /// with `stored: false, deduped: true`.
    pub async fn put_global_memory_record(
        &self,
        record: &GlobalMemoryRecord,
    ) -> MemoryResult<GlobalMemoryWriteResult> {
        let conn = self.conn.lock().await;

        // Dedupe probe: find an existing row with the same identity tuple.
        let existing: Option<String> = conn
            .query_row(
                "SELECT id FROM memory_records
                 WHERE user_id = ?1
                   AND source_type = ?2
                   AND content_hash = ?3
                   AND run_id = ?4
                   AND IFNULL(session_id, '') = IFNULL(?5, '')
                   AND IFNULL(message_id, '') = IFNULL(?6, '')
                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
                 LIMIT 1",
                params![
                    record.user_id,
                    record.source_type,
                    record.content_hash,
                    record.run_id,
                    record.session_id,
                    record.message_id,
                    record.tool_name
                ],
                |row| row.get(0),
            )
            .optional()?;

        if let Some(id) = existing {
            return Ok(GlobalMemoryWriteResult {
                id,
                stored: false,
                deduped: true,
            });
        }

        // metadata/provenance are serialized to text; absent values are
        // stored as the empty string rather than NULL.
        let metadata = record
            .metadata
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        let provenance = record
            .provenance
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        conn.execute(
            "INSERT INTO memory_records(
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
                project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
                visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
                ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                ?17, ?18, ?19, ?20, ?21, ?22
            )",
            params![
                record.id,
                record.user_id,
                record.source_type,
                record.content,
                record.content_hash,
                record.run_id,
                record.session_id,
                record.message_id,
                record.tool_name,
                record.project_tag,
                record.channel_tag,
                record.host_tag,
                metadata,
                provenance,
                record.redaction_status,
                i64::from(record.redaction_count),
                record.visibility,
                // demoted flag stored as a 0/1 integer.
                if record.demoted { 1i64 } else { 0i64 },
                record.score_boost,
                // Millisecond timestamps are cast to i64 for SQLite storage
                // — assumes they fit in i64, which holds for realistic dates.
                record.created_at_ms as i64,
                record.updated_at_ms as i64,
                record.expires_at_ms.map(|v| v as i64),
            ],
        )?;

        Ok(GlobalMemoryWriteResult {
            id: record.id.clone(),
            stored: true,
            deduped: false,
        })
    }
1654
    /// Search a user's global memory records by free text.
    ///
    /// Two-phase strategy: first an FTS5 `MATCH` query ranked by `bm25`; if
    /// that statement fails to prepare or produces no hits, a `LIKE`
    /// substring fallback ordered by recency is used with a flat score of
    /// 0.25. Both phases exclude demoted rows and rows expired relative to
    /// "now", and honor the optional project/channel/host tag filters.
    /// `limit` is clamped to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        // NOTE: no `?` here — a prepare failure is deliberately kept as a
        // Result and swallowed below, presumably so the LIKE fallback still
        // runs when the FTS table is unavailable — TODO confirm.
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // Column 22 is the bm25 rank aliased in the SELECT above.
                    let rank = row.get::<_, f64>(22)?;
                    // NOTE(review): SQLite FTS5 bm25() returns negative
                    // values (more negative = better match), so max(0.0)
                    // clamps every match to score 1.0 — confirm whether
                    // rank.abs() was intended here.
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        // FTS produced results: return them and skip the fallback scan.
        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback: plain substring match, newest first. `?6 = ''` lets an
        // empty query list recent records without any LIKE filtering.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                // Fallback hits get a fixed low score since there is no rank.
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
1761
1762    pub async fn list_global_memory(
1763        &self,
1764        user_id: &str,
1765        q: Option<&str>,
1766        limit: i64,
1767        offset: i64,
1768    ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
1769        let conn = self.conn.lock().await;
1770        let query = q.unwrap_or("").trim();
1771        let like = format!("%{}%", query);
1772        let mut stmt = conn.prepare(
1773            "SELECT
1774                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1775                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1776                redaction_status, redaction_count, visibility, demoted, score_boost,
1777                created_at_ms, updated_at_ms, expires_at_ms
1778             FROM memory_records
1779             WHERE user_id = ?1
1780               AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
1781             ORDER BY created_at_ms DESC
1782             LIMIT ?4 OFFSET ?5",
1783        )?;
1784        let rows = stmt.query_map(
1785            params![user_id, query, like, limit.clamp(1, 1000), offset.max(0)],
1786            row_to_global_record,
1787        )?;
1788        let mut out = Vec::new();
1789        for row in rows {
1790            out.push(row?);
1791        }
1792        Ok(out)
1793    }
1794
1795    pub async fn set_global_memory_visibility(
1796        &self,
1797        id: &str,
1798        visibility: &str,
1799        demoted: bool,
1800    ) -> MemoryResult<bool> {
1801        let conn = self.conn.lock().await;
1802        let now_ms = chrono::Utc::now().timestamp_millis();
1803        let changed = conn.execute(
1804            "UPDATE memory_records
1805             SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
1806             WHERE id = ?1",
1807            params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
1808        )?;
1809        Ok(changed > 0)
1810    }
1811
1812    pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
1813        let conn = self.conn.lock().await;
1814        let mut stmt = conn.prepare(
1815            "SELECT
1816                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1817                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1818                redaction_status, redaction_count, visibility, demoted, score_boost,
1819                created_at_ms, updated_at_ms, expires_at_ms
1820             FROM memory_records
1821             WHERE id = ?1
1822             LIMIT 1",
1823        )?;
1824        let record = stmt
1825            .query_row(params![id], row_to_global_record)
1826            .optional()?;
1827        Ok(record)
1828    }
1829
1830    pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
1831        let conn = self.conn.lock().await;
1832        let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
1833        Ok(changed > 0)
1834    }
1835}
1836
1837/// Convert a database row to a MemoryChunk
1838fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
1839    let id: String = row.get(0)?;
1840    let content: String = row.get(1)?;
1841
1842    let session_id: Option<String> = match tier {
1843        MemoryTier::Session => Some(row.get(2)?),
1844        MemoryTier::Project => row.get(2)?,
1845        MemoryTier::Global => None,
1846    };
1847
1848    let project_id: Option<String> = match tier {
1849        MemoryTier::Session => row.get(3)?,
1850        MemoryTier::Project => Some(row.get(3)?),
1851        MemoryTier::Global => None,
1852    };
1853
1854    let source: String = row.get(4)?;
1855    let created_at_str: String = row.get(5)?;
1856    let token_count: i64 = row.get(6)?;
1857    let metadata_str: Option<String> = row.get(7)?;
1858
1859    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
1860        .map_err(|e| {
1861            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
1862        })?
1863        .with_timezone(&Utc);
1864
1865    let metadata = metadata_str
1866        .filter(|s| !s.is_empty())
1867        .and_then(|s| serde_json::from_str(&s).ok());
1868
1869    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
1870    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
1871    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
1872    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
1873
1874    Ok(MemoryChunk {
1875        id,
1876        content,
1877        tier,
1878        session_id,
1879        project_id,
1880        source,
1881        source_path,
1882        source_mtime,
1883        source_size,
1884        source_hash,
1885        created_at,
1886        token_count,
1887        metadata,
1888    })
1889}
1890
1891fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
1892    let metadata_str: Option<String> = row.get(12)?;
1893    let provenance_str: Option<String> = row.get(13)?;
1894    Ok(GlobalMemoryRecord {
1895        id: row.get(0)?,
1896        user_id: row.get(1)?,
1897        source_type: row.get(2)?,
1898        content: row.get(3)?,
1899        content_hash: row.get(4)?,
1900        run_id: row.get(5)?,
1901        session_id: row.get(6)?,
1902        message_id: row.get(7)?,
1903        tool_name: row.get(8)?,
1904        project_tag: row.get(9)?,
1905        channel_tag: row.get(10)?,
1906        host_tag: row.get(11)?,
1907        metadata: metadata_str
1908            .filter(|s| !s.is_empty())
1909            .and_then(|s| serde_json::from_str(&s).ok()),
1910        provenance: provenance_str
1911            .filter(|s| !s.is_empty())
1912            .and_then(|s| serde_json::from_str(&s).ok()),
1913        redaction_status: row.get(14)?,
1914        redaction_count: row.get::<_, i64>(15)? as u32,
1915        visibility: row.get(16)?,
1916        demoted: row.get::<_, i64>(17)? != 0,
1917        score_boost: row.get(18)?,
1918        created_at_ms: row.get::<_, i64>(19)? as u64,
1919        updated_at_ms: row.get::<_, i64>(20)? as u64,
1920        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
1921    })
1922}
1923
/// Build a sanitized FTS5 MATCH expression from free-form user text.
///
/// Each whitespace-separated token is trimmed of surrounding punctuation
/// (keeping ASCII alphanumerics, `_`, and `-`), wrapped in double quotes
/// as an FTS5 string, and the tokens are OR-ed together. Any `"` embedded
/// inside a token is doubled — the FTS5 escape for a quote within a quoted
/// string — so user input such as `say"hi` cannot produce a malformed
/// MATCH expression. An input with no usable tokens yields `""` (an empty
/// phrase), keeping the result safe to bind directly to MATCH.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Double embedded quotes so the token stays inside its
                // quoted FTS5 string (previously `a"b` broke the query).
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
1943
#[cfg(test)]
mod tests {
    //! Integration-style tests exercising the database layer against a real
    //! on-disk SQLite file created in a temporary directory.

    use super::*;
    use tempfile::TempDir;

    /// Open a fresh database in a temp dir. The returned `TempDir` guard
    /// must be kept alive for the test's duration or the file is deleted.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let db = MemoryDatabase::new(&db_path).await.unwrap();
        (db, temp_dir)
    }

    // Opening the database runs schema initialization; an empty stats
    // readout proves the tables exist and start empty.
    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        // If we get here, schema was initialized successfully
        let stats = db.get_stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
    }

    // Round-trip a chunk (with embedding) through store and session lookup.
    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let chunks = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Test content");
    }

    // Config defaults are created on first read and updates persist.
    #[tokio::test]
    async fn test_config_crud() {
        let (db, _temp) = setup_test_db().await;

        let config = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(config.max_chunks, 10000);

        let new_config = MemoryConfig {
            max_chunks: 5000,
            ..Default::default()
        };
        db.update_config("project-1", &new_config).await.unwrap();

        let updated = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(updated.max_chunks, 5000);
    }

    // Writing the same record twice must dedup (second write stores
    // nothing), and the record must be findable via text search with a
    // project-tag filter.
    #[tokio::test]
    async fn test_global_memory_put_search_and_dedup() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;
        let record = GlobalMemoryRecord {
            id: "gm-1".to_string(),
            user_id: "user-a".to_string(),
            source_type: "user_message".to_string(),
            content: "remember rust workspace layout".to_string(),
            content_hash: "h1".to_string(),
            run_id: "run-1".to_string(),
            session_id: Some("s1".to_string()),
            message_id: Some("m1".to_string()),
            tool_name: None,
            project_tag: Some("proj-x".to_string()),
            channel_tag: None,
            host_tag: None,
            metadata: None,
            provenance: None,
            redaction_status: "passed".to_string(),
            redaction_count: 0,
            visibility: "private".to_string(),
            demoted: false,
            score_boost: 0.0,
            created_at_ms: now,
            updated_at_ms: now,
            expires_at_ms: None,
        };
        let first = db.put_global_memory_record(&record).await.unwrap();
        assert!(first.stored);
        let second = db.put_global_memory_record(&record).await.unwrap();
        assert!(second.deduped);

        let hits = db
            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());
        assert_eq!(hits[0].record.id, "gm-1");
    }
}