// tandem_memory/db.rs
1// Database Layer Module
2// SQLite + sqlite-vec for vector storage
3
4use crate::types::{
5    ClearFileIndexResult, MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier,
6    ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
7};
8use chrono::{DateTime, Utc};
9use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
10use sqlite_vec::sqlite3_vec_init;
11use std::collections::HashSet;
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16
/// Tuple row shape used when reading a `project_index_status` record.
/// Presumably maps, in order, to the table's non-key columns:
/// (last_indexed_at, last_total_files, last_processed_files,
///  last_indexed_files, last_skipped_files, last_errors)
/// — NOTE(review): confirm against the SELECT that uses this alias (not
/// visible in this chunk).
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
25
/// Database connection manager
///
/// Owns a single SQLite connection shared across async tasks; all access
/// serializes through the `tokio::sync::Mutex`.
pub struct MemoryDatabase {
    // The one shared connection. Cloneable handle via Arc; lock per operation.
    conn: Arc<Mutex<Connection>>,
    // Path the database was opened from. NOTE(review): no reads of this field
    // are visible in this chunk — presumably used for diagnostics/recovery
    // elsewhere in the file; confirm.
    db_path: std::path::PathBuf,
}
31
32impl MemoryDatabase {
33    /// Initialize or open the memory database
34    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
35        // Register sqlite-vec extension
36        unsafe {
37            sqlite3_auto_extension(Some(std::mem::transmute::<
38                *const (),
39                unsafe extern "C" fn(
40                    *mut rusqlite::ffi::sqlite3,
41                    *mut *mut i8,
42                    *const rusqlite::ffi::sqlite3_api_routines,
43                ) -> i32,
44            >(sqlite3_vec_init as *const ())));
45        }
46
47        let conn = Connection::open(db_path)?;
48        conn.busy_timeout(Duration::from_secs(10))?;
49
50        // Enable WAL mode for better concurrency
51        // PRAGMA journal_mode returns a row, so we use query_row to ignore it
52        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
53        conn.execute("PRAGMA synchronous = NORMAL", [])?;
54
55        let db = Self {
56            conn: Arc::new(Mutex::new(conn)),
57            db_path: db_path.to_path_buf(),
58        };
59
60        // Initialize schema
61        db.init_schema().await?;
62        if let Err(err) = db.validate_vector_tables().await {
63            match &err {
64                crate::types::MemoryError::Database(db_err)
65                    if Self::is_vector_table_error(db_err) =>
66                {
67                    tracing::warn!(
68                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
69                        db_err
70                    );
71                    db.recreate_vector_tables().await?;
72                }
73                _ => return Err(err),
74            }
75        }
76        db.validate_integrity().await?;
77
78        Ok(db)
79    }
80
81    /// Validate base SQLite integrity early so startup recovery can heal corrupt DB files.
82    async fn validate_integrity(&self) -> MemoryResult<()> {
83        let conn = self.conn.lock().await;
84        let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
85        {
86            Ok(value) => value,
87            Err(err) => {
88                // sqlite-vec virtual tables can intermittently return generic SQL logic errors
89                // during integrity probing even when runtime reads/writes still work.
90                // Do not block startup on this probe failure.
91                tracing::warn!(
92                    "Skipping strict PRAGMA quick_check due to probe error: {}",
93                    err
94                );
95                return Ok(());
96            }
97        };
98        if check.trim().eq_ignore_ascii_case("ok") {
99            return Ok(());
100        }
101
102        let lowered = check.to_lowercase();
103        if lowered.contains("malformed")
104            || lowered.contains("corrupt")
105            || lowered.contains("database disk image is malformed")
106        {
107            return Err(crate::types::MemoryError::InvalidConfig(format!(
108                "malformed database integrity check: {}",
109                check
110            )));
111        }
112
113        tracing::warn!(
114            "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
115            check
116        );
117        Ok(())
118    }
119
    /// Initialize database schema
    ///
    /// Idempotent: every statement is `IF NOT EXISTS` or guarded by a
    /// column-existence check, so this is safe to call on an existing DB
    /// (and is reused by `reset_all_memory_tables` after dropping tables).
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // Extension is already registered globally in new()

        // Session memory chunks table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Session memory vectors (vec0 virtual table, keyed by chunk id;
        // embedding dimension fixed at DEFAULT_EMBEDDING_DIMENSION)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Project memory chunks table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Migrations: file-derived columns on project_memory_chunks
        // (SQLite doesn't support IF NOT EXISTS for columns, so we inspect table_info)
        // PRAGMA table_info column 1 is the column name.
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        // Project memory vectors (virtual table)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // File indexing tables (project-scoped): one row per indexed file,
        // keyed by (project_id, path), recording mtime/size/hash for change
        // detection.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        // Per-project summary of the last indexing run (see
        // ProjectIndexStatusRow for the read-side tuple shape).
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // Global memory chunks table (no session/project scoping columns)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Global memory vectors (virtual table)
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Memory configuration table (per-project tuning knobs; the DEFAULTs
        // here are the effective fallback config for unconfigured projects)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        // Cleanup log table (audit trail of automatic/manual cleanups)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // Create indexes for better query performance
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        Ok(())
    }
324
325    /// Validate that sqlite-vec tables are readable.
326    /// This catches legacy/corrupted vector blobs early so startup can recover.
327    pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
328        let conn = self.conn.lock().await;
329        let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
330
331        for table in [
332            "session_memory_vectors",
333            "project_memory_vectors",
334            "global_memory_vectors",
335        ] {
336            let sql = format!("SELECT COUNT(*) FROM {}", table);
337            let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
338
339            // COUNT(*) can pass even when vector chunk blobs are unreadable.
340            // Probe sqlite-vec MATCH execution to surface latent blob corruption.
341            if row_count > 0 {
342                let probe_sql = format!(
343                    "SELECT chunk_id, distance
344                     FROM {}
345                     WHERE embedding MATCH ?1 AND k = 1",
346                    table
347                );
348                let mut stmt = conn.prepare(&probe_sql)?;
349                let mut rows = stmt.query(params![probe_embedding.as_str()])?;
350                let _ = rows.next()?;
351            }
352        }
353        Ok(())
354    }
355
356    fn is_vector_table_error(err: &rusqlite::Error) -> bool {
357        let text = err.to_string().to_lowercase();
358        text.contains("vector blob")
359            || text.contains("chunks iter error")
360            || text.contains("chunks iter")
361            || text.contains("internal sqlite-vec error")
362            || text.contains("insert rowids id")
363            || text.contains("sql logic error")
364            || text.contains("database disk image is malformed")
365            || text.contains("session_memory_vectors")
366            || text.contains("project_memory_vectors")
367            || text.contains("global_memory_vectors")
368            || text.contains("vec0")
369    }
370
371    async fn recreate_vector_tables(&self) -> MemoryResult<()> {
372        let conn = self.conn.lock().await;
373
374        for base in [
375            "session_memory_vectors",
376            "project_memory_vectors",
377            "global_memory_vectors",
378        ] {
379            // Drop vec virtual table and common sqlite-vec shadow tables first.
380            for name in [
381                base.to_string(),
382                format!("{}_chunks", base),
383                format!("{}_info", base),
384                format!("{}_rowids", base),
385                format!("{}_vector_chunks00", base),
386            ] {
387                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
388                conn.execute(&sql, [])?;
389            }
390
391            // Drop any additional shadow tables (e.g. *_vector_chunks01).
392            let like_pattern = format!("{base}_%");
393            let mut stmt = conn.prepare(
394                "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
395            )?;
396            let table_names = stmt
397                .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
398                .collect::<Result<Vec<_>, _>>()?;
399            drop(stmt);
400            for name in table_names {
401                let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
402                conn.execute(&sql, [])?;
403            }
404        }
405
406        conn.execute(
407            &format!(
408                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
409                    chunk_id TEXT PRIMARY KEY,
410                    embedding float[{}]
411                )",
412                DEFAULT_EMBEDDING_DIMENSION
413            ),
414            [],
415        )?;
416
417        conn.execute(
418            &format!(
419                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
420                    chunk_id TEXT PRIMARY KEY,
421                    embedding float[{}]
422                )",
423                DEFAULT_EMBEDDING_DIMENSION
424            ),
425            [],
426        )?;
427
428        conn.execute(
429            &format!(
430                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
431                    chunk_id TEXT PRIMARY KEY,
432                    embedding float[{}]
433                )",
434                DEFAULT_EMBEDDING_DIMENSION
435            ),
436            [],
437        )?;
438
439        Ok(())
440    }
441
442    /// Ensure vector tables are readable and recreate them if corruption is detected.
443    /// Returns true when a repair was performed.
444    pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
445        match self.validate_vector_tables().await {
446            Ok(()) => Ok(false),
447            Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
448                tracing::warn!(
449                    "Memory vector tables appear corrupted ({}). Recreating vector tables.",
450                    err
451                );
452                self.recreate_vector_tables().await?;
453                Ok(true)
454            }
455            Err(err) => Err(err),
456        }
457    }
458
459    /// Last-resort runtime repair for malformed DB states: drop user memory tables
460    /// and recreate the schema in-place so new writes can proceed.
461    /// This intentionally clears memory content for the active DB file.
462    pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
463        let table_names = {
464            let conn = self.conn.lock().await;
465            let mut stmt = conn.prepare(
466                "SELECT name FROM sqlite_master
467                 WHERE type='table'
468                   AND name NOT LIKE 'sqlite_%'
469                 ORDER BY name",
470            )?;
471            let names = stmt
472                .query_map([], |row| row.get::<_, String>(0))?
473                .collect::<Result<Vec<_>, _>>()?;
474            names
475        };
476
477        {
478            let conn = self.conn.lock().await;
479            for table in table_names {
480                let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
481                let _ = conn.execute(&sql, []);
482            }
483        }
484
485        self.init_schema().await
486    }
487
488    /// Attempt an immediate vector-table repair when a concrete DB error indicates
489    /// sqlite-vec internals are failing at statement/rowid level.
490    pub async fn try_repair_after_error(
491        &self,
492        err: &crate::types::MemoryError,
493    ) -> MemoryResult<bool> {
494        match err {
495            crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
496                tracing::warn!(
497                    "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
498                    db_err
499                );
500                self.recreate_vector_tables().await?;
501                Ok(true)
502            }
503            _ => Ok(false),
504        }
505    }
506
507    /// Store a chunk with its embedding
508    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
509        let conn = self.conn.lock().await;
510
511        let (chunks_table, vectors_table) = match chunk.tier {
512            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
513            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
514            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
515        };
516
517        let created_at_str = chunk.created_at.to_rfc3339();
518        let metadata_str = chunk
519            .metadata
520            .as_ref()
521            .map(|m| m.to_string())
522            .unwrap_or_default();
523
524        // Insert chunk
525        match chunk.tier {
526            MemoryTier::Session => {
527                conn.execute(
528                    &format!(
529                        "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata) 
530                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
531                        chunks_table
532                    ),
533                    params![
534                        chunk.id,
535                        chunk.content,
536                        chunk.session_id.as_ref().unwrap_or(&String::new()),
537                        chunk.project_id,
538                        chunk.source,
539                        created_at_str,
540                        chunk.token_count,
541                        metadata_str
542                    ],
543                )?;
544            }
545            MemoryTier::Project => {
546                conn.execute(
547                    &format!(
548                        "INSERT INTO {} (
549                            id, content, project_id, session_id, source, created_at, token_count, metadata,
550                            source_path, source_mtime, source_size, source_hash
551                         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
552                        chunks_table
553                    ),
554                    params![
555                        chunk.id,
556                        chunk.content,
557                        chunk.project_id.as_ref().unwrap_or(&String::new()),
558                        chunk.session_id,
559                        chunk.source,
560                        created_at_str,
561                        chunk.token_count,
562                        metadata_str,
563                        chunk.source_path.clone(),
564                        chunk.source_mtime,
565                        chunk.source_size,
566                        chunk.source_hash.clone()
567                    ],
568                )?;
569            }
570            MemoryTier::Global => {
571                conn.execute(
572                    &format!(
573                        "INSERT INTO {} (id, content, source, created_at, token_count, metadata) 
574                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
575                        chunks_table
576                    ),
577                    params![
578                        chunk.id,
579                        chunk.content,
580                        chunk.source,
581                        created_at_str,
582                        chunk.token_count,
583                        metadata_str
584                    ],
585                )?;
586            }
587        }
588
589        // Insert embedding
590        let embedding_json = format!(
591            "[{}]",
592            embedding
593                .iter()
594                .map(|f| f.to_string())
595                .collect::<Vec<_>>()
596                .join(",")
597        );
598        conn.execute(
599            &format!(
600                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
601                vectors_table
602            ),
603            params![chunk.id, embedding_json],
604        )?;
605
606        Ok(())
607    }
608
    /// Search for similar chunks
    ///
    /// Runs a sqlite-vec KNN query (`embedding MATCH ?` with `k = limit`)
    /// against the tier's vector table and joins the matching chunk rows.
    /// Filter precedence: session tier uses `session_id` if given, else
    /// `project_id`, else no filter; project tier optionally filters by
    /// `project_id`; global tier ignores both filters. Returns
    /// `(chunk, distance)` pairs ordered by ascending distance.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // Query embedding serialized as a JSON array literal for vec0 MATCH.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        // Build query based on tier and filters.
        // NOTE: the distance column index in each closure depends on how many
        // chunk columns the SELECT lists — 8 chunk columns → distance at
        // index 8; project tier adds 4 source_* columns → distance at 12.
        let results = match tier {
            MemoryTier::Session => {
                if let Some(sid) = session_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    // No filter: KNN over every session chunk.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            // distance sits after the 12 chunk columns (index 12).
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global chunks have no session/project columns; NULL aliases
                // keep the row shape compatible with row_to_chunk.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
752
753    /// Get chunks by session ID
754    pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
755        let conn = self.conn.lock().await;
756
757        let mut stmt = conn.prepare(
758            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
759             FROM session_memory_chunks
760             WHERE session_id = ?1
761             ORDER BY created_at DESC",
762        )?;
763
764        let chunks = stmt
765            .query_map(params![session_id], |row| {
766                row_to_chunk(row, MemoryTier::Session)
767            })?
768            .collect::<Result<Vec<_>, _>>()?;
769
770        Ok(chunks)
771    }
772
773    /// Get chunks by project ID
774    pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
775        let conn = self.conn.lock().await;
776
777        let mut stmt = conn.prepare(
778            "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
779                    source_path, source_mtime, source_size, source_hash
780             FROM project_memory_chunks
781             WHERE project_id = ?1
782             ORDER BY created_at DESC",
783        )?;
784
785        let chunks = stmt
786            .query_map(params![project_id], |row| {
787                row_to_chunk(row, MemoryTier::Project)
788            })?
789            .collect::<Result<Vec<_>, _>>()?;
790
791        Ok(chunks)
792    }
793
794    /// Get global chunks
795    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
796        let conn = self.conn.lock().await;
797
798        let mut stmt = conn.prepare(
799            "SELECT id, content, source, created_at, token_count, metadata
800             FROM global_memory_chunks
801             ORDER BY created_at DESC
802             LIMIT ?1",
803        )?;
804
805        let chunks = stmt
806            .query_map(params![limit], |row| {
807                let id: String = row.get(0)?;
808                let content: String = row.get(1)?;
809                let source: String = row.get(2)?;
810                let created_at_str: String = row.get(3)?;
811                let token_count: i64 = row.get(4)?;
812                let metadata_str: Option<String> = row.get(5)?;
813
814                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
815                    .map_err(|e| {
816                        rusqlite::Error::FromSqlConversionFailure(
817                            3,
818                            rusqlite::types::Type::Text,
819                            Box::new(e),
820                        )
821                    })?
822                    .with_timezone(&Utc);
823
824                let metadata = metadata_str
825                    .filter(|s| !s.is_empty())
826                    .and_then(|s| serde_json::from_str(&s).ok());
827
828                Ok(MemoryChunk {
829                    id,
830                    content,
831                    tier: MemoryTier::Global,
832                    session_id: None,
833                    project_id: None,
834                    source,
835                    source_path: None,
836                    source_mtime: None,
837                    source_size: None,
838                    source_hash: None,
839                    created_at,
840                    token_count,
841                    metadata,
842                })
843            })?
844            .collect::<Result<Vec<_>, _>>()?;
845
846        Ok(chunks)
847    }
848
849    /// Clear session memory
850    pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
851        let conn = self.conn.lock().await;
852
853        // Get count before deletion
854        let count: i64 = conn.query_row(
855            "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
856            params![session_id],
857            |row| row.get(0),
858        )?;
859
860        // Delete vectors first (foreign key constraint)
861        conn.execute(
862            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
863             (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
864            params![session_id],
865        )?;
866
867        // Delete chunks
868        conn.execute(
869            "DELETE FROM session_memory_chunks WHERE session_id = ?1",
870            params![session_id],
871        )?;
872
873        Ok(count as u64)
874    }
875
876    /// Clear project memory
877    pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
878        let conn = self.conn.lock().await;
879
880        // Get count before deletion
881        let count: i64 = conn.query_row(
882            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
883            params![project_id],
884            |row| row.get(0),
885        )?;
886
887        // Delete vectors first
888        conn.execute(
889            "DELETE FROM project_memory_vectors WHERE chunk_id IN 
890             (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
891            params![project_id],
892        )?;
893
894        // Delete chunks
895        conn.execute(
896            "DELETE FROM project_memory_chunks WHERE project_id = ?1",
897            params![project_id],
898        )?;
899
900        Ok(count as u64)
901    }
902
903    /// Clear old session memory based on retention policy
904    pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
905        let conn = self.conn.lock().await;
906
907        let cutoff = Utc::now() - chrono::Duration::days(retention_days);
908        let cutoff_str = cutoff.to_rfc3339();
909
910        // Get count before deletion
911        let count: i64 = conn.query_row(
912            "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
913            params![cutoff_str],
914            |row| row.get(0),
915        )?;
916
917        // Delete vectors first
918        conn.execute(
919            "DELETE FROM session_memory_vectors WHERE chunk_id IN 
920             (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
921            params![cutoff_str],
922        )?;
923
924        // Delete chunks
925        conn.execute(
926            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
927            params![cutoff_str],
928        )?;
929
930        Ok(count as u64)
931    }
932
933    /// Get or create memory config for a project
934    pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
935        let conn = self.conn.lock().await;
936
937        let result: Option<MemoryConfig> = conn
938            .query_row(
939                "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup, 
940                        session_retention_days, token_budget, chunk_overlap
941                 FROM memory_config WHERE project_id = ?1",
942                params![project_id],
943                |row| {
944                    Ok(MemoryConfig {
945                        max_chunks: row.get(0)?,
946                        chunk_size: row.get(1)?,
947                        retrieval_k: row.get(2)?,
948                        auto_cleanup: row.get::<_, i64>(3)? != 0,
949                        session_retention_days: row.get(4)?,
950                        token_budget: row.get(5)?,
951                        chunk_overlap: row.get(6)?,
952                    })
953                },
954            )
955            .optional()?;
956
957        match result {
958            Some(config) => Ok(config),
959            None => {
960                // Create default config
961                let config = MemoryConfig::default();
962                let updated_at = Utc::now().to_rfc3339();
963
964                conn.execute(
965                    "INSERT INTO memory_config 
966                     (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
967                      session_retention_days, token_budget, chunk_overlap, updated_at)
968                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
969                    params![
970                        project_id,
971                        config.max_chunks,
972                        config.chunk_size,
973                        config.retrieval_k,
974                        config.auto_cleanup as i64,
975                        config.session_retention_days,
976                        config.token_budget,
977                        config.chunk_overlap,
978                        updated_at
979                    ],
980                )?;
981
982                Ok(config)
983            }
984        }
985    }
986
987    /// Update memory config for a project
988    pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
989        let conn = self.conn.lock().await;
990
991        let updated_at = Utc::now().to_rfc3339();
992
993        conn.execute(
994            "INSERT OR REPLACE INTO memory_config 
995             (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup, 
996              session_retention_days, token_budget, chunk_overlap, updated_at)
997             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
998            params![
999                project_id,
1000                config.max_chunks,
1001                config.chunk_size,
1002                config.retrieval_k,
1003                config.auto_cleanup as i64,
1004                config.session_retention_days,
1005                config.token_budget,
1006                config.chunk_overlap,
1007                updated_at
1008            ],
1009        )?;
1010
1011        Ok(())
1012    }
1013
1014    /// Get memory statistics
1015    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1016        let conn = self.conn.lock().await;
1017
1018        // Count chunks
1019        let session_chunks: i64 =
1020            conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1021                row.get(0)
1022            })?;
1023
1024        let project_chunks: i64 =
1025            conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1026                row.get(0)
1027            })?;
1028
1029        let global_chunks: i64 =
1030            conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1031                row.get(0)
1032            })?;
1033
1034        // Calculate sizes
1035        let session_bytes: i64 = conn.query_row(
1036            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1037            [],
1038            |row| row.get(0),
1039        )?;
1040
1041        let project_bytes: i64 = conn.query_row(
1042            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1043            [],
1044            |row| row.get(0),
1045        )?;
1046
1047        let global_bytes: i64 = conn.query_row(
1048            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1049            [],
1050            |row| row.get(0),
1051        )?;
1052
1053        // Get last cleanup
1054        let last_cleanup: Option<String> = conn
1055            .query_row(
1056                "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1057                [],
1058                |row| row.get(0),
1059            )
1060            .optional()?;
1061
1062        let last_cleanup = last_cleanup.and_then(|s| {
1063            DateTime::parse_from_rfc3339(&s)
1064                .ok()
1065                .map(|dt| dt.with_timezone(&Utc))
1066        });
1067
1068        // Get file size
1069        let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1070
1071        Ok(MemoryStats {
1072            total_chunks: session_chunks + project_chunks + global_chunks,
1073            session_chunks,
1074            project_chunks,
1075            global_chunks,
1076            total_bytes: session_bytes + project_bytes + global_bytes,
1077            session_bytes,
1078            project_bytes,
1079            global_bytes,
1080            file_size,
1081            last_cleanup,
1082        })
1083    }
1084
1085    /// Log cleanup operation
1086    pub async fn log_cleanup(
1087        &self,
1088        cleanup_type: &str,
1089        tier: MemoryTier,
1090        project_id: Option<&str>,
1091        session_id: Option<&str>,
1092        chunks_deleted: i64,
1093        bytes_reclaimed: i64,
1094    ) -> MemoryResult<()> {
1095        let conn = self.conn.lock().await;
1096
1097        let id = uuid::Uuid::new_v4().to_string();
1098        let created_at = Utc::now().to_rfc3339();
1099
1100        conn.execute(
1101            "INSERT INTO memory_cleanup_log 
1102             (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1103             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1104            params![
1105                id,
1106                cleanup_type,
1107                tier.to_string(),
1108                project_id,
1109                session_id,
1110                chunks_deleted,
1111                bytes_reclaimed,
1112                created_at
1113            ],
1114        )?;
1115
1116        Ok(())
1117    }
1118
1119    /// Vacuum the database to reclaim space
1120    pub async fn vacuum(&self) -> MemoryResult<()> {
1121        let conn = self.conn.lock().await;
1122        conn.execute("VACUUM", [])?;
1123        Ok(())
1124    }
1125
1126    // ---------------------------------------------------------------------
1127    // Project file indexing helpers
1128    // ---------------------------------------------------------------------
1129
1130    pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1131        let conn = self.conn.lock().await;
1132        let n: i64 = conn.query_row(
1133            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1134            params![project_id],
1135            |row| row.get(0),
1136        )?;
1137        Ok(n)
1138    }
1139
1140    pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1141        let conn = self.conn.lock().await;
1142        let exists: Option<i64> = conn
1143            .query_row(
1144                "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1145                params![project_id],
1146                |row| row.get(0),
1147            )
1148            .optional()?;
1149        Ok(exists.is_some())
1150    }
1151
1152    pub async fn get_file_index_entry(
1153        &self,
1154        project_id: &str,
1155        path: &str,
1156    ) -> MemoryResult<Option<(i64, i64, String)>> {
1157        let conn = self.conn.lock().await;
1158        let row: Option<(i64, i64, String)> = conn
1159            .query_row(
1160                "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1161                params![project_id, path],
1162                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1163            )
1164            .optional()?;
1165        Ok(row)
1166    }
1167
1168    pub async fn upsert_file_index_entry(
1169        &self,
1170        project_id: &str,
1171        path: &str,
1172        mtime: i64,
1173        size: i64,
1174        hash: &str,
1175    ) -> MemoryResult<()> {
1176        let conn = self.conn.lock().await;
1177        let indexed_at = Utc::now().to_rfc3339();
1178        conn.execute(
1179            "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1180             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1181             ON CONFLICT(project_id, path) DO UPDATE SET
1182                mtime = excluded.mtime,
1183                size = excluded.size,
1184                hash = excluded.hash,
1185                indexed_at = excluded.indexed_at",
1186            params![project_id, path, mtime, size, hash, indexed_at],
1187        )?;
1188        Ok(())
1189    }
1190
1191    pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1192        let conn = self.conn.lock().await;
1193        conn.execute(
1194            "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1195            params![project_id, path],
1196        )?;
1197        Ok(())
1198    }
1199
1200    pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1201        let conn = self.conn.lock().await;
1202        let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1203        let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1204        Ok(rows.collect::<Result<Vec<_>, _>>()?)
1205    }
1206
1207    pub async fn delete_project_file_chunks_by_path(
1208        &self,
1209        project_id: &str,
1210        source_path: &str,
1211    ) -> MemoryResult<(i64, i64)> {
1212        let conn = self.conn.lock().await;
1213
1214        let chunks_deleted: i64 = conn.query_row(
1215            "SELECT COUNT(*) FROM project_memory_chunks
1216             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1217            params![project_id, source_path],
1218            |row| row.get(0),
1219        )?;
1220
1221        let bytes_estimated: i64 = conn.query_row(
1222            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1223             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1224            params![project_id, source_path],
1225            |row| row.get(0),
1226        )?;
1227
1228        // Delete vectors first (keep order consistent with other clears)
1229        conn.execute(
1230            "DELETE FROM project_memory_vectors WHERE chunk_id IN
1231             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1232            params![project_id, source_path],
1233        )?;
1234
1235        conn.execute(
1236            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1237            params![project_id, source_path],
1238        )?;
1239
1240        Ok((chunks_deleted, bytes_estimated))
1241    }
1242
1243    pub async fn upsert_project_index_status(
1244        &self,
1245        project_id: &str,
1246        total_files: i64,
1247        processed_files: i64,
1248        indexed_files: i64,
1249        skipped_files: i64,
1250        errors: i64,
1251    ) -> MemoryResult<()> {
1252        let conn = self.conn.lock().await;
1253        let last_indexed_at = Utc::now().to_rfc3339();
1254        conn.execute(
1255            "INSERT INTO project_index_status (
1256                project_id, last_indexed_at, last_total_files, last_processed_files,
1257                last_indexed_files, last_skipped_files, last_errors
1258             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1259             ON CONFLICT(project_id) DO UPDATE SET
1260                last_indexed_at = excluded.last_indexed_at,
1261                last_total_files = excluded.last_total_files,
1262                last_processed_files = excluded.last_processed_files,
1263                last_indexed_files = excluded.last_indexed_files,
1264                last_skipped_files = excluded.last_skipped_files,
1265                last_errors = excluded.last_errors",
1266            params![
1267                project_id,
1268                last_indexed_at,
1269                total_files,
1270                processed_files,
1271                indexed_files,
1272                skipped_files,
1273                errors
1274            ],
1275        )?;
1276        Ok(())
1277    }
1278
1279    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
1280        let conn = self.conn.lock().await;
1281
1282        let project_chunks: i64 = conn.query_row(
1283            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1284            params![project_id],
1285            |row| row.get(0),
1286        )?;
1287
1288        let project_bytes: i64 = conn.query_row(
1289            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
1290            params![project_id],
1291            |row| row.get(0),
1292        )?;
1293
1294        let file_index_chunks: i64 = conn.query_row(
1295            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1296            params![project_id],
1297            |row| row.get(0),
1298        )?;
1299
1300        let file_index_bytes: i64 = conn.query_row(
1301            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1302            params![project_id],
1303            |row| row.get(0),
1304        )?;
1305
1306        let indexed_files: i64 = conn.query_row(
1307            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1308            params![project_id],
1309            |row| row.get(0),
1310        )?;
1311
1312        let status_row: Option<ProjectIndexStatusRow> =
1313            conn
1314                .query_row(
1315                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
1316                     FROM project_index_status WHERE project_id = ?1",
1317                    params![project_id],
1318                    |row| {
1319                        Ok((
1320                            row.get(0)?,
1321                            row.get(1)?,
1322                            row.get(2)?,
1323                            row.get(3)?,
1324                            row.get(4)?,
1325                            row.get(5)?,
1326                        ))
1327                    },
1328                )
1329                .optional()?;
1330
1331        let (
1332            last_indexed_at,
1333            last_total_files,
1334            last_processed_files,
1335            last_indexed_files,
1336            last_skipped_files,
1337            last_errors,
1338        ) = status_row.unwrap_or((None, None, None, None, None, None));
1339
1340        let last_indexed_at = last_indexed_at.and_then(|s| {
1341            DateTime::parse_from_rfc3339(&s)
1342                .ok()
1343                .map(|dt| dt.with_timezone(&Utc))
1344        });
1345
1346        Ok(ProjectMemoryStats {
1347            project_id: project_id.to_string(),
1348            project_chunks,
1349            project_bytes,
1350            file_index_chunks,
1351            file_index_bytes,
1352            indexed_files,
1353            last_indexed_at,
1354            last_total_files,
1355            last_processed_files,
1356            last_indexed_files,
1357            last_skipped_files,
1358            last_errors,
1359        })
1360    }
1361
1362    pub async fn clear_project_file_index(
1363        &self,
1364        project_id: &str,
1365        vacuum: bool,
1366    ) -> MemoryResult<ClearFileIndexResult> {
1367        let conn = self.conn.lock().await;
1368
1369        let chunks_deleted: i64 = conn.query_row(
1370            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1371            params![project_id],
1372            |row| row.get(0),
1373        )?;
1374
1375        let bytes_estimated: i64 = conn.query_row(
1376            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1377            params![project_id],
1378            |row| row.get(0),
1379        )?;
1380
1381        // Delete vectors first
1382        conn.execute(
1383            "DELETE FROM project_memory_vectors WHERE chunk_id IN
1384             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1385            params![project_id],
1386        )?;
1387
1388        // Delete file chunks
1389        conn.execute(
1390            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1391            params![project_id],
1392        )?;
1393
1394        // Clear file index tracking + status
1395        conn.execute(
1396            "DELETE FROM project_file_index WHERE project_id = ?1",
1397            params![project_id],
1398        )?;
1399        conn.execute(
1400            "DELETE FROM project_index_status WHERE project_id = ?1",
1401            params![project_id],
1402        )?;
1403
1404        drop(conn); // release lock before VACUUM (which needs exclusive access)
1405
1406        if vacuum {
1407            self.vacuum().await?;
1408        }
1409
1410        Ok(ClearFileIndexResult {
1411            chunks_deleted,
1412            bytes_estimated,
1413            did_vacuum: vacuum,
1414        })
1415    }
1416
1417    // ------------------------------------------------------------------
1418    // Memory hygiene
1419    // ------------------------------------------------------------------
1420
1421    /// Delete session memory chunks older than `retention_days` days.
1422    ///
1423    /// Also removes orphaned vector entries for the deleted chunks so the
1424    /// sqlite-vec virtual table stays consistent.
1425    ///
1426    /// Returns the number of chunk rows deleted.
1427    /// If `retention_days` is 0 hygiene is disabled and this returns Ok(0).
1428    pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1429        if retention_days == 0 {
1430            return Ok(0);
1431        }
1432
1433        let conn = self.conn.lock().await;
1434
1435        // WAL is already active (set in new()) — no need to set it again here.
1436        let cutoff =
1437            (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1438
1439        // Remove orphaned vector entries first (chunk_id FK would dangle otherwise)
1440        conn.execute(
1441            "DELETE FROM session_memory_vectors
1442             WHERE chunk_id IN (
1443                 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1444             )",
1445            params![cutoff],
1446        )?;
1447
1448        let deleted = conn.execute(
1449            "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1450            params![cutoff],
1451        )?;
1452
1453        if deleted > 0 {
1454            tracing::info!(
1455                retention_days,
1456                deleted,
1457                "memory hygiene: pruned old session chunks"
1458            );
1459        }
1460
1461        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1462        Ok(deleted as u64)
1463    }
1464
1465    /// Run scheduled hygiene: read `session_retention_days` from `memory_config`
1466    /// (falling back to `env_override` if provided) and prune stale session chunks.
1467    ///
1468    /// Returns `Ok(chunks_deleted)`. This method is intentionally best-effort —
1469    /// callers should log errors and continue.
1470    pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
1471        // Prefer the env override, fall back to the DB config for the null project.
1472        let retention_days = if env_override_days > 0 {
1473            env_override_days
1474        } else {
1475            // Try to read the global (project_id = '__global__') config if present.
1476            let conn = self.conn.lock().await;
1477            let days: Option<i64> = conn
1478                .query_row(
1479                    "SELECT session_retention_days FROM memory_config
1480                     WHERE project_id = '__global__' LIMIT 1",
1481                    [],
1482                    |row| row.get(0),
1483                )
1484                .ok();
1485            drop(conn);
1486            days.unwrap_or(30) as u32
1487        };
1488
1489        self.prune_old_session_chunks(retention_days).await
1490    }
1491}
1492
/// Convert a database row to a MemoryChunk
///
/// Expects positional columns in the order: id, content, session_id,
/// project_id, source, created_at, token_count, metadata (the Global arms
/// below read no tier-scoping columns at all). The file-source columns are
/// looked up by NAME so that queries which do not select them simply yield
/// `None` instead of erroring.
fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
    let id: String = row.get(0)?;
    let content: String = row.get(1)?;

    // Session rows always carry their own session_id; project rows may
    // optionally record one; global rows have no session scoping.
    let session_id: Option<String> = match tier {
        MemoryTier::Session => Some(row.get(2)?),
        MemoryTier::Project => row.get(2)?,
        MemoryTier::Global => None,
    };

    // Mirror image of the above: project rows always have a project_id,
    // session rows may, global rows never do.
    let project_id: Option<String> = match tier {
        MemoryTier::Session => row.get(3)?,
        MemoryTier::Project => Some(row.get(3)?),
        MemoryTier::Global => None,
    };

    let source: String = row.get(4)?;
    let created_at_str: String = row.get(5)?;
    let token_count: i64 = row.get(6)?;
    let metadata_str: Option<String> = row.get(7)?;

    // created_at is stored as RFC 3339 text; a parse failure is surfaced as
    // a conversion error on column index 5 so callers can see which column broke.
    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
        .map_err(|e| {
            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
        })?
        .with_timezone(&Utc);

    // Empty or malformed JSON metadata degrades to None rather than failing.
    let metadata = metadata_str
        .filter(|s| !s.is_empty())
        .and_then(|s| serde_json::from_str(&s).ok());

    // By-name lookups: `.ok()` swallows the lookup error for queries that
    // omit these columns (e.g. session-tier SELECTs), and `.flatten()`
    // collapses present-but-NULL values to None.
    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();

    Ok(MemoryChunk {
        id,
        content,
        tier,
        session_id,
        project_id,
        source,
        source_path,
        source_mtime,
        source_size,
        source_hash,
        created_at,
        token_count,
        metadata,
    })
}
1546
1547#[cfg(test)]
1548mod tests {
1549    use super::*;
1550    use tempfile::TempDir;
1551
1552    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
1553        let temp_dir = TempDir::new().unwrap();
1554        let db_path = temp_dir.path().join("test_memory.db");
1555        let db = MemoryDatabase::new(&db_path).await.unwrap();
1556        (db, temp_dir)
1557    }
1558
1559    #[tokio::test]
1560    async fn test_init_schema() {
1561        let (db, _temp) = setup_test_db().await;
1562        // If we get here, schema was initialized successfully
1563        let stats = db.get_stats().await.unwrap();
1564        assert_eq!(stats.total_chunks, 0);
1565    }
1566
1567    #[tokio::test]
1568    async fn test_store_and_retrieve_chunk() {
1569        let (db, _temp) = setup_test_db().await;
1570
1571        let chunk = MemoryChunk {
1572            id: "test-1".to_string(),
1573            content: "Test content".to_string(),
1574            tier: MemoryTier::Session,
1575            session_id: Some("session-1".to_string()),
1576            project_id: Some("project-1".to_string()),
1577            source: "user_message".to_string(),
1578            source_path: None,
1579            source_mtime: None,
1580            source_size: None,
1581            source_hash: None,
1582            created_at: Utc::now(),
1583            token_count: 10,
1584            metadata: None,
1585        };
1586
1587        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
1588        db.store_chunk(&chunk, &embedding).await.unwrap();
1589
1590        let chunks = db.get_session_chunks("session-1").await.unwrap();
1591        assert_eq!(chunks.len(), 1);
1592        assert_eq!(chunks[0].content, "Test content");
1593    }
1594
1595    #[tokio::test]
1596    async fn test_config_crud() {
1597        let (db, _temp) = setup_test_db().await;
1598
1599        let config = db.get_or_create_config("project-1").await.unwrap();
1600        assert_eq!(config.max_chunks, 10000);
1601
1602        let new_config = MemoryConfig {
1603            max_chunks: 5000,
1604            ..Default::default()
1605        };
1606        db.update_config("project-1", &new_config).await.unwrap();
1607
1608        let updated = db.get_or_create_config("project-1").await.unwrap();
1609        assert_eq!(updated.max_chunks, 5000);
1610    }
1611}