1use crate::types::{
5 ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6 MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier, ProjectMemoryStats,
7 DEFAULT_EMBEDDING_DIMENSION,
8};
9use chrono::{DateTime, Utc};
10use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
11use sqlite_vec::sqlite3_vec_init;
12use std::collections::HashSet;
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
/// Row shape for reading `project_index_status`: the six nullable columns
/// after the primary key, in table order —
/// (last_indexed_at, last_total_files, last_processed_files,
///  last_indexed_files, last_skipped_files, last_errors).
/// NOTE(review): meaning inferred from the `project_index_status` schema in
/// `init_schema`; confirm against the reader that uses this alias.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
26
/// SQLite-backed store for tiered memory (session / project / global),
/// using the `sqlite-vec` extension for vector similarity search.
pub struct MemoryDatabase {
    // Single shared connection; the tokio Mutex serializes all DB access
    // across async tasks.
    conn: Arc<Mutex<Connection>>,
    // Retained so `get_stats` can report the database file size.
    db_path: std::path::PathBuf,
}
32
33impl MemoryDatabase {
    /// Opens (or creates) the database at `db_path`, registers the
    /// `sqlite-vec` extension, applies pragmas, initializes the schema, and
    /// runs startup corruption checks — recreating the vector tables when
    /// they look corrupted.
    ///
    /// # Errors
    /// Fails on any SQLite error, or when `validate_integrity` detects a
    /// malformed database image.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // SAFETY: sqlite3_auto_extension expects a C extension-init function
        // pointer. `sqlite3_vec_init` has that exact signature; the transmute
        // only restores the fn-pointer type erased by the `as *const ()` cast.
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;
        conn.busy_timeout(Duration::from_secs(10))?;

        // `PRAGMA journal_mode` returns a result row, so it must go through
        // query_row; plain pragmas use execute.
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        db.init_schema().await?;
        // If the vec0 tables are damaged, rebuild them instead of failing
        // startup; any other validation error is fatal.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
81
82 async fn validate_integrity(&self) -> MemoryResult<()> {
84 let conn = self.conn.lock().await;
85 let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
86 {
87 Ok(value) => value,
88 Err(err) => {
89 tracing::warn!(
93 "Skipping strict PRAGMA quick_check due to probe error: {}",
94 err
95 );
96 return Ok(());
97 }
98 };
99 if check.trim().eq_ignore_ascii_case("ok") {
100 return Ok(());
101 }
102
103 let lowered = check.to_lowercase();
104 if lowered.contains("malformed")
105 || lowered.contains("corrupt")
106 || lowered.contains("database disk image is malformed")
107 {
108 return Err(crate::types::MemoryError::InvalidConfig(format!(
109 "malformed database integrity check: {}",
110 check
111 )));
112 }
113
114 tracing::warn!(
115 "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
116 check
117 );
118 Ok(())
119 }
120
    /// Creates all tables, indexes, triggers, and vec0/fts5 virtual tables if
    /// absent, and applies column-addition migrations for older databases.
    /// Safe to call repeatedly: all DDL is `IF NOT EXISTS` or guarded by a
    /// `PRAGMA table_info` column check.
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // --- Session tier: chunk text + vec0 embeddings -------------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Project tier --------------------------------------------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Migration: older databases lack the source_* columns. SQLite's
        // ALTER TABLE ADD COLUMN is not idempotent, so check first.
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            // Column index 1 of table_info rows is the column name.
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Project file-indexing bookkeeping -----------------------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // --- Global tier ---------------------------------------------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                    chunk_id TEXT PRIMARY KEY,
                    embedding float[{}]
                )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Per-project configuration and cleanup audit log ---------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // --- Secondary indexes for the common lookup paths -----------------
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        // --- Flat memory-record store with FTS mirror -----------------------
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_records (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                source_type TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                run_id TEXT NOT NULL,
                session_id TEXT,
                message_id TEXT,
                tool_name TEXT,
                project_tag TEXT,
                channel_tag TEXT,
                host_tag TEXT,
                metadata TEXT,
                provenance TEXT,
                redaction_status TEXT NOT NULL,
                redaction_count INTEGER NOT NULL DEFAULT 0,
                visibility TEXT NOT NULL DEFAULT 'private',
                demoted INTEGER NOT NULL DEFAULT 0,
                score_boost REAL NOT NULL DEFAULT 0.0,
                created_at_ms INTEGER NOT NULL,
                updated_at_ms INTEGER NOT NULL,
                expires_at_ms INTEGER
            )",
            [],
        )?;
        // Dedup key treats NULL session/message/tool as empty strings so the
        // UNIQUE index compares them consistently.
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
             ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
             ON memory_records(user_id, created_at_ms DESC)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_run
             ON memory_records(run_id)",
            [],
        )?;
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
                id UNINDEXED,
                user_id UNINDEXED,
                content
            )",
            [],
        )?;
        // Triggers keep the FTS mirror in sync with memory_records on
        // insert, delete, and content/user_id updates.
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;

        Ok(())
    }
396
397 pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
400 let conn = self.conn.lock().await;
401 let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
402
403 for table in [
404 "session_memory_vectors",
405 "project_memory_vectors",
406 "global_memory_vectors",
407 ] {
408 let sql = format!("SELECT COUNT(*) FROM {}", table);
409 let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
410
411 if row_count > 0 {
414 let probe_sql = format!(
415 "SELECT chunk_id, distance
416 FROM {}
417 WHERE embedding MATCH ?1 AND k = 1",
418 table
419 );
420 let mut stmt = conn.prepare(&probe_sql)?;
421 let mut rows = stmt.query(params![probe_embedding.as_str()])?;
422 let _ = rows.next()?;
423 }
424 }
425 Ok(())
426 }
427
428 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
429 let text = err.to_string().to_lowercase();
430 text.contains("vector blob")
431 || text.contains("chunks iter error")
432 || text.contains("chunks iter")
433 || text.contains("internal sqlite-vec error")
434 || text.contains("insert rowids id")
435 || text.contains("sql logic error")
436 || text.contains("database disk image is malformed")
437 || text.contains("session_memory_vectors")
438 || text.contains("project_memory_vectors")
439 || text.contains("global_memory_vectors")
440 || text.contains("vec0")
441 }
442
443 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
444 let conn = self.conn.lock().await;
445
446 for base in [
447 "session_memory_vectors",
448 "project_memory_vectors",
449 "global_memory_vectors",
450 ] {
451 for name in [
453 base.to_string(),
454 format!("{}_chunks", base),
455 format!("{}_info", base),
456 format!("{}_rowids", base),
457 format!("{}_vector_chunks00", base),
458 ] {
459 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
460 conn.execute(&sql, [])?;
461 }
462
463 let like_pattern = format!("{base}_%");
465 let mut stmt = conn.prepare(
466 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
467 )?;
468 let table_names = stmt
469 .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
470 .collect::<Result<Vec<_>, _>>()?;
471 drop(stmt);
472 for name in table_names {
473 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
474 conn.execute(&sql, [])?;
475 }
476 }
477
478 conn.execute(
479 &format!(
480 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
481 chunk_id TEXT PRIMARY KEY,
482 embedding float[{}]
483 )",
484 DEFAULT_EMBEDDING_DIMENSION
485 ),
486 [],
487 )?;
488
489 conn.execute(
490 &format!(
491 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
492 chunk_id TEXT PRIMARY KEY,
493 embedding float[{}]
494 )",
495 DEFAULT_EMBEDDING_DIMENSION
496 ),
497 [],
498 )?;
499
500 conn.execute(
501 &format!(
502 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
503 chunk_id TEXT PRIMARY KEY,
504 embedding float[{}]
505 )",
506 DEFAULT_EMBEDDING_DIMENSION
507 ),
508 [],
509 )?;
510
511 Ok(())
512 }
513
514 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
517 match self.validate_vector_tables().await {
518 Ok(()) => Ok(false),
519 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
520 tracing::warn!(
521 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
522 err
523 );
524 self.recreate_vector_tables().await?;
525 Ok(true)
526 }
527 Err(err) => Err(err),
528 }
529 }
530
531 pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
535 let table_names = {
536 let conn = self.conn.lock().await;
537 let mut stmt = conn.prepare(
538 "SELECT name FROM sqlite_master
539 WHERE type='table'
540 AND name NOT LIKE 'sqlite_%'
541 ORDER BY name",
542 )?;
543 let names = stmt
544 .query_map([], |row| row.get::<_, String>(0))?
545 .collect::<Result<Vec<_>, _>>()?;
546 names
547 };
548
549 {
550 let conn = self.conn.lock().await;
551 for table in table_names {
552 let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
553 let _ = conn.execute(&sql, []);
554 }
555 }
556
557 self.init_schema().await
558 }
559
560 pub async fn try_repair_after_error(
563 &self,
564 err: &crate::types::MemoryError,
565 ) -> MemoryResult<bool> {
566 match err {
567 crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
568 tracing::warn!(
569 "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
570 db_err
571 );
572 self.recreate_vector_tables().await?;
573 Ok(true)
574 }
575 _ => Ok(false),
576 }
577 }
578
579 pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
581 let conn = self.conn.lock().await;
582
583 let (chunks_table, vectors_table) = match chunk.tier {
584 MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
585 MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
586 MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
587 };
588
589 let created_at_str = chunk.created_at.to_rfc3339();
590 let metadata_str = chunk
591 .metadata
592 .as_ref()
593 .map(|m| m.to_string())
594 .unwrap_or_default();
595
596 match chunk.tier {
598 MemoryTier::Session => {
599 conn.execute(
600 &format!(
601 "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata)
602 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
603 chunks_table
604 ),
605 params![
606 chunk.id,
607 chunk.content,
608 chunk.session_id.as_ref().unwrap_or(&String::new()),
609 chunk.project_id,
610 chunk.source,
611 created_at_str,
612 chunk.token_count,
613 metadata_str
614 ],
615 )?;
616 }
617 MemoryTier::Project => {
618 conn.execute(
619 &format!(
620 "INSERT INTO {} (
621 id, content, project_id, session_id, source, created_at, token_count, metadata,
622 source_path, source_mtime, source_size, source_hash
623 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
624 chunks_table
625 ),
626 params![
627 chunk.id,
628 chunk.content,
629 chunk.project_id.as_ref().unwrap_or(&String::new()),
630 chunk.session_id,
631 chunk.source,
632 created_at_str,
633 chunk.token_count,
634 metadata_str,
635 chunk.source_path.clone(),
636 chunk.source_mtime,
637 chunk.source_size,
638 chunk.source_hash.clone()
639 ],
640 )?;
641 }
642 MemoryTier::Global => {
643 conn.execute(
644 &format!(
645 "INSERT INTO {} (id, content, source, created_at, token_count, metadata)
646 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
647 chunks_table
648 ),
649 params![
650 chunk.id,
651 chunk.content,
652 chunk.source,
653 created_at_str,
654 chunk.token_count,
655 metadata_str
656 ],
657 )?;
658 }
659 }
660
661 let embedding_json = format!(
663 "[{}]",
664 embedding
665 .iter()
666 .map(|f| f.to_string())
667 .collect::<Vec<_>>()
668 .join(",")
669 );
670 conn.execute(
671 &format!(
672 "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
673 vectors_table
674 ),
675 params![chunk.id, embedding_json],
676 )?;
677
678 Ok(())
679 }
680
    /// KNN search over one memory tier, returning up to `limit`
    /// (chunk, distance) pairs ordered by ascending vector distance.
    ///
    /// Filter precedence for the session tier: `session_id` wins over
    /// `project_id`; with neither set, the whole tier is searched.
    /// `project_id` filters the project tier; the global tier ignores both
    /// filters. `WHERE embedding MATCH ?.. AND k = ..` is sqlite-vec's KNN
    /// query form.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // Query embedding serialized as a JSON array for sqlite-vec.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        let results = match tier {
            MemoryTier::Session => {
                // Session/global selects project 8 chunk columns, so the
                // distance lands at column index 8.
                if let Some(sid) = session_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                // Project tier also selects the four source_* columns, so the
                // distance column index shifts to 12.
                if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global chunks have no session/project; NULL placeholders
                // keep the column layout compatible with row_to_chunk.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
824
825 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
827 let conn = self.conn.lock().await;
828
829 let mut stmt = conn.prepare(
830 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
831 FROM session_memory_chunks
832 WHERE session_id = ?1
833 ORDER BY created_at DESC",
834 )?;
835
836 let chunks = stmt
837 .query_map(params![session_id], |row| {
838 row_to_chunk(row, MemoryTier::Session)
839 })?
840 .collect::<Result<Vec<_>, _>>()?;
841
842 Ok(chunks)
843 }
844
845 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
847 let conn = self.conn.lock().await;
848
849 let mut stmt = conn.prepare(
850 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
851 source_path, source_mtime, source_size, source_hash
852 FROM project_memory_chunks
853 WHERE project_id = ?1
854 ORDER BY created_at DESC",
855 )?;
856
857 let chunks = stmt
858 .query_map(params![project_id], |row| {
859 row_to_chunk(row, MemoryTier::Project)
860 })?
861 .collect::<Result<Vec<_>, _>>()?;
862
863 Ok(chunks)
864 }
865
866 pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
868 let conn = self.conn.lock().await;
869
870 let mut stmt = conn.prepare(
871 "SELECT id, content, source, created_at, token_count, metadata
872 FROM global_memory_chunks
873 ORDER BY created_at DESC
874 LIMIT ?1",
875 )?;
876
877 let chunks = stmt
878 .query_map(params![limit], |row| {
879 let id: String = row.get(0)?;
880 let content: String = row.get(1)?;
881 let source: String = row.get(2)?;
882 let created_at_str: String = row.get(3)?;
883 let token_count: i64 = row.get(4)?;
884 let metadata_str: Option<String> = row.get(5)?;
885
886 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
887 .map_err(|e| {
888 rusqlite::Error::FromSqlConversionFailure(
889 3,
890 rusqlite::types::Type::Text,
891 Box::new(e),
892 )
893 })?
894 .with_timezone(&Utc);
895
896 let metadata = metadata_str
897 .filter(|s| !s.is_empty())
898 .and_then(|s| serde_json::from_str(&s).ok());
899
900 Ok(MemoryChunk {
901 id,
902 content,
903 tier: MemoryTier::Global,
904 session_id: None,
905 project_id: None,
906 source,
907 source_path: None,
908 source_mtime: None,
909 source_size: None,
910 source_hash: None,
911 created_at,
912 token_count,
913 metadata,
914 })
915 })?
916 .collect::<Result<Vec<_>, _>>()?;
917
918 Ok(chunks)
919 }
920
921 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
923 let conn = self.conn.lock().await;
924
925 let count: i64 = conn.query_row(
927 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
928 params![session_id],
929 |row| row.get(0),
930 )?;
931
932 conn.execute(
934 "DELETE FROM session_memory_vectors WHERE chunk_id IN
935 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
936 params![session_id],
937 )?;
938
939 conn.execute(
941 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
942 params![session_id],
943 )?;
944
945 Ok(count as u64)
946 }
947
948 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
950 let conn = self.conn.lock().await;
951
952 let count: i64 = conn.query_row(
954 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
955 params![project_id],
956 |row| row.get(0),
957 )?;
958
959 conn.execute(
961 "DELETE FROM project_memory_vectors WHERE chunk_id IN
962 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
963 params![project_id],
964 )?;
965
966 conn.execute(
968 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
969 params![project_id],
970 )?;
971
972 Ok(count as u64)
973 }
974
975 pub async fn clear_global_memory_by_source_prefix(
977 &self,
978 source_prefix: &str,
979 ) -> MemoryResult<u64> {
980 let conn = self.conn.lock().await;
981 let like = format!("{}%", source_prefix);
982
983 let count: i64 = conn.query_row(
984 "SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
985 params![like],
986 |row| row.get(0),
987 )?;
988
989 conn.execute(
990 "DELETE FROM global_memory_vectors WHERE chunk_id IN
991 (SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
992 params![like],
993 )?;
994
995 conn.execute(
996 "DELETE FROM global_memory_chunks WHERE source LIKE ?1",
997 params![like],
998 )?;
999
1000 Ok(count as u64)
1001 }
1002
1003 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
1005 let conn = self.conn.lock().await;
1006
1007 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
1008 let cutoff_str = cutoff.to_rfc3339();
1009
1010 let count: i64 = conn.query_row(
1012 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
1013 params![cutoff_str],
1014 |row| row.get(0),
1015 )?;
1016
1017 conn.execute(
1019 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1020 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
1021 params![cutoff_str],
1022 )?;
1023
1024 conn.execute(
1026 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1027 params![cutoff_str],
1028 )?;
1029
1030 Ok(count as u64)
1031 }
1032
1033 pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1035 let conn = self.conn.lock().await;
1036
1037 let result: Option<MemoryConfig> = conn
1038 .query_row(
1039 "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
1040 session_retention_days, token_budget, chunk_overlap
1041 FROM memory_config WHERE project_id = ?1",
1042 params![project_id],
1043 |row| {
1044 Ok(MemoryConfig {
1045 max_chunks: row.get(0)?,
1046 chunk_size: row.get(1)?,
1047 retrieval_k: row.get(2)?,
1048 auto_cleanup: row.get::<_, i64>(3)? != 0,
1049 session_retention_days: row.get(4)?,
1050 token_budget: row.get(5)?,
1051 chunk_overlap: row.get(6)?,
1052 })
1053 },
1054 )
1055 .optional()?;
1056
1057 match result {
1058 Some(config) => Ok(config),
1059 None => {
1060 let config = MemoryConfig::default();
1062 let updated_at = Utc::now().to_rfc3339();
1063
1064 conn.execute(
1065 "INSERT INTO memory_config
1066 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1067 session_retention_days, token_budget, chunk_overlap, updated_at)
1068 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1069 params![
1070 project_id,
1071 config.max_chunks,
1072 config.chunk_size,
1073 config.retrieval_k,
1074 config.auto_cleanup as i64,
1075 config.session_retention_days,
1076 config.token_budget,
1077 config.chunk_overlap,
1078 updated_at
1079 ],
1080 )?;
1081
1082 Ok(config)
1083 }
1084 }
1085 }
1086
1087 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1089 let conn = self.conn.lock().await;
1090
1091 let updated_at = Utc::now().to_rfc3339();
1092
1093 conn.execute(
1094 "INSERT OR REPLACE INTO memory_config
1095 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1096 session_retention_days, token_budget, chunk_overlap, updated_at)
1097 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1098 params![
1099 project_id,
1100 config.max_chunks,
1101 config.chunk_size,
1102 config.retrieval_k,
1103 config.auto_cleanup as i64,
1104 config.session_retention_days,
1105 config.token_budget,
1106 config.chunk_overlap,
1107 updated_at
1108 ],
1109 )?;
1110
1111 Ok(())
1112 }
1113
1114 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1116 let conn = self.conn.lock().await;
1117
1118 let session_chunks: i64 =
1120 conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1121 row.get(0)
1122 })?;
1123
1124 let project_chunks: i64 =
1125 conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1126 row.get(0)
1127 })?;
1128
1129 let global_chunks: i64 =
1130 conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1131 row.get(0)
1132 })?;
1133
1134 let session_bytes: i64 = conn.query_row(
1136 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1137 [],
1138 |row| row.get(0),
1139 )?;
1140
1141 let project_bytes: i64 = conn.query_row(
1142 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1143 [],
1144 |row| row.get(0),
1145 )?;
1146
1147 let global_bytes: i64 = conn.query_row(
1148 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1149 [],
1150 |row| row.get(0),
1151 )?;
1152
1153 let last_cleanup: Option<String> = conn
1155 .query_row(
1156 "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1157 [],
1158 |row| row.get(0),
1159 )
1160 .optional()?;
1161
1162 let last_cleanup = last_cleanup.and_then(|s| {
1163 DateTime::parse_from_rfc3339(&s)
1164 .ok()
1165 .map(|dt| dt.with_timezone(&Utc))
1166 });
1167
1168 let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1170
1171 Ok(MemoryStats {
1172 total_chunks: session_chunks + project_chunks + global_chunks,
1173 session_chunks,
1174 project_chunks,
1175 global_chunks,
1176 total_bytes: session_bytes + project_bytes + global_bytes,
1177 session_bytes,
1178 project_bytes,
1179 global_bytes,
1180 file_size,
1181 last_cleanup,
1182 })
1183 }
1184
1185 pub async fn log_cleanup(
1187 &self,
1188 cleanup_type: &str,
1189 tier: MemoryTier,
1190 project_id: Option<&str>,
1191 session_id: Option<&str>,
1192 chunks_deleted: i64,
1193 bytes_reclaimed: i64,
1194 ) -> MemoryResult<()> {
1195 let conn = self.conn.lock().await;
1196
1197 let id = uuid::Uuid::new_v4().to_string();
1198 let created_at = Utc::now().to_rfc3339();
1199
1200 conn.execute(
1201 "INSERT INTO memory_cleanup_log
1202 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1203 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1204 params![
1205 id,
1206 cleanup_type,
1207 tier.to_string(),
1208 project_id,
1209 session_id,
1210 chunks_deleted,
1211 bytes_reclaimed,
1212 created_at
1213 ],
1214 )?;
1215
1216 Ok(())
1217 }
1218
1219 pub async fn vacuum(&self) -> MemoryResult<()> {
1221 let conn = self.conn.lock().await;
1222 conn.execute("VACUUM", [])?;
1223 Ok(())
1224 }
1225
1226 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1231 let conn = self.conn.lock().await;
1232 let n: i64 = conn.query_row(
1233 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1234 params![project_id],
1235 |row| row.get(0),
1236 )?;
1237 Ok(n)
1238 }
1239
1240 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1241 let conn = self.conn.lock().await;
1242 let exists: Option<i64> = conn
1243 .query_row(
1244 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1245 params![project_id],
1246 |row| row.get(0),
1247 )
1248 .optional()?;
1249 Ok(exists.is_some())
1250 }
1251
1252 pub async fn get_file_index_entry(
1253 &self,
1254 project_id: &str,
1255 path: &str,
1256 ) -> MemoryResult<Option<(i64, i64, String)>> {
1257 let conn = self.conn.lock().await;
1258 let row: Option<(i64, i64, String)> = conn
1259 .query_row(
1260 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1261 params![project_id, path],
1262 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1263 )
1264 .optional()?;
1265 Ok(row)
1266 }
1267
1268 pub async fn upsert_file_index_entry(
1269 &self,
1270 project_id: &str,
1271 path: &str,
1272 mtime: i64,
1273 size: i64,
1274 hash: &str,
1275 ) -> MemoryResult<()> {
1276 let conn = self.conn.lock().await;
1277 let indexed_at = Utc::now().to_rfc3339();
1278 conn.execute(
1279 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1280 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1281 ON CONFLICT(project_id, path) DO UPDATE SET
1282 mtime = excluded.mtime,
1283 size = excluded.size,
1284 hash = excluded.hash,
1285 indexed_at = excluded.indexed_at",
1286 params![project_id, path, mtime, size, hash, indexed_at],
1287 )?;
1288 Ok(())
1289 }
1290
1291 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1292 let conn = self.conn.lock().await;
1293 conn.execute(
1294 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1295 params![project_id, path],
1296 )?;
1297 Ok(())
1298 }
1299
1300 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1301 let conn = self.conn.lock().await;
1302 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1303 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1304 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1305 }
1306
1307 pub async fn delete_project_file_chunks_by_path(
1308 &self,
1309 project_id: &str,
1310 source_path: &str,
1311 ) -> MemoryResult<(i64, i64)> {
1312 let conn = self.conn.lock().await;
1313
1314 let chunks_deleted: i64 = conn.query_row(
1315 "SELECT COUNT(*) FROM project_memory_chunks
1316 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1317 params![project_id, source_path],
1318 |row| row.get(0),
1319 )?;
1320
1321 let bytes_estimated: i64 = conn.query_row(
1322 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1323 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1324 params![project_id, source_path],
1325 |row| row.get(0),
1326 )?;
1327
1328 conn.execute(
1330 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1331 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1332 params![project_id, source_path],
1333 )?;
1334
1335 conn.execute(
1336 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1337 params![project_id, source_path],
1338 )?;
1339
1340 Ok((chunks_deleted, bytes_estimated))
1341 }
1342
1343 pub async fn upsert_project_index_status(
1344 &self,
1345 project_id: &str,
1346 total_files: i64,
1347 processed_files: i64,
1348 indexed_files: i64,
1349 skipped_files: i64,
1350 errors: i64,
1351 ) -> MemoryResult<()> {
1352 let conn = self.conn.lock().await;
1353 let last_indexed_at = Utc::now().to_rfc3339();
1354 conn.execute(
1355 "INSERT INTO project_index_status (
1356 project_id, last_indexed_at, last_total_files, last_processed_files,
1357 last_indexed_files, last_skipped_files, last_errors
1358 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1359 ON CONFLICT(project_id) DO UPDATE SET
1360 last_indexed_at = excluded.last_indexed_at,
1361 last_total_files = excluded.last_total_files,
1362 last_processed_files = excluded.last_processed_files,
1363 last_indexed_files = excluded.last_indexed_files,
1364 last_skipped_files = excluded.last_skipped_files,
1365 last_errors = excluded.last_errors",
1366 params![
1367 project_id,
1368 last_indexed_at,
1369 total_files,
1370 processed_files,
1371 indexed_files,
1372 skipped_files,
1373 errors
1374 ],
1375 )?;
1376 Ok(())
1377 }
1378
1379 pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
1380 let conn = self.conn.lock().await;
1381
1382 let project_chunks: i64 = conn.query_row(
1383 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1384 params![project_id],
1385 |row| row.get(0),
1386 )?;
1387
1388 let project_bytes: i64 = conn.query_row(
1389 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
1390 params![project_id],
1391 |row| row.get(0),
1392 )?;
1393
1394 let file_index_chunks: i64 = conn.query_row(
1395 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1396 params![project_id],
1397 |row| row.get(0),
1398 )?;
1399
1400 let file_index_bytes: i64 = conn.query_row(
1401 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1402 params![project_id],
1403 |row| row.get(0),
1404 )?;
1405
1406 let indexed_files: i64 = conn.query_row(
1407 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1408 params![project_id],
1409 |row| row.get(0),
1410 )?;
1411
1412 let status_row: Option<ProjectIndexStatusRow> =
1413 conn
1414 .query_row(
1415 "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
1416 FROM project_index_status WHERE project_id = ?1",
1417 params![project_id],
1418 |row| {
1419 Ok((
1420 row.get(0)?,
1421 row.get(1)?,
1422 row.get(2)?,
1423 row.get(3)?,
1424 row.get(4)?,
1425 row.get(5)?,
1426 ))
1427 },
1428 )
1429 .optional()?;
1430
1431 let (
1432 last_indexed_at,
1433 last_total_files,
1434 last_processed_files,
1435 last_indexed_files,
1436 last_skipped_files,
1437 last_errors,
1438 ) = status_row.unwrap_or((None, None, None, None, None, None));
1439
1440 let last_indexed_at = last_indexed_at.and_then(|s| {
1441 DateTime::parse_from_rfc3339(&s)
1442 .ok()
1443 .map(|dt| dt.with_timezone(&Utc))
1444 });
1445
1446 Ok(ProjectMemoryStats {
1447 project_id: project_id.to_string(),
1448 project_chunks,
1449 project_bytes,
1450 file_index_chunks,
1451 file_index_bytes,
1452 indexed_files,
1453 last_indexed_at,
1454 last_total_files,
1455 last_processed_files,
1456 last_indexed_files,
1457 last_skipped_files,
1458 last_errors,
1459 })
1460 }
1461
1462 pub async fn clear_project_file_index(
1463 &self,
1464 project_id: &str,
1465 vacuum: bool,
1466 ) -> MemoryResult<ClearFileIndexResult> {
1467 let conn = self.conn.lock().await;
1468
1469 let chunks_deleted: i64 = conn.query_row(
1470 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1471 params![project_id],
1472 |row| row.get(0),
1473 )?;
1474
1475 let bytes_estimated: i64 = conn.query_row(
1476 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1477 params![project_id],
1478 |row| row.get(0),
1479 )?;
1480
1481 conn.execute(
1483 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1484 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1485 params![project_id],
1486 )?;
1487
1488 conn.execute(
1490 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1491 params![project_id],
1492 )?;
1493
1494 conn.execute(
1496 "DELETE FROM project_file_index WHERE project_id = ?1",
1497 params![project_id],
1498 )?;
1499 conn.execute(
1500 "DELETE FROM project_index_status WHERE project_id = ?1",
1501 params![project_id],
1502 )?;
1503
1504 drop(conn); if vacuum {
1507 self.vacuum().await?;
1508 }
1509
1510 Ok(ClearFileIndexResult {
1511 chunks_deleted,
1512 bytes_estimated,
1513 did_vacuum: vacuum,
1514 })
1515 }
1516
1517 pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1529 if retention_days == 0 {
1530 return Ok(0);
1531 }
1532
1533 let conn = self.conn.lock().await;
1534
1535 let cutoff =
1537 (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1538
1539 conn.execute(
1541 "DELETE FROM session_memory_vectors
1542 WHERE chunk_id IN (
1543 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1544 )",
1545 params![cutoff],
1546 )?;
1547
1548 let deleted = conn.execute(
1549 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1550 params![cutoff],
1551 )?;
1552
1553 if deleted > 0 {
1554 tracing::info!(
1555 retention_days,
1556 deleted,
1557 "memory hygiene: pruned old session chunks"
1558 );
1559 }
1560
1561 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1562 Ok(deleted as u64)
1563 }
1564
1565 pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
1571 let retention_days = if env_override_days > 0 {
1573 env_override_days
1574 } else {
1575 let conn = self.conn.lock().await;
1577 let days: Option<i64> = conn
1578 .query_row(
1579 "SELECT session_retention_days FROM memory_config
1580 WHERE project_id = '__global__' LIMIT 1",
1581 [],
1582 |row| row.get(0),
1583 )
1584 .ok();
1585 drop(conn);
1586 days.unwrap_or(30) as u32
1587 };
1588
1589 self.prune_old_session_chunks(retention_days).await
1590 }
1591
    /// Insert a global memory record, deduplicating on logical identity.
    ///
    /// A record is a duplicate when user, source type, content hash, run id,
    /// and the nullable session/message/tool identifiers all match an
    /// existing row (NULLs compared as empty strings via IFNULL). Duplicates
    /// are not re-inserted: the existing row's id is returned with
    /// `deduped = true`; otherwise the new record is stored as-is.
    ///
    /// Note: the probe and the insert run under the same connection lock, so
    /// no competing writer can slip a duplicate in between them.
    pub async fn put_global_memory_record(
        &self,
        record: &GlobalMemoryRecord,
    ) -> MemoryResult<GlobalMemoryWriteResult> {
        let conn = self.conn.lock().await;

        // Dedup probe: find a row with the same logical identity, if any.
        let existing: Option<String> = conn
            .query_row(
                "SELECT id FROM memory_records
                 WHERE user_id = ?1
                   AND source_type = ?2
                   AND content_hash = ?3
                   AND run_id = ?4
                   AND IFNULL(session_id, '') = IFNULL(?5, '')
                   AND IFNULL(message_id, '') = IFNULL(?6, '')
                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
                 LIMIT 1",
                params![
                    record.user_id,
                    record.source_type,
                    record.content_hash,
                    record.run_id,
                    record.session_id,
                    record.message_id,
                    record.tool_name
                ],
                |row| row.get(0),
            )
            .optional()?;

        if let Some(id) = existing {
            return Ok(GlobalMemoryWriteResult {
                id,
                stored: false,
                deduped: true,
            });
        }

        // Absent JSON blobs are stored as empty strings, not NULL.
        let metadata = record
            .metadata
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        let provenance = record
            .provenance
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        // Column order here is the positional contract row_to_global_record
        // relies on when reading rows back.
        conn.execute(
            "INSERT INTO memory_records(
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
                project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
                visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
                ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                ?17, ?18, ?19, ?20, ?21, ?22
            )",
            params![
                record.id,
                record.user_id,
                record.source_type,
                record.content,
                record.content_hash,
                record.run_id,
                record.session_id,
                record.message_id,
                record.tool_name,
                record.project_tag,
                record.channel_tag,
                record.host_tag,
                metadata,
                provenance,
                record.redaction_status,
                i64::from(record.redaction_count),
                record.visibility,
                if record.demoted { 1i64 } else { 0i64 },
                record.score_boost,
                record.created_at_ms as i64,
                record.updated_at_ms as i64,
                record.expires_at_ms.map(|v| v as i64),
            ],
        )?;

        Ok(GlobalMemoryWriteResult {
            id: record.id.clone(),
            stored: true,
            deduped: false,
        })
    }
1682
    /// Search a user's global memory records, best match first.
    ///
    /// Strategy: first try an FTS5 `MATCH` query ranked by bm25; if that
    /// statement cannot be prepared (best-effort: e.g. the FTS table is
    /// absent) or produces no hits, fall back to a LIKE substring scan
    /// ordered by recency with a flat score of 0.25. Both paths exclude
    /// demoted and expired records and apply the optional tag filters.
    /// `limit` is clamped to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        // Deliberate error-swallow: a prepare failure (missing/corrupt FTS
        // table) silently falls through to the LIKE fallback below.
        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // bm25 rank is lower-is-better; map it to a (0, 1] score.
                    let rank = row.get::<_, f64>(22)?;
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback: substring search. An empty trimmed query (?6 = '')
        // matches everything, returning the most recent records.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                // Fallback hits get a fixed, below-FTS confidence score.
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
1789
1790 pub async fn list_global_memory(
1791 &self,
1792 user_id: &str,
1793 q: Option<&str>,
1794 limit: i64,
1795 offset: i64,
1796 ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
1797 let conn = self.conn.lock().await;
1798 let query = q.unwrap_or("").trim();
1799 let like = format!("%{}%", query);
1800 let mut stmt = conn.prepare(
1801 "SELECT
1802 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1803 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1804 redaction_status, redaction_count, visibility, demoted, score_boost,
1805 created_at_ms, updated_at_ms, expires_at_ms
1806 FROM memory_records
1807 WHERE user_id = ?1
1808 AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
1809 ORDER BY created_at_ms DESC
1810 LIMIT ?4 OFFSET ?5",
1811 )?;
1812 let rows = stmt.query_map(
1813 params![user_id, query, like, limit.clamp(1, 1000), offset.max(0)],
1814 row_to_global_record,
1815 )?;
1816 let mut out = Vec::new();
1817 for row in rows {
1818 out.push(row?);
1819 }
1820 Ok(out)
1821 }
1822
1823 pub async fn set_global_memory_visibility(
1824 &self,
1825 id: &str,
1826 visibility: &str,
1827 demoted: bool,
1828 ) -> MemoryResult<bool> {
1829 let conn = self.conn.lock().await;
1830 let now_ms = chrono::Utc::now().timestamp_millis();
1831 let changed = conn.execute(
1832 "UPDATE memory_records
1833 SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
1834 WHERE id = ?1",
1835 params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
1836 )?;
1837 Ok(changed > 0)
1838 }
1839
1840 pub async fn update_global_memory_context(
1841 &self,
1842 id: &str,
1843 visibility: &str,
1844 demoted: bool,
1845 metadata: Option<&serde_json::Value>,
1846 provenance: Option<&serde_json::Value>,
1847 ) -> MemoryResult<bool> {
1848 let conn = self.conn.lock().await;
1849 let now_ms = chrono::Utc::now().timestamp_millis();
1850 let metadata = metadata.map(ToString::to_string).unwrap_or_default();
1851 let provenance = provenance.map(ToString::to_string).unwrap_or_default();
1852 let changed = conn.execute(
1853 "UPDATE memory_records
1854 SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
1855 WHERE id = ?1",
1856 params![
1857 id,
1858 visibility,
1859 if demoted { 1i64 } else { 0i64 },
1860 metadata,
1861 provenance,
1862 now_ms,
1863 ],
1864 )?;
1865 Ok(changed > 0)
1866 }
1867
1868 pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
1869 let conn = self.conn.lock().await;
1870 let mut stmt = conn.prepare(
1871 "SELECT
1872 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1873 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1874 redaction_status, redaction_count, visibility, demoted, score_boost,
1875 created_at_ms, updated_at_ms, expires_at_ms
1876 FROM memory_records
1877 WHERE id = ?1
1878 LIMIT 1",
1879 )?;
1880 let record = stmt
1881 .query_row(params![id], row_to_global_record)
1882 .optional()?;
1883 Ok(record)
1884 }
1885
1886 pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
1887 let conn = self.conn.lock().await;
1888 let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
1889 Ok(changed > 0)
1890 }
1891}
1892
/// Convert one chunk row into a `MemoryChunk`.
///
/// Positional column contract (shared by the chunk SELECTs in this file):
/// 0 = id, 1 = content, 2 = session_id, 3 = project_id, 4 = source,
/// 5 = created_at (RFC 3339 text), 6 = token_count, 7 = metadata (JSON text).
/// The tier decides which of columns 2/3 are required vs nullable.
fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
    let id: String = row.get(0)?;
    let content: String = row.get(1)?;

    // Session rows always carry a session_id; project rows may reference
    // one; global rows are scoped to neither session nor project.
    let session_id: Option<String> = match tier {
        MemoryTier::Session => Some(row.get(2)?),
        MemoryTier::Project => row.get(2)?,
        MemoryTier::Global => None,
    };

    let project_id: Option<String> = match tier {
        MemoryTier::Session => row.get(3)?,
        MemoryTier::Project => Some(row.get(3)?),
        MemoryTier::Global => None,
    };

    let source: String = row.get(4)?;
    let created_at_str: String = row.get(5)?;
    let token_count: i64 = row.get(6)?;
    let metadata_str: Option<String> = row.get(7)?;

    // A malformed timestamp surfaces as a conversion error on column 5
    // instead of panicking.
    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
        .map_err(|e| {
            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
        })?
        .with_timezone(&Utc);

    // Empty or unparseable JSON metadata is treated as absent, not an error.
    let metadata = metadata_str
        .filter(|s| !s.is_empty())
        .and_then(|s| serde_json::from_str(&s).ok());

    // File-provenance columns are fetched by name and tolerated when the
    // SELECT omits them (`.ok().flatten()`), so this helper also works for
    // queries that do not project those columns.
    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();

    Ok(MemoryChunk {
        id,
        content,
        tier,
        session_id,
        project_id,
        source,
        source_path,
        source_mtime,
        source_size,
        source_hash,
        created_at,
        token_count,
        metadata,
    })
}
1946
/// Convert one `memory_records` row into a `GlobalMemoryRecord`.
///
/// Relies on the positional column order used by the SELECTs and the INSERT
/// in this file (0 = id … 21 = expires_at_ms); any query feeding this helper
/// must project columns in exactly that order.
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
    let metadata_str: Option<String> = row.get(12)?;
    let provenance_str: Option<String> = row.get(13)?;
    Ok(GlobalMemoryRecord {
        id: row.get(0)?,
        user_id: row.get(1)?,
        source_type: row.get(2)?,
        content: row.get(3)?,
        content_hash: row.get(4)?,
        run_id: row.get(5)?,
        session_id: row.get(6)?,
        message_id: row.get(7)?,
        tool_name: row.get(8)?,
        project_tag: row.get(9)?,
        channel_tag: row.get(10)?,
        host_tag: row.get(11)?,
        // Empty or unparseable JSON blobs degrade to None rather than erroring.
        metadata: metadata_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        provenance: provenance_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        redaction_status: row.get(14)?,
        // NOTE(review): these `as` casts wrap negative values; assumes the
        // write path only ever stores non-negative counts/timestamps — confirm.
        redaction_count: row.get::<_, i64>(15)? as u32,
        visibility: row.get(16)?,
        demoted: row.get::<_, i64>(17)? != 0,
        score_boost: row.get(18)?,
        created_at_ms: row.get::<_, i64>(19)? as u64,
        updated_at_ms: row.get::<_, i64>(20)? as u64,
        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
    })
}
1979
/// Build a defensive FTS5 `MATCH` expression from free-text user input.
///
/// Each whitespace-separated token is trimmed of leading/trailing
/// punctuation, quoted as an FTS5 string literal, and the tokens are OR-ed
/// together. Returns the empty phrase `""` when no usable token remains,
/// which matches nothing instead of producing an FTS syntax error.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // `trim_matches` only strips the edges, so a token can still
                // contain an interior `"`; FTS5 requires embedded quotes in a
                // string literal to be doubled, otherwise the MATCH
                // expression is malformed.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
1999
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a fresh database inside a temp directory. The returned `TempDir`
    /// guard must stay alive for the duration of the test.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let dir = TempDir::new().unwrap();
        let db = MemoryDatabase::new(&dir.path().join("test_memory.db"))
            .await
            .unwrap();
        (db, dir)
    }

    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        // A freshly created database starts empty.
        assert_eq!(db.get_stats().await.unwrap().total_chunks, 0);
    }

    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let stored = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].content, "Test content");
    }

    #[tokio::test]
    async fn test_config_crud() {
        let (db, _temp) = setup_test_db().await;

        // Default config is created lazily on first access.
        let initial = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(initial.max_chunks, 10000);

        let replacement = MemoryConfig {
            max_chunks: 5000,
            ..Default::default()
        };
        db.update_config("project-1", &replacement).await.unwrap();

        let reread = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(reread.max_chunks, 5000);
    }

    #[tokio::test]
    async fn test_global_memory_put_search_and_dedup() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;
        let record = GlobalMemoryRecord {
            id: "gm-1".to_string(),
            user_id: "user-a".to_string(),
            source_type: "user_message".to_string(),
            content: "remember rust workspace layout".to_string(),
            content_hash: "h1".to_string(),
            run_id: "run-1".to_string(),
            session_id: Some("s1".to_string()),
            message_id: Some("m1".to_string()),
            tool_name: None,
            project_tag: Some("proj-x".to_string()),
            channel_tag: None,
            host_tag: None,
            metadata: None,
            provenance: None,
            redaction_status: "passed".to_string(),
            redaction_count: 0,
            visibility: "private".to_string(),
            demoted: false,
            score_boost: 0.0,
            created_at_ms: now,
            updated_at_ms: now,
            expires_at_ms: None,
        };

        // First write stores; an identical second write deduplicates.
        assert!(db.put_global_memory_record(&record).await.unwrap().stored);
        assert!(db.put_global_memory_record(&record).await.unwrap().deduped);

        let hits = db
            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());
        assert_eq!(hits[0].record.id, "gm-1");
    }
}