1use crate::types::{
5 ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6 MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier, ProjectMemoryStats,
7 DEFAULT_EMBEDDING_DIMENSION,
8};
9use chrono::{DateTime, Utc};
10use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
11use sqlite_vec::sqlite3_vec_init;
12use std::collections::HashSet;
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
/// Tuple row read back from `project_index_status`.
/// NOTE(review): appears to mirror that table's nullable columns in order —
/// (last_indexed_at, last_total_files, last_processed_files,
/// last_indexed_files, last_skipped_files, last_errors) — the query site is
/// not visible in this chunk; confirm there.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
26
/// SQLite-backed store for tiered memory (session / project / global chunks)
/// with sqlite-vec virtual tables holding the embeddings.
pub struct MemoryDatabase {
    // Single shared connection behind an async mutex so the guard can be
    // held across `.await` points.
    conn: Arc<Mutex<Connection>>,
    // Kept so the on-disk file size can be reported (see `get_stats`).
    db_path: std::path::PathBuf,
}
32
33impl MemoryDatabase {
    /// Opens (or creates) the memory database at `db_path`, registers the
    /// sqlite-vec extension, applies PRAGMAs, initializes the schema, and
    /// runs startup health checks on the vector tables.
    ///
    /// # Errors
    /// Returns any database error from opening, schema creation, or a
    /// non-recoverable validation failure.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // SAFETY: `sqlite3_auto_extension` takes a C entry point and
        // sqlite-vec's `sqlite3_vec_init` has that ABI; the transmute only
        // re-casts the function-pointer type. Registration is process-global,
        // so every Connection opened afterwards loads vec0 automatically.
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;
        // Wait up to 10s when the database is locked instead of erroring out.
        conn.busy_timeout(Duration::from_secs(10))?;

        // `journal_mode` returns a result row, so it must go through
        // query_row rather than execute.
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        db.init_schema().await?;
        // Vec0 corruption is recoverable: rebuild the vector tables rather
        // than failing startup. Any other validation error is fatal.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
81
82 async fn validate_integrity(&self) -> MemoryResult<()> {
84 let conn = self.conn.lock().await;
85 let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
86 {
87 Ok(value) => value,
88 Err(err) => {
89 tracing::warn!(
93 "Skipping strict PRAGMA quick_check due to probe error: {}",
94 err
95 );
96 return Ok(());
97 }
98 };
99 if check.trim().eq_ignore_ascii_case("ok") {
100 return Ok(());
101 }
102
103 let lowered = check.to_lowercase();
104 if lowered.contains("malformed")
105 || lowered.contains("corrupt")
106 || lowered.contains("database disk image is malformed")
107 {
108 return Err(crate::types::MemoryError::InvalidConfig(format!(
109 "malformed database integrity check: {}",
110 check
111 )));
112 }
113
114 tracing::warn!(
115 "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
116 check
117 );
118 Ok(())
119 }
120
    /// Creates every table, index, virtual table, and trigger the memory
    /// system needs. All statements use `IF NOT EXISTS`, so this is safe to
    /// run on every startup; the only real migration is adding the
    /// `source_*` columns to `project_memory_chunks`.
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // --- session tier: chunks + vec0 embedding table ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- project tier ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Migration: older databases lack the file-provenance columns, so
        // inspect the live schema and add whichever are missing.
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            // Column 1 of table_info is the column name.
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- incremental file indexing bookkeeping ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // --- global tier ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- per-project configuration and cleanup audit log ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // --- secondary indexes for the common lookup patterns ---
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        // --- user-level memory records with FTS5 full-text mirror ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_records (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                source_type TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                run_id TEXT NOT NULL,
                session_id TEXT,
                message_id TEXT,
                tool_name TEXT,
                project_tag TEXT,
                channel_tag TEXT,
                host_tag TEXT,
                metadata TEXT,
                provenance TEXT,
                redaction_status TEXT NOT NULL,
                redaction_count INTEGER NOT NULL DEFAULT 0,
                visibility TEXT NOT NULL DEFAULT 'private',
                demoted INTEGER NOT NULL DEFAULT 0,
                score_boost REAL NOT NULL DEFAULT 0.0,
                created_at_ms INTEGER NOT NULL,
                updated_at_ms INTEGER NOT NULL,
                expires_at_ms INTEGER
            )",
            [],
        )?;
        // Dedup key uses IFNULL so NULLable discriminators still participate
        // in uniqueness (NULLs are otherwise distinct in SQLite indexes).
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
             ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
             ON memory_records(user_id, created_at_ms DESC)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_memory_records_run
             ON memory_records(run_id)",
            [],
        )?;
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
                id UNINDEXED,
                user_id UNINDEXED,
                content
            )",
            [],
        )?;
        // Triggers keep the FTS mirror in sync on insert/delete/update.
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
            END",
            [],
        )?;
        conn.execute(
            "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
                DELETE FROM memory_records_fts WHERE id = old.id;
                INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
            END",
            [],
        )?;

        Ok(())
    }
396
397 pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
400 let conn = self.conn.lock().await;
401 let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
402
403 for table in [
404 "session_memory_vectors",
405 "project_memory_vectors",
406 "global_memory_vectors",
407 ] {
408 let sql = format!("SELECT COUNT(*) FROM {}", table);
409 let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
410
411 if row_count > 0 {
414 let probe_sql = format!(
415 "SELECT chunk_id, distance
416 FROM {}
417 WHERE embedding MATCH ?1 AND k = 1",
418 table
419 );
420 let mut stmt = conn.prepare(&probe_sql)?;
421 let mut rows = stmt.query(params![probe_embedding.as_str()])?;
422 let _ = rows.next()?;
423 }
424 }
425 Ok(())
426 }
427
428 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
429 let text = err.to_string().to_lowercase();
430 text.contains("vector blob")
431 || text.contains("chunks iter error")
432 || text.contains("chunks iter")
433 || text.contains("internal sqlite-vec error")
434 || text.contains("insert rowids id")
435 || text.contains("sql logic error")
436 || text.contains("database disk image is malformed")
437 || text.contains("session_memory_vectors")
438 || text.contains("project_memory_vectors")
439 || text.contains("global_memory_vectors")
440 || text.contains("vec0")
441 }
442
443 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
444 let conn = self.conn.lock().await;
445
446 for base in [
447 "session_memory_vectors",
448 "project_memory_vectors",
449 "global_memory_vectors",
450 ] {
451 for name in [
453 base.to_string(),
454 format!("{}_chunks", base),
455 format!("{}_info", base),
456 format!("{}_rowids", base),
457 format!("{}_vector_chunks00", base),
458 ] {
459 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
460 conn.execute(&sql, [])?;
461 }
462
463 let like_pattern = format!("{base}_%");
465 let mut stmt = conn.prepare(
466 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
467 )?;
468 let table_names = stmt
469 .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
470 .collect::<Result<Vec<_>, _>>()?;
471 drop(stmt);
472 for name in table_names {
473 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
474 conn.execute(&sql, [])?;
475 }
476 }
477
478 conn.execute(
479 &format!(
480 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
481 chunk_id TEXT PRIMARY KEY,
482 embedding float[{}]
483 )",
484 DEFAULT_EMBEDDING_DIMENSION
485 ),
486 [],
487 )?;
488
489 conn.execute(
490 &format!(
491 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
492 chunk_id TEXT PRIMARY KEY,
493 embedding float[{}]
494 )",
495 DEFAULT_EMBEDDING_DIMENSION
496 ),
497 [],
498 )?;
499
500 conn.execute(
501 &format!(
502 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
503 chunk_id TEXT PRIMARY KEY,
504 embedding float[{}]
505 )",
506 DEFAULT_EMBEDDING_DIMENSION
507 ),
508 [],
509 )?;
510
511 Ok(())
512 }
513
514 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
517 match self.validate_vector_tables().await {
518 Ok(()) => Ok(false),
519 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
520 tracing::warn!(
521 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
522 err
523 );
524 self.recreate_vector_tables().await?;
525 Ok(true)
526 }
527 Err(err) => Err(err),
528 }
529 }
530
531 pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
535 let table_names = {
536 let conn = self.conn.lock().await;
537 let mut stmt = conn.prepare(
538 "SELECT name FROM sqlite_master
539 WHERE type='table'
540 AND name NOT LIKE 'sqlite_%'
541 ORDER BY name",
542 )?;
543 let names = stmt
544 .query_map([], |row| row.get::<_, String>(0))?
545 .collect::<Result<Vec<_>, _>>()?;
546 names
547 };
548
549 {
550 let conn = self.conn.lock().await;
551 for table in table_names {
552 let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
553 let _ = conn.execute(&sql, []);
554 }
555 }
556
557 self.init_schema().await
558 }
559
560 pub async fn try_repair_after_error(
563 &self,
564 err: &crate::types::MemoryError,
565 ) -> MemoryResult<bool> {
566 match err {
567 crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
568 tracing::warn!(
569 "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
570 db_err
571 );
572 self.recreate_vector_tables().await?;
573 Ok(true)
574 }
575 _ => Ok(false),
576 }
577 }
578
    /// Persists one memory chunk plus its embedding into the tier-specific
    /// table pair. The chunk row and the vector row are written separately.
    ///
    /// NOTE(review): the two inserts are not wrapped in a transaction — a
    /// failure after the chunk insert leaves a chunk with no embedding.
    /// Consider `unchecked_transaction` here; confirm against callers'
    /// repair paths.
    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // Each tier has its own chunk table and matching vec0 table.
        let (chunks_table, vectors_table) = match chunk.tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        let created_at_str = chunk.created_at.to_rfc3339();
        // Missing metadata is stored as an empty string, not NULL.
        let metadata_str = chunk
            .metadata
            .as_ref()
            .map(|m| m.to_string())
            .unwrap_or_default();

        // Column sets differ per tier, so each arm has its own INSERT.
        match chunk.tier {
            MemoryTier::Session => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        // session_id is NOT NULL in this table; absent value
                        // becomes the empty string.
                        chunk.session_id.as_ref().unwrap_or(&String::new()),
                        chunk.project_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
            MemoryTier::Project => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                            id, content, project_id, session_id, source, created_at, token_count, metadata,
                            source_path, source_mtime, source_size, source_hash
                        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        // project_id is NOT NULL here; absent value becomes "".
                        chunk.project_id.as_ref().unwrap_or(&String::new()),
                        chunk.session_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
            MemoryTier::Global => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, source, created_at, token_count, metadata)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
        }

        // sqlite-vec accepts a JSON array literal as the embedding value.
        let embedding_json = format!(
            "[{}]",
            embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        conn.execute(
            &format!(
                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
                vectors_table
            ),
            params![chunk.id, embedding_json],
        )?;

        Ok(())
    }
680
    /// KNN search over one memory tier, returning up to `limit` chunks with
    /// their sqlite-vec distances (smaller = closer).
    ///
    /// Filter semantics per tier:
    /// - Session: `session_id` wins over `project_id`; with neither, the
    ///   whole tier is searched.
    /// - Project: optional `project_id` filter; rows also carry the four
    ///   `source_*` columns.
    /// - Global: no filters.
    ///
    /// The `embedding MATCH ?n AND k = ?m` clause is sqlite-vec's KNN query
    /// form; `k` bounds the neighbor count.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // Query vector as a JSON array literal, as sqlite-vec expects.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        let results = match tier {
            MemoryTier::Session => {
                if let Some(sid) = session_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            // Column 8 is v.distance (after the 8 chunk columns).
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            // Column 12 is v.distance (8 chunk + 4 source_* cols).
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global chunks have no session/project columns; NULLs keep
                // the column layout compatible with row_to_chunk.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
824
825 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
827 let conn = self.conn.lock().await;
828
829 let mut stmt = conn.prepare(
830 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
831 FROM session_memory_chunks
832 WHERE session_id = ?1
833 ORDER BY created_at DESC",
834 )?;
835
836 let chunks = stmt
837 .query_map(params![session_id], |row| {
838 row_to_chunk(row, MemoryTier::Session)
839 })?
840 .collect::<Result<Vec<_>, _>>()?;
841
842 Ok(chunks)
843 }
844
845 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
847 let conn = self.conn.lock().await;
848
849 let mut stmt = conn.prepare(
850 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
851 source_path, source_mtime, source_size, source_hash
852 FROM project_memory_chunks
853 WHERE project_id = ?1
854 ORDER BY created_at DESC",
855 )?;
856
857 let chunks = stmt
858 .query_map(params![project_id], |row| {
859 row_to_chunk(row, MemoryTier::Project)
860 })?
861 .collect::<Result<Vec<_>, _>>()?;
862
863 Ok(chunks)
864 }
865
    /// Returns up to `limit` global chunks, newest first. The global table
    /// lacks session/project/source_* columns, so the row is mapped here by
    /// hand rather than via `row_to_chunk`.
    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
        let conn = self.conn.lock().await;

        let mut stmt = conn.prepare(
            "SELECT id, content, source, created_at, token_count, metadata
             FROM global_memory_chunks
             ORDER BY created_at DESC
             LIMIT ?1",
        )?;

        let chunks = stmt
            .query_map(params![limit], |row| {
                let id: String = row.get(0)?;
                let content: String = row.get(1)?;
                let source: String = row.get(2)?;
                let created_at_str: String = row.get(3)?;
                let token_count: i64 = row.get(4)?;
                let metadata_str: Option<String> = row.get(5)?;

                // created_at is stored as RFC 3339; surface parse failures as
                // a conversion error on column 3.
                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
                    .map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            3,
                            rusqlite::types::Type::Text,
                            Box::new(e),
                        )
                    })?
                    .with_timezone(&Utc);

                // Empty or unparseable metadata silently becomes None.
                let metadata = metadata_str
                    .filter(|s| !s.is_empty())
                    .and_then(|s| serde_json::from_str(&s).ok());

                Ok(MemoryChunk {
                    id,
                    content,
                    tier: MemoryTier::Global,
                    session_id: None,
                    project_id: None,
                    source,
                    source_path: None,
                    source_mtime: None,
                    source_size: None,
                    source_hash: None,
                    created_at,
                    token_count,
                    metadata,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(chunks)
    }
920
921 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
923 let conn = self.conn.lock().await;
924
925 let count: i64 = conn.query_row(
927 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
928 params![session_id],
929 |row| row.get(0),
930 )?;
931
932 conn.execute(
934 "DELETE FROM session_memory_vectors WHERE chunk_id IN
935 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
936 params![session_id],
937 )?;
938
939 conn.execute(
941 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
942 params![session_id],
943 )?;
944
945 Ok(count as u64)
946 }
947
948 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
950 let conn = self.conn.lock().await;
951
952 let count: i64 = conn.query_row(
954 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
955 params![project_id],
956 |row| row.get(0),
957 )?;
958
959 conn.execute(
961 "DELETE FROM project_memory_vectors WHERE chunk_id IN
962 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
963 params![project_id],
964 )?;
965
966 conn.execute(
968 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
969 params![project_id],
970 )?;
971
972 Ok(count as u64)
973 }
974
975 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
977 let conn = self.conn.lock().await;
978
979 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
980 let cutoff_str = cutoff.to_rfc3339();
981
982 let count: i64 = conn.query_row(
984 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
985 params![cutoff_str],
986 |row| row.get(0),
987 )?;
988
989 conn.execute(
991 "DELETE FROM session_memory_vectors WHERE chunk_id IN
992 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
993 params![cutoff_str],
994 )?;
995
996 conn.execute(
998 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
999 params![cutoff_str],
1000 )?;
1001
1002 Ok(count as u64)
1003 }
1004
1005 pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1007 let conn = self.conn.lock().await;
1008
1009 let result: Option<MemoryConfig> = conn
1010 .query_row(
1011 "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
1012 session_retention_days, token_budget, chunk_overlap
1013 FROM memory_config WHERE project_id = ?1",
1014 params![project_id],
1015 |row| {
1016 Ok(MemoryConfig {
1017 max_chunks: row.get(0)?,
1018 chunk_size: row.get(1)?,
1019 retrieval_k: row.get(2)?,
1020 auto_cleanup: row.get::<_, i64>(3)? != 0,
1021 session_retention_days: row.get(4)?,
1022 token_budget: row.get(5)?,
1023 chunk_overlap: row.get(6)?,
1024 })
1025 },
1026 )
1027 .optional()?;
1028
1029 match result {
1030 Some(config) => Ok(config),
1031 None => {
1032 let config = MemoryConfig::default();
1034 let updated_at = Utc::now().to_rfc3339();
1035
1036 conn.execute(
1037 "INSERT INTO memory_config
1038 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1039 session_retention_days, token_budget, chunk_overlap, updated_at)
1040 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1041 params![
1042 project_id,
1043 config.max_chunks,
1044 config.chunk_size,
1045 config.retrieval_k,
1046 config.auto_cleanup as i64,
1047 config.session_retention_days,
1048 config.token_budget,
1049 config.chunk_overlap,
1050 updated_at
1051 ],
1052 )?;
1053
1054 Ok(config)
1055 }
1056 }
1057 }
1058
1059 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1061 let conn = self.conn.lock().await;
1062
1063 let updated_at = Utc::now().to_rfc3339();
1064
1065 conn.execute(
1066 "INSERT OR REPLACE INTO memory_config
1067 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1068 session_retention_days, token_budget, chunk_overlap, updated_at)
1069 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1070 params![
1071 project_id,
1072 config.max_chunks,
1073 config.chunk_size,
1074 config.retrieval_k,
1075 config.auto_cleanup as i64,
1076 config.session_retention_days,
1077 config.token_budget,
1078 config.chunk_overlap,
1079 updated_at
1080 ],
1081 )?;
1082
1083 Ok(())
1084 }
1085
1086 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1088 let conn = self.conn.lock().await;
1089
1090 let session_chunks: i64 =
1092 conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1093 row.get(0)
1094 })?;
1095
1096 let project_chunks: i64 =
1097 conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1098 row.get(0)
1099 })?;
1100
1101 let global_chunks: i64 =
1102 conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1103 row.get(0)
1104 })?;
1105
1106 let session_bytes: i64 = conn.query_row(
1108 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1109 [],
1110 |row| row.get(0),
1111 )?;
1112
1113 let project_bytes: i64 = conn.query_row(
1114 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1115 [],
1116 |row| row.get(0),
1117 )?;
1118
1119 let global_bytes: i64 = conn.query_row(
1120 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1121 [],
1122 |row| row.get(0),
1123 )?;
1124
1125 let last_cleanup: Option<String> = conn
1127 .query_row(
1128 "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1129 [],
1130 |row| row.get(0),
1131 )
1132 .optional()?;
1133
1134 let last_cleanup = last_cleanup.and_then(|s| {
1135 DateTime::parse_from_rfc3339(&s)
1136 .ok()
1137 .map(|dt| dt.with_timezone(&Utc))
1138 });
1139
1140 let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1142
1143 Ok(MemoryStats {
1144 total_chunks: session_chunks + project_chunks + global_chunks,
1145 session_chunks,
1146 project_chunks,
1147 global_chunks,
1148 total_bytes: session_bytes + project_bytes + global_bytes,
1149 session_bytes,
1150 project_bytes,
1151 global_bytes,
1152 file_size,
1153 last_cleanup,
1154 })
1155 }
1156
1157 pub async fn log_cleanup(
1159 &self,
1160 cleanup_type: &str,
1161 tier: MemoryTier,
1162 project_id: Option<&str>,
1163 session_id: Option<&str>,
1164 chunks_deleted: i64,
1165 bytes_reclaimed: i64,
1166 ) -> MemoryResult<()> {
1167 let conn = self.conn.lock().await;
1168
1169 let id = uuid::Uuid::new_v4().to_string();
1170 let created_at = Utc::now().to_rfc3339();
1171
1172 conn.execute(
1173 "INSERT INTO memory_cleanup_log
1174 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1175 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1176 params![
1177 id,
1178 cleanup_type,
1179 tier.to_string(),
1180 project_id,
1181 session_id,
1182 chunks_deleted,
1183 bytes_reclaimed,
1184 created_at
1185 ],
1186 )?;
1187
1188 Ok(())
1189 }
1190
1191 pub async fn vacuum(&self) -> MemoryResult<()> {
1193 let conn = self.conn.lock().await;
1194 conn.execute("VACUUM", [])?;
1195 Ok(())
1196 }
1197
1198 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1203 let conn = self.conn.lock().await;
1204 let n: i64 = conn.query_row(
1205 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1206 params![project_id],
1207 |row| row.get(0),
1208 )?;
1209 Ok(n)
1210 }
1211
1212 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1213 let conn = self.conn.lock().await;
1214 let exists: Option<i64> = conn
1215 .query_row(
1216 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1217 params![project_id],
1218 |row| row.get(0),
1219 )
1220 .optional()?;
1221 Ok(exists.is_some())
1222 }
1223
1224 pub async fn get_file_index_entry(
1225 &self,
1226 project_id: &str,
1227 path: &str,
1228 ) -> MemoryResult<Option<(i64, i64, String)>> {
1229 let conn = self.conn.lock().await;
1230 let row: Option<(i64, i64, String)> = conn
1231 .query_row(
1232 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1233 params![project_id, path],
1234 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1235 )
1236 .optional()?;
1237 Ok(row)
1238 }
1239
1240 pub async fn upsert_file_index_entry(
1241 &self,
1242 project_id: &str,
1243 path: &str,
1244 mtime: i64,
1245 size: i64,
1246 hash: &str,
1247 ) -> MemoryResult<()> {
1248 let conn = self.conn.lock().await;
1249 let indexed_at = Utc::now().to_rfc3339();
1250 conn.execute(
1251 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1252 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1253 ON CONFLICT(project_id, path) DO UPDATE SET
1254 mtime = excluded.mtime,
1255 size = excluded.size,
1256 hash = excluded.hash,
1257 indexed_at = excluded.indexed_at",
1258 params![project_id, path, mtime, size, hash, indexed_at],
1259 )?;
1260 Ok(())
1261 }
1262
1263 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1264 let conn = self.conn.lock().await;
1265 conn.execute(
1266 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1267 params![project_id, path],
1268 )?;
1269 Ok(())
1270 }
1271
1272 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1273 let conn = self.conn.lock().await;
1274 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1275 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1276 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1277 }
1278
1279 pub async fn delete_project_file_chunks_by_path(
1280 &self,
1281 project_id: &str,
1282 source_path: &str,
1283 ) -> MemoryResult<(i64, i64)> {
1284 let conn = self.conn.lock().await;
1285
1286 let chunks_deleted: i64 = conn.query_row(
1287 "SELECT COUNT(*) FROM project_memory_chunks
1288 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1289 params![project_id, source_path],
1290 |row| row.get(0),
1291 )?;
1292
1293 let bytes_estimated: i64 = conn.query_row(
1294 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1295 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1296 params![project_id, source_path],
1297 |row| row.get(0),
1298 )?;
1299
1300 conn.execute(
1302 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1303 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1304 params![project_id, source_path],
1305 )?;
1306
1307 conn.execute(
1308 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1309 params![project_id, source_path],
1310 )?;
1311
1312 Ok((chunks_deleted, bytes_estimated))
1313 }
1314
1315 pub async fn upsert_project_index_status(
1316 &self,
1317 project_id: &str,
1318 total_files: i64,
1319 processed_files: i64,
1320 indexed_files: i64,
1321 skipped_files: i64,
1322 errors: i64,
1323 ) -> MemoryResult<()> {
1324 let conn = self.conn.lock().await;
1325 let last_indexed_at = Utc::now().to_rfc3339();
1326 conn.execute(
1327 "INSERT INTO project_index_status (
1328 project_id, last_indexed_at, last_total_files, last_processed_files,
1329 last_indexed_files, last_skipped_files, last_errors
1330 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1331 ON CONFLICT(project_id) DO UPDATE SET
1332 last_indexed_at = excluded.last_indexed_at,
1333 last_total_files = excluded.last_total_files,
1334 last_processed_files = excluded.last_processed_files,
1335 last_indexed_files = excluded.last_indexed_files,
1336 last_skipped_files = excluded.last_skipped_files,
1337 last_errors = excluded.last_errors",
1338 params![
1339 project_id,
1340 last_indexed_at,
1341 total_files,
1342 processed_files,
1343 indexed_files,
1344 skipped_files,
1345 errors
1346 ],
1347 )?;
1348 Ok(())
1349 }
1350
1351 pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
1352 let conn = self.conn.lock().await;
1353
1354 let project_chunks: i64 = conn.query_row(
1355 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1356 params![project_id],
1357 |row| row.get(0),
1358 )?;
1359
1360 let project_bytes: i64 = conn.query_row(
1361 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
1362 params![project_id],
1363 |row| row.get(0),
1364 )?;
1365
1366 let file_index_chunks: i64 = conn.query_row(
1367 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1368 params![project_id],
1369 |row| row.get(0),
1370 )?;
1371
1372 let file_index_bytes: i64 = conn.query_row(
1373 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1374 params![project_id],
1375 |row| row.get(0),
1376 )?;
1377
1378 let indexed_files: i64 = conn.query_row(
1379 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1380 params![project_id],
1381 |row| row.get(0),
1382 )?;
1383
1384 let status_row: Option<ProjectIndexStatusRow> =
1385 conn
1386 .query_row(
1387 "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
1388 FROM project_index_status WHERE project_id = ?1",
1389 params![project_id],
1390 |row| {
1391 Ok((
1392 row.get(0)?,
1393 row.get(1)?,
1394 row.get(2)?,
1395 row.get(3)?,
1396 row.get(4)?,
1397 row.get(5)?,
1398 ))
1399 },
1400 )
1401 .optional()?;
1402
1403 let (
1404 last_indexed_at,
1405 last_total_files,
1406 last_processed_files,
1407 last_indexed_files,
1408 last_skipped_files,
1409 last_errors,
1410 ) = status_row.unwrap_or((None, None, None, None, None, None));
1411
1412 let last_indexed_at = last_indexed_at.and_then(|s| {
1413 DateTime::parse_from_rfc3339(&s)
1414 .ok()
1415 .map(|dt| dt.with_timezone(&Utc))
1416 });
1417
1418 Ok(ProjectMemoryStats {
1419 project_id: project_id.to_string(),
1420 project_chunks,
1421 project_bytes,
1422 file_index_chunks,
1423 file_index_bytes,
1424 indexed_files,
1425 last_indexed_at,
1426 last_total_files,
1427 last_processed_files,
1428 last_indexed_files,
1429 last_skipped_files,
1430 last_errors,
1431 })
1432 }
1433
1434 pub async fn clear_project_file_index(
1435 &self,
1436 project_id: &str,
1437 vacuum: bool,
1438 ) -> MemoryResult<ClearFileIndexResult> {
1439 let conn = self.conn.lock().await;
1440
1441 let chunks_deleted: i64 = conn.query_row(
1442 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1443 params![project_id],
1444 |row| row.get(0),
1445 )?;
1446
1447 let bytes_estimated: i64 = conn.query_row(
1448 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1449 params![project_id],
1450 |row| row.get(0),
1451 )?;
1452
1453 conn.execute(
1455 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1456 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1457 params![project_id],
1458 )?;
1459
1460 conn.execute(
1462 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1463 params![project_id],
1464 )?;
1465
1466 conn.execute(
1468 "DELETE FROM project_file_index WHERE project_id = ?1",
1469 params![project_id],
1470 )?;
1471 conn.execute(
1472 "DELETE FROM project_index_status WHERE project_id = ?1",
1473 params![project_id],
1474 )?;
1475
1476 drop(conn); if vacuum {
1479 self.vacuum().await?;
1480 }
1481
1482 Ok(ClearFileIndexResult {
1483 chunks_deleted,
1484 bytes_estimated,
1485 did_vacuum: vacuum,
1486 })
1487 }
1488
1489 pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1501 if retention_days == 0 {
1502 return Ok(0);
1503 }
1504
1505 let conn = self.conn.lock().await;
1506
1507 let cutoff =
1509 (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1510
1511 conn.execute(
1513 "DELETE FROM session_memory_vectors
1514 WHERE chunk_id IN (
1515 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1516 )",
1517 params![cutoff],
1518 )?;
1519
1520 let deleted = conn.execute(
1521 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1522 params![cutoff],
1523 )?;
1524
1525 if deleted > 0 {
1526 tracing::info!(
1527 retention_days,
1528 deleted,
1529 "memory hygiene: pruned old session chunks"
1530 );
1531 }
1532
1533 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1534 Ok(deleted as u64)
1535 }
1536
1537 pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
1543 let retention_days = if env_override_days > 0 {
1545 env_override_days
1546 } else {
1547 let conn = self.conn.lock().await;
1549 let days: Option<i64> = conn
1550 .query_row(
1551 "SELECT session_retention_days FROM memory_config
1552 WHERE project_id = '__global__' LIMIT 1",
1553 [],
1554 |row| row.get(0),
1555 )
1556 .ok();
1557 drop(conn);
1558 days.unwrap_or(30) as u32
1559 };
1560
1561 self.prune_old_session_chunks(retention_days).await
1562 }
1563
    /// Persist a [`GlobalMemoryRecord`], deduplicating on identity fields.
    ///
    /// Duplicate detection matches user, source type, content hash, and run
    /// id, plus the (NULL-tolerant, via IFNULL) session, message, and tool
    /// identifiers. When a matching row exists, nothing is written and the
    /// existing row's id is returned with `deduped = true`; otherwise the
    /// record is inserted as-is and `stored = true` is returned.
    pub async fn put_global_memory_record(
        &self,
        record: &GlobalMemoryRecord,
    ) -> MemoryResult<GlobalMemoryWriteResult> {
        let conn = self.conn.lock().await;

        let existing: Option<String> = conn
            .query_row(
                "SELECT id FROM memory_records
                 WHERE user_id = ?1
                   AND source_type = ?2
                   AND content_hash = ?3
                   AND run_id = ?4
                   AND IFNULL(session_id, '') = IFNULL(?5, '')
                   AND IFNULL(message_id, '') = IFNULL(?6, '')
                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
                 LIMIT 1",
                params![
                    record.user_id,
                    record.source_type,
                    record.content_hash,
                    record.run_id,
                    record.session_id,
                    record.message_id,
                    record.tool_name
                ],
                |row| row.get(0),
            )
            .optional()?;

        // Dedup hit: report the pre-existing row instead of inserting again.
        if let Some(id) = existing {
            return Ok(GlobalMemoryWriteResult {
                id,
                stored: false,
                deduped: true,
            });
        }

        // JSON blobs are stored as text; absent values become empty strings.
        let metadata = record
            .metadata
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        let provenance = record
            .provenance
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        conn.execute(
            "INSERT INTO memory_records(
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
                project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
                visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
                ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                ?17, ?18, ?19, ?20, ?21, ?22
            )",
            params![
                record.id,
                record.user_id,
                record.source_type,
                record.content,
                record.content_hash,
                record.run_id,
                record.session_id,
                record.message_id,
                record.tool_name,
                record.project_tag,
                record.channel_tag,
                record.host_tag,
                metadata,
                provenance,
                record.redaction_status,
                i64::from(record.redaction_count),
                record.visibility,
                if record.demoted { 1i64 } else { 0i64 },
                record.score_boost,
                // NOTE(review): the u64 -> i64 `as` casts assume millisecond
                // timestamps fit in i64 (true for any realistic clock value).
                record.created_at_ms as i64,
                record.updated_at_ms as i64,
                record.expires_at_ms.map(|v| v as i64),
            ],
        )?;

        Ok(GlobalMemoryWriteResult {
            id: record.id.clone(),
            stored: true,
            deduped: false,
        })
    }
1654
    /// Search a user's global memory records.
    ///
    /// Two-stage strategy:
    /// 1. FTS5 `MATCH` over `memory_records_fts`, ranked by `bm25` (smaller
    ///    rank is better; it is folded into a (0, 1] score).
    /// 2. If the FTS statement cannot be prepared or yields no hits, fall
    ///    back to a recency-ordered `LIKE` substring scan with a flat 0.25
    ///    score.
    ///
    /// Both stages exclude demoted and expired records and honor the optional
    /// project/channel/host tag filters. `limit` is clamped to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        // Deliberately not `?`: a prepare failure (e.g. the FTS table is
        // absent) is treated as "no FTS results" so the fallback still runs.
        let maybe_rows = conn.prepare(
            "SELECT
                m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // Column 22 is the bm25 rank; map it so smaller ranks
                    // produce larger scores, capped at 1.0.
                    let rank = row.get::<_, f64>(22)?;
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback path: plain substring scan, newest first. The `?6 = ''`
        // guard lets an empty query match every row (LIKE clause skipped).
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                redaction_status, redaction_count, visibility, demoted, score_boost,
                created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                // No rank available here: every fallback hit gets a flat score.
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
1761
1762 pub async fn list_global_memory(
1763 &self,
1764 user_id: &str,
1765 q: Option<&str>,
1766 limit: i64,
1767 offset: i64,
1768 ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
1769 let conn = self.conn.lock().await;
1770 let query = q.unwrap_or("").trim();
1771 let like = format!("%{}%", query);
1772 let mut stmt = conn.prepare(
1773 "SELECT
1774 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1775 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1776 redaction_status, redaction_count, visibility, demoted, score_boost,
1777 created_at_ms, updated_at_ms, expires_at_ms
1778 FROM memory_records
1779 WHERE user_id = ?1
1780 AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
1781 ORDER BY created_at_ms DESC
1782 LIMIT ?4 OFFSET ?5",
1783 )?;
1784 let rows = stmt.query_map(
1785 params![user_id, query, like, limit.clamp(1, 1000), offset.max(0)],
1786 row_to_global_record,
1787 )?;
1788 let mut out = Vec::new();
1789 for row in rows {
1790 out.push(row?);
1791 }
1792 Ok(out)
1793 }
1794
1795 pub async fn set_global_memory_visibility(
1796 &self,
1797 id: &str,
1798 visibility: &str,
1799 demoted: bool,
1800 ) -> MemoryResult<bool> {
1801 let conn = self.conn.lock().await;
1802 let now_ms = chrono::Utc::now().timestamp_millis();
1803 let changed = conn.execute(
1804 "UPDATE memory_records
1805 SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
1806 WHERE id = ?1",
1807 params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
1808 )?;
1809 Ok(changed > 0)
1810 }
1811
1812 pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
1813 let conn = self.conn.lock().await;
1814 let mut stmt = conn.prepare(
1815 "SELECT
1816 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
1817 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
1818 redaction_status, redaction_count, visibility, demoted, score_boost,
1819 created_at_ms, updated_at_ms, expires_at_ms
1820 FROM memory_records
1821 WHERE id = ?1
1822 LIMIT 1",
1823 )?;
1824 let record = stmt
1825 .query_row(params![id], row_to_global_record)
1826 .optional()?;
1827 Ok(record)
1828 }
1829
1830 pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
1831 let conn = self.conn.lock().await;
1832 let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
1833 Ok(changed > 0)
1834 }
1835}
1836
/// Convert a SQL row into a [`MemoryChunk`] for the given `tier`.
///
/// Expected positional columns:
///   0 = id, 1 = content, 2 = session_id, 3 = project_id, 4 = source,
///   5 = created_at (RFC 3339 text), 6 = token_count, 7 = metadata (JSON text).
/// The `source_*` columns are read by name and are optional.
fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
    let id: String = row.get(0)?;
    let content: String = row.get(1)?;

    // Session rows must carry a session_id; project rows may or may not.
    // Global rows carry neither a session nor a project id.
    let session_id: Option<String> = match tier {
        MemoryTier::Session => Some(row.get(2)?),
        MemoryTier::Project => row.get(2)?,
        MemoryTier::Global => None,
    };

    // Mirror of the above: project rows must carry a project_id.
    let project_id: Option<String> = match tier {
        MemoryTier::Session => row.get(3)?,
        MemoryTier::Project => Some(row.get(3)?),
        MemoryTier::Global => None,
    };

    let source: String = row.get(4)?;
    let created_at_str: String = row.get(5)?;
    let token_count: i64 = row.get(6)?;
    let metadata_str: Option<String> = row.get(7)?;

    // The literal 5 is the column index of created_at, surfaced in the error.
    let created_at = DateTime::parse_from_rfc3339(&created_at_str)
        .map_err(|e| {
            rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
        })?
        .with_timezone(&Utc);

    // Empty or unparsable JSON degrades to no metadata rather than an error.
    let metadata = metadata_str
        .filter(|s| !s.is_empty())
        .and_then(|s| serde_json::from_str(&s).ok());

    // `.ok()` tolerates statements that do not select these columns; such
    // rows simply yield None for all file-source fields.
    let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
    let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
    let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
    let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();

    Ok(MemoryChunk {
        id,
        content,
        tier,
        session_id,
        project_id,
        source,
        source_path,
        source_mtime,
        source_size,
        source_hash,
        created_at,
        token_count,
        metadata,
    })
}
1890
/// Convert a SQL row into a [`GlobalMemoryRecord`].
///
/// Assumes the 22-column SELECT order used throughout this file:
/// id, user_id, source_type, content, content_hash, run_id, session_id,
/// message_id, tool_name, project_tag, channel_tag, host_tag, metadata,
/// provenance, redaction_status, redaction_count, visibility, demoted,
/// score_boost, created_at_ms, updated_at_ms, expires_at_ms.
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
    let metadata_str: Option<String> = row.get(12)?;
    let provenance_str: Option<String> = row.get(13)?;
    Ok(GlobalMemoryRecord {
        id: row.get(0)?,
        user_id: row.get(1)?,
        source_type: row.get(2)?,
        content: row.get(3)?,
        content_hash: row.get(4)?,
        run_id: row.get(5)?,
        session_id: row.get(6)?,
        message_id: row.get(7)?,
        tool_name: row.get(8)?,
        project_tag: row.get(9)?,
        channel_tag: row.get(10)?,
        host_tag: row.get(11)?,
        // Empty or unparsable JSON degrades to None rather than an error.
        metadata: metadata_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        provenance: provenance_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        redaction_status: row.get(14)?,
        // NOTE(review): `as u32`/`as u64` casts assume stored counts and
        // timestamps are non-negative — negative values would wrap.
        redaction_count: row.get::<_, i64>(15)? as u32,
        visibility: row.get(16)?,
        // Booleans are stored as 0/1 integers.
        demoted: row.get::<_, i64>(17)? != 0,
        score_boost: row.get(18)?,
        created_at_ms: row.get::<_, i64>(19)? as u64,
        updated_at_ms: row.get::<_, i64>(20)? as u64,
        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
    })
}
1923
/// Build an FTS5 `MATCH` expression from free-form user text.
///
/// Each whitespace-separated token is stripped of leading/trailing
/// punctuation (keeping alphanumerics, `_` and `-`), wrapped in double
/// quotes as an FTS5 phrase, and the phrases are OR-ed together. Double
/// quotes embedded inside a token are doubled per FTS5 string rules so they
/// cannot terminate the phrase early and break the query syntax. An empty or
/// all-punctuation query yields the empty phrase `""`, keeping the query
/// syntactically valid.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Escape interior quotes ("" is a literal " inside an FTS5
                // string); without this, a token like `a"b` produced the
                // malformed phrase `"a"b"`.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
1943
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a fresh database file in a private temp directory.
    ///
    /// The `TempDir` is returned so callers can keep it alive for the test's
    /// duration; dropping it deletes the directory (and the database file).
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let db = MemoryDatabase::new(&db_path).await.unwrap();
        (db, temp_dir)
    }

    // Schema creation succeeds and a brand-new database reports zero chunks.
    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        let stats = db.get_stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
    }

    // A stored session-tier chunk round-trips through the session query.
    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let chunks = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Test content");
    }

    // Per-project config is created with defaults and can be updated in place.
    #[tokio::test]
    async fn test_config_crud() {
        let (db, _temp) = setup_test_db().await;

        let config = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(config.max_chunks, 10000);

        let new_config = MemoryConfig {
            max_chunks: 5000,
            ..Default::default()
        };
        db.update_config("project-1", &new_config).await.unwrap();

        let updated = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(updated.max_chunks, 5000);
    }

    // A second write of an identical record dedupes instead of storing, and
    // the stored record is found by full-text search with its tag filter.
    #[tokio::test]
    async fn test_global_memory_put_search_and_dedup() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;
        let record = GlobalMemoryRecord {
            id: "gm-1".to_string(),
            user_id: "user-a".to_string(),
            source_type: "user_message".to_string(),
            content: "remember rust workspace layout".to_string(),
            content_hash: "h1".to_string(),
            run_id: "run-1".to_string(),
            session_id: Some("s1".to_string()),
            message_id: Some("m1".to_string()),
            tool_name: None,
            project_tag: Some("proj-x".to_string()),
            channel_tag: None,
            host_tag: None,
            metadata: None,
            provenance: None,
            redaction_status: "passed".to_string(),
            redaction_count: 0,
            visibility: "private".to_string(),
            demoted: false,
            score_boost: 0.0,
            created_at_ms: now,
            updated_at_ms: now,
            expires_at_ms: None,
        };
        let first = db.put_global_memory_record(&record).await.unwrap();
        assert!(first.stored);
        let second = db.put_global_memory_record(&record).await.unwrap();
        assert!(second.deduped);

        let hits = db
            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());
        assert_eq!(hits[0].record.id, "gm-1");
    }
}