1use crate::types::{
5 ClearFileIndexResult, MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier,
6 ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
7};
8use chrono::{DateTime, Utc};
9use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
10use sqlite_vec::sqlite3_vec_init;
11use std::collections::HashSet;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
/// Row tuple read from `project_index_status`:
/// (last_indexed_at, last_total_files, last_processed_files,
///  last_indexed_files, last_skipped_files, last_errors).
/// Every column is nullable in the schema, hence the `Option` wrappers.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
24
/// SQLite-backed store for tiered memory chunks (session / project / global)
/// and their sqlite-vec embeddings.
pub struct MemoryDatabase {
    // Single connection shared behind a tokio Mutex so async callers
    // serialize all database access.
    conn: Arc<Mutex<Connection>>,
    // Retained so `get_stats` can report the on-disk file size.
    db_path: std::path::PathBuf,
}
30
31impl MemoryDatabase {
    /// Opens (or creates) the database at `db_path`, registers the sqlite-vec
    /// extension, applies pragmas, initializes the schema, and validates
    /// integrity before returning.
    ///
    /// # Errors
    /// Fails if the file cannot be opened, schema creation fails, or the
    /// integrity check reports corruption. Vector-table corruption detected
    /// by the startup probe is self-healed by recreating the vec0 tables
    /// rather than returned as an error.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // Register sqlite3_vec_init so every subsequently opened connection
        // automatically gains the vec0 virtual-table module.
        // SAFETY: the transmute only recasts the function pointer to the
        // extension entry-point signature sqlite3_auto_extension expects.
        // NOTE(review): the second parameter is spelled *mut *mut i8; C's
        // `char` maps to u8 on some targets (e.g. aarch64-linux) — confirm
        // this builds there, or prefer std::os::raw::c_char.
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;

        // `PRAGMA journal_mode` returns a result row, so it must go through
        // query_row; plain execute() would error on the unexpected row.
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        db.init_schema().await?;

        // A failed vec0 probe usually means the embedding tables are corrupt;
        // rebuild them instead of refusing to start. Any other error aborts.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
78
79 async fn validate_integrity(&self) -> MemoryResult<()> {
81 let conn = self.conn.lock().await;
82 let check: String = conn.query_row("PRAGMA quick_check(1)", [], |row| row.get(0))?;
83 if check.trim().eq_ignore_ascii_case("ok") {
84 return Ok(());
85 }
86
87 Err(crate::types::MemoryError::InvalidConfig(format!(
88 "malformed database integrity check: {}",
89 check
90 )))
91 }
92
    /// Creates every table, virtual table, and index if absent, and applies
    /// additive column migrations to `project_memory_chunks`.
    ///
    /// All statements use IF NOT EXISTS, so re-running on an existing
    /// database is a no-op apart from the ALTER-based migrations below.
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // --- Session tier: chunk rows keyed by session. ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // sqlite-vec table holding one embedding per session chunk.
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Project tier: chunk rows keyed by project. ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // Migration: the source_* columns were added after the table's first
        // release. CREATE TABLE IF NOT EXISTS won't update an existing table,
        // so inspect the live column list and ALTER in anything missing.
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        // sqlite-vec table for project-tier embeddings.
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Tracks which files have been indexed per project (change detection
        // via mtime/size/hash).
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        // One summary row per project describing the latest indexing run.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // --- Global tier: chunks not tied to a session or project. ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // sqlite-vec table for global-tier embeddings.
        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // Per-project tunables; defaults mirror MemoryConfig::default.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        // Audit log of cleanup operations (see `log_cleanup`).
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // Secondary indexes for the common lookup patterns used elsewhere in
        // this file (per-session, per-project, per-file, and age-based scans).
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        Ok(())
    }
297
    /// Probes each vec0 virtual table so latent corruption surfaces here as
    /// an error rather than during a real similarity search.
    pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        // Zero vector at the configured dimensionality; the values are
        // irrelevant — the probe only needs to force embedding decoding.
        let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));

        for table in [
            "session_memory_vectors",
            "project_memory_vectors",
            "global_memory_vectors",
        ] {
            let sql = format!("SELECT COUNT(*) FROM {}", table);
            let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;

            // Only run the KNN probe when rows exist: the MATCH query makes
            // sqlite-vec deserialize at least one stored embedding blob,
            // which is where corruption shows up.
            if row_count > 0 {
                let probe_sql = format!(
                    "SELECT chunk_id, distance
                     FROM {}
                     WHERE embedding MATCH ?1 AND k = 1",
                    table
                );
                let mut stmt = conn.prepare(&probe_sql)?;
                let mut rows = stmt.query(params![probe_embedding.as_str()])?;
                let _ = rows.next()?;
            }
        }
        Ok(())
    }
328
329 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
330 let text = err.to_string().to_lowercase();
331 text.contains("vector blob")
332 || text.contains("session_memory_vectors")
333 || text.contains("project_memory_vectors")
334 || text.contains("global_memory_vectors")
335 || text.contains("vec0")
336 }
337
338 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
339 let conn = self.conn.lock().await;
340
341 conn.execute("DROP TABLE IF EXISTS session_memory_vectors", [])?;
342 conn.execute("DROP TABLE IF EXISTS project_memory_vectors", [])?;
343 conn.execute("DROP TABLE IF EXISTS global_memory_vectors", [])?;
344
345 conn.execute(
346 &format!(
347 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
348 chunk_id TEXT PRIMARY KEY,
349 embedding float[{}]
350 )",
351 DEFAULT_EMBEDDING_DIMENSION
352 ),
353 [],
354 )?;
355
356 conn.execute(
357 &format!(
358 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
359 chunk_id TEXT PRIMARY KEY,
360 embedding float[{}]
361 )",
362 DEFAULT_EMBEDDING_DIMENSION
363 ),
364 [],
365 )?;
366
367 conn.execute(
368 &format!(
369 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
370 chunk_id TEXT PRIMARY KEY,
371 embedding float[{}]
372 )",
373 DEFAULT_EMBEDDING_DIMENSION
374 ),
375 [],
376 )?;
377
378 Ok(())
379 }
380
381 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
384 match self.validate_vector_tables().await {
385 Ok(()) => Ok(false),
386 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
387 tracing::warn!(
388 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
389 err
390 );
391 self.recreate_vector_tables().await?;
392 Ok(true)
393 }
394 Err(err) => Err(err),
395 }
396 }
397
    /// Inserts a chunk row into its tier's table and the embedding into the
    /// matching vec0 table.
    ///
    /// The embedding is serialized as a JSON array string, which sqlite-vec
    /// accepts as a vector literal.
    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // Route to the (chunks, vectors) table pair for this tier.
        let (chunks_table, vectors_table) = match chunk.tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        let created_at_str = chunk.created_at.to_rfc3339();
        // Missing metadata is stored as an empty string rather than NULL;
        // readers treat empty strings as "no metadata".
        let metadata_str = chunk
            .metadata
            .as_ref()
            .map(|m| m.to_string())
            .unwrap_or_default();

        // Each tier has a different column set, so the INSERTs differ.
        match chunk.tier {
            MemoryTier::Session => {
                // A missing session_id is stored as "" here (not NULL).
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.session_id.as_ref().unwrap_or(&String::new()),
                        chunk.project_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
            MemoryTier::Project => {
                // Project chunks additionally persist the source_* columns
                // used for file-index change detection.
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                            id, content, project_id, session_id, source, created_at, token_count, metadata,
                            source_path, source_mtime, source_size, source_hash
                        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.project_id.as_ref().unwrap_or(&String::new()),
                        chunk.session_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
            MemoryTier::Global => {
                // Global chunks carry no session/project association.
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, source, created_at, token_count, metadata)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
        }

        // Store the embedding as a JSON-style float array literal.
        let embedding_json = format!(
            "[{}]",
            embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        conn.execute(
            &format!(
                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
                vectors_table
            ),
            params![chunk.id, embedding_json],
        )?;

        Ok(())
    }
499
    /// KNN search over one tier's vec0 table, joined back to the chunk rows.
    ///
    /// Filters: session tier prefers `session_id`, falls back to
    /// `project_id`, then unfiltered; project tier optionally filters by
    /// `project_id`; global tier ignores both filters. Returns up to `limit`
    /// `(chunk, distance)` pairs ordered by ascending vec0 distance.
    ///
    /// Note on column indexes: the project-tier SELECTs include four extra
    /// source_* columns, so `distance` sits at index 12 there and at index 8
    /// in the session/global queries.
    pub async fn search_similar(
        &self,
        query_embedding: &[f32],
        tier: MemoryTier,
        project_id: Option<&str>,
        session_id: Option<&str>,
        limit: i64,
    ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        // Same JSON array-literal encoding used by `store_chunk`.
        let embedding_json = format!(
            "[{}]",
            query_embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );

        let results = match tier {
            MemoryTier::Session => {
                if let Some(sid) = session_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![sid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    // No filter: search the whole session tier.
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Project => {
                if let Some(pid) = project_id {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![pid, embedding_json, limit], |row| {
                            // distance is at index 12 (after 12 chunk columns).
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                } else {
                    let sql = format!(
                        "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
                                c.source_path, c.source_mtime, c.source_size, c.source_hash,
                                v.distance
                         FROM {} AS v
                         JOIN {} AS c ON v.chunk_id = c.id
                         WHERE v.embedding MATCH ?1 AND k = ?2
                         ORDER BY v.distance",
                        vectors_table, chunks_table
                    );
                    let mut stmt = conn.prepare(&sql)?;
                    let results = stmt
                        .query_map(params![embedding_json, limit], |row| {
                            Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
                        })?
                        .collect::<Result<Vec<_>, _>>()?;
                    results
                }
            }
            MemoryTier::Global => {
                // Global chunks have no session/project columns; NULLs are
                // selected so row_to_chunk sees the same column layout.
                let sql = format!(
                    "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
                            v.distance
                     FROM {} AS v
                     JOIN {} AS c ON v.chunk_id = c.id
                     WHERE v.embedding MATCH ?1 AND k = ?2
                     ORDER BY v.distance",
                    vectors_table, chunks_table
                );
                let mut stmt = conn.prepare(&sql)?;
                let results = stmt
                    .query_map(params![embedding_json, limit], |row| {
                        Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
                    })?
                    .collect::<Result<Vec<_>, _>>()?;
                results
            }
        };

        Ok(results)
    }
643
644 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
646 let conn = self.conn.lock().await;
647
648 let mut stmt = conn.prepare(
649 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
650 FROM session_memory_chunks
651 WHERE session_id = ?1
652 ORDER BY created_at DESC",
653 )?;
654
655 let chunks = stmt
656 .query_map(params![session_id], |row| {
657 row_to_chunk(row, MemoryTier::Session)
658 })?
659 .collect::<Result<Vec<_>, _>>()?;
660
661 Ok(chunks)
662 }
663
664 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
666 let conn = self.conn.lock().await;
667
668 let mut stmt = conn.prepare(
669 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
670 source_path, source_mtime, source_size, source_hash
671 FROM project_memory_chunks
672 WHERE project_id = ?1
673 ORDER BY created_at DESC",
674 )?;
675
676 let chunks = stmt
677 .query_map(params![project_id], |row| {
678 row_to_chunk(row, MemoryTier::Project)
679 })?
680 .collect::<Result<Vec<_>, _>>()?;
681
682 Ok(chunks)
683 }
684
    /// Returns up to `limit` global-tier chunks, newest first.
    ///
    /// Rows are decoded inline (rather than via `row_to_chunk`) because the
    /// global table has no session/project or source_* columns.
    pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
        let conn = self.conn.lock().await;

        let mut stmt = conn.prepare(
            "SELECT id, content, source, created_at, token_count, metadata
             FROM global_memory_chunks
             ORDER BY created_at DESC
             LIMIT ?1",
        )?;

        let chunks = stmt
            .query_map(params![limit], |row| {
                let id: String = row.get(0)?;
                let content: String = row.get(1)?;
                let source: String = row.get(2)?;
                let created_at_str: String = row.get(3)?;
                let token_count: i64 = row.get(4)?;
                let metadata_str: Option<String> = row.get(5)?;

                // Timestamps are stored as RFC 3339 text; a parse failure is
                // surfaced as a conversion error on column 3 (created_at).
                let created_at = DateTime::parse_from_rfc3339(&created_at_str)
                    .map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            3,
                            rusqlite::types::Type::Text,
                            Box::new(e),
                        )
                    })?
                    .with_timezone(&Utc);

                // Empty strings mean "no metadata" (see store_chunk); invalid
                // JSON is silently dropped rather than failing the row.
                let metadata = metadata_str
                    .filter(|s| !s.is_empty())
                    .and_then(|s| serde_json::from_str(&s).ok());

                Ok(MemoryChunk {
                    id,
                    content,
                    tier: MemoryTier::Global,
                    session_id: None,
                    project_id: None,
                    source,
                    source_path: None,
                    source_mtime: None,
                    source_size: None,
                    source_hash: None,
                    created_at,
                    token_count,
                    metadata,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(chunks)
    }
739
740 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
742 let conn = self.conn.lock().await;
743
744 let count: i64 = conn.query_row(
746 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
747 params![session_id],
748 |row| row.get(0),
749 )?;
750
751 conn.execute(
753 "DELETE FROM session_memory_vectors WHERE chunk_id IN
754 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
755 params![session_id],
756 )?;
757
758 conn.execute(
760 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
761 params![session_id],
762 )?;
763
764 Ok(count as u64)
765 }
766
767 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
769 let conn = self.conn.lock().await;
770
771 let count: i64 = conn.query_row(
773 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
774 params![project_id],
775 |row| row.get(0),
776 )?;
777
778 conn.execute(
780 "DELETE FROM project_memory_vectors WHERE chunk_id IN
781 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
782 params![project_id],
783 )?;
784
785 conn.execute(
787 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
788 params![project_id],
789 )?;
790
791 Ok(count as u64)
792 }
793
794 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
796 let conn = self.conn.lock().await;
797
798 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
799 let cutoff_str = cutoff.to_rfc3339();
800
801 let count: i64 = conn.query_row(
803 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
804 params![cutoff_str],
805 |row| row.get(0),
806 )?;
807
808 conn.execute(
810 "DELETE FROM session_memory_vectors WHERE chunk_id IN
811 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
812 params![cutoff_str],
813 )?;
814
815 conn.execute(
817 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
818 params![cutoff_str],
819 )?;
820
821 Ok(count as u64)
822 }
823
    /// Fetches the per-project memory configuration, inserting and returning
    /// the defaults if no row exists yet.
    ///
    /// The read-then-insert is not wrapped in a transaction, but the
    /// connection mutex is held for the whole call, so no other caller can
    /// race the insert.
    pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        let conn = self.conn.lock().await;

        let result: Option<MemoryConfig> = conn
            .query_row(
                "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
                        session_retention_days, token_budget, chunk_overlap
                 FROM memory_config WHERE project_id = ?1",
                params![project_id],
                |row| {
                    Ok(MemoryConfig {
                        max_chunks: row.get(0)?,
                        chunk_size: row.get(1)?,
                        retrieval_k: row.get(2)?,
                        // Stored as INTEGER 0/1; any nonzero means enabled.
                        auto_cleanup: row.get::<_, i64>(3)? != 0,
                        session_retention_days: row.get(4)?,
                        token_budget: row.get(5)?,
                        chunk_overlap: row.get(6)?,
                    })
                },
            )
            .optional()?;

        match result {
            Some(config) => Ok(config),
            None => {
                // First access for this project: persist the defaults so
                // later reads and updates have a row to work with.
                let config = MemoryConfig::default();
                let updated_at = Utc::now().to_rfc3339();

                conn.execute(
                    "INSERT INTO memory_config
                     (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
                      session_retention_days, token_budget, chunk_overlap, updated_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
                    params![
                        project_id,
                        config.max_chunks,
                        config.chunk_size,
                        config.retrieval_k,
                        config.auto_cleanup as i64,
                        config.session_retention_days,
                        config.token_budget,
                        config.chunk_overlap,
                        updated_at
                    ],
                )?;

                Ok(config)
            }
        }
    }
877
878 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
880 let conn = self.conn.lock().await;
881
882 let updated_at = Utc::now().to_rfc3339();
883
884 conn.execute(
885 "INSERT OR REPLACE INTO memory_config
886 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
887 session_retention_days, token_budget, chunk_overlap, updated_at)
888 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
889 params![
890 project_id,
891 config.max_chunks,
892 config.chunk_size,
893 config.retrieval_k,
894 config.auto_cleanup as i64,
895 config.session_retention_days,
896 config.token_budget,
897 config.chunk_overlap,
898 updated_at
899 ],
900 )?;
901
902 Ok(())
903 }
904
    /// Aggregates chunk counts and content byte totals across all three
    /// tiers, plus the database file size and most recent cleanup timestamp.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        let conn = self.conn.lock().await;

        let session_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
                row.get(0)
            })?;

        let project_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
                row.get(0)
            })?;

        let global_chunks: i64 =
            conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
                row.get(0)
            })?;

        // "Bytes" here means summed content lengths, not on-disk size;
        // COALESCE keeps empty tables at 0 instead of NULL.
        let session_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        let project_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        let global_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
            [],
            |row| row.get(0),
        )?;

        // Most recent cleanup entry, if any cleanup has ever run.
        let last_cleanup: Option<String> = conn
            .query_row(
                "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .optional()?;

        // Unparseable timestamps degrade to None rather than erroring.
        let last_cleanup = last_cleanup.and_then(|s| {
            DateTime::parse_from_rfc3339(&s)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        // NOTE(review): std::fs::metadata is blocking I/O inside an async
        // fn; consider tokio::fs::metadata if this runs on a shared runtime.
        let file_size = std::fs::metadata(&self.db_path)?.len() as i64;

        Ok(MemoryStats {
            total_chunks: session_chunks + project_chunks + global_chunks,
            session_chunks,
            project_chunks,
            global_chunks,
            total_bytes: session_bytes + project_bytes + global_bytes,
            session_bytes,
            project_bytes,
            global_bytes,
            file_size,
            last_cleanup,
        })
    }
975
976 pub async fn log_cleanup(
978 &self,
979 cleanup_type: &str,
980 tier: MemoryTier,
981 project_id: Option<&str>,
982 session_id: Option<&str>,
983 chunks_deleted: i64,
984 bytes_reclaimed: i64,
985 ) -> MemoryResult<()> {
986 let conn = self.conn.lock().await;
987
988 let id = uuid::Uuid::new_v4().to_string();
989 let created_at = Utc::now().to_rfc3339();
990
991 conn.execute(
992 "INSERT INTO memory_cleanup_log
993 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
994 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
995 params![
996 id,
997 cleanup_type,
998 tier.to_string(),
999 project_id,
1000 session_id,
1001 chunks_deleted,
1002 bytes_reclaimed,
1003 created_at
1004 ],
1005 )?;
1006
1007 Ok(())
1008 }
1009
1010 pub async fn vacuum(&self) -> MemoryResult<()> {
1012 let conn = self.conn.lock().await;
1013 conn.execute("VACUUM", [])?;
1014 Ok(())
1015 }
1016
1017 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1022 let conn = self.conn.lock().await;
1023 let n: i64 = conn.query_row(
1024 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1025 params![project_id],
1026 |row| row.get(0),
1027 )?;
1028 Ok(n)
1029 }
1030
1031 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1032 let conn = self.conn.lock().await;
1033 let exists: Option<i64> = conn
1034 .query_row(
1035 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1036 params![project_id],
1037 |row| row.get(0),
1038 )
1039 .optional()?;
1040 Ok(exists.is_some())
1041 }
1042
1043 pub async fn get_file_index_entry(
1044 &self,
1045 project_id: &str,
1046 path: &str,
1047 ) -> MemoryResult<Option<(i64, i64, String)>> {
1048 let conn = self.conn.lock().await;
1049 let row: Option<(i64, i64, String)> = conn
1050 .query_row(
1051 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1052 params![project_id, path],
1053 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1054 )
1055 .optional()?;
1056 Ok(row)
1057 }
1058
1059 pub async fn upsert_file_index_entry(
1060 &self,
1061 project_id: &str,
1062 path: &str,
1063 mtime: i64,
1064 size: i64,
1065 hash: &str,
1066 ) -> MemoryResult<()> {
1067 let conn = self.conn.lock().await;
1068 let indexed_at = Utc::now().to_rfc3339();
1069 conn.execute(
1070 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1071 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1072 ON CONFLICT(project_id, path) DO UPDATE SET
1073 mtime = excluded.mtime,
1074 size = excluded.size,
1075 hash = excluded.hash,
1076 indexed_at = excluded.indexed_at",
1077 params![project_id, path, mtime, size, hash, indexed_at],
1078 )?;
1079 Ok(())
1080 }
1081
1082 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1083 let conn = self.conn.lock().await;
1084 conn.execute(
1085 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1086 params![project_id, path],
1087 )?;
1088 Ok(())
1089 }
1090
1091 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1092 let conn = self.conn.lock().await;
1093 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1094 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1095 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1096 }
1097
    /// Removes all file-sourced chunks (and their vectors) for one source
    /// path within a project.
    ///
    /// Returns `(chunks_deleted, bytes_estimated)`, where bytes are the
    /// summed content lengths of the removed chunks. The counts are taken
    /// before deletion; the connection mutex is held throughout, so no
    /// writer can change the rows in between.
    pub async fn delete_project_file_chunks_by_path(
        &self,
        project_id: &str,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        // Vectors key off chunk ids, so they go before the chunk rows.
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
            params![project_id, source_path],
        )?;

        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
        )?;

        Ok((chunks_deleted, bytes_estimated))
    }
1133
1134 pub async fn upsert_project_index_status(
1135 &self,
1136 project_id: &str,
1137 total_files: i64,
1138 processed_files: i64,
1139 indexed_files: i64,
1140 skipped_files: i64,
1141 errors: i64,
1142 ) -> MemoryResult<()> {
1143 let conn = self.conn.lock().await;
1144 let last_indexed_at = Utc::now().to_rfc3339();
1145 conn.execute(
1146 "INSERT INTO project_index_status (
1147 project_id, last_indexed_at, last_total_files, last_processed_files,
1148 last_indexed_files, last_skipped_files, last_errors
1149 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1150 ON CONFLICT(project_id) DO UPDATE SET
1151 last_indexed_at = excluded.last_indexed_at,
1152 last_total_files = excluded.last_total_files,
1153 last_processed_files = excluded.last_processed_files,
1154 last_indexed_files = excluded.last_indexed_files,
1155 last_skipped_files = excluded.last_skipped_files,
1156 last_errors = excluded.last_errors",
1157 params![
1158 project_id,
1159 last_indexed_at,
1160 total_files,
1161 processed_files,
1162 indexed_files,
1163 skipped_files,
1164 errors
1165 ],
1166 )?;
1167 Ok(())
1168 }
1169
    /// Collects per-project statistics: chunk/byte totals, file-sourced
    /// subtotals, indexed-file count, and the latest indexing-run summary.
    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
        let conn = self.conn.lock().await;

        let project_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // Bytes are summed content lengths; COALESCE keeps empty sets at 0.
        let project_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // Subset of the above restricted to chunks that came from files.
        let file_index_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let file_index_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let indexed_files: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // Last indexing run summary; None if indexing has never run.
        let status_row: Option<ProjectIndexStatusRow> =
            conn
                .query_row(
                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
                     FROM project_index_status WHERE project_id = ?1",
                    params![project_id],
                    |row| {
                        Ok((
                            row.get(0)?,
                            row.get(1)?,
                            row.get(2)?,
                            row.get(3)?,
                            row.get(4)?,
                            row.get(5)?,
                        ))
                    },
                )
                .optional()?;

        // Missing status row degrades to all-None fields.
        let (
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        ) = status_row.unwrap_or((None, None, None, None, None, None));

        // Unparseable timestamps degrade to None rather than erroring.
        let last_indexed_at = last_indexed_at.and_then(|s| {
            DateTime::parse_from_rfc3339(&s)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        Ok(ProjectMemoryStats {
            project_id: project_id.to_string(),
            project_chunks,
            project_bytes,
            file_index_chunks,
            file_index_bytes,
            indexed_files,
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        })
    }
1252
1253 pub async fn clear_project_file_index(
1254 &self,
1255 project_id: &str,
1256 vacuum: bool,
1257 ) -> MemoryResult<ClearFileIndexResult> {
1258 let conn = self.conn.lock().await;
1259
1260 let chunks_deleted: i64 = conn.query_row(
1261 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1262 params![project_id],
1263 |row| row.get(0),
1264 )?;
1265
1266 let bytes_estimated: i64 = conn.query_row(
1267 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1268 params![project_id],
1269 |row| row.get(0),
1270 )?;
1271
1272 conn.execute(
1274 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1275 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1276 params![project_id],
1277 )?;
1278
1279 conn.execute(
1281 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1282 params![project_id],
1283 )?;
1284
1285 conn.execute(
1287 "DELETE FROM project_file_index WHERE project_id = ?1",
1288 params![project_id],
1289 )?;
1290 conn.execute(
1291 "DELETE FROM project_index_status WHERE project_id = ?1",
1292 params![project_id],
1293 )?;
1294
1295 drop(conn); if vacuum {
1298 self.vacuum().await?;
1299 }
1300
1301 Ok(ClearFileIndexResult {
1302 chunks_deleted,
1303 bytes_estimated,
1304 did_vacuum: vacuum,
1305 })
1306 }
1307}
1308
1309fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
1311 let id: String = row.get(0)?;
1312 let content: String = row.get(1)?;
1313
1314 let session_id: Option<String> = match tier {
1315 MemoryTier::Session => Some(row.get(2)?),
1316 MemoryTier::Project => row.get(2)?,
1317 MemoryTier::Global => None,
1318 };
1319
1320 let project_id: Option<String> = match tier {
1321 MemoryTier::Session => row.get(3)?,
1322 MemoryTier::Project => Some(row.get(3)?),
1323 MemoryTier::Global => None,
1324 };
1325
1326 let source: String = row.get(4)?;
1327 let created_at_str: String = row.get(5)?;
1328 let token_count: i64 = row.get(6)?;
1329 let metadata_str: Option<String> = row.get(7)?;
1330
1331 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
1332 .map_err(|e| {
1333 rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
1334 })?
1335 .with_timezone(&Utc);
1336
1337 let metadata = metadata_str
1338 .filter(|s| !s.is_empty())
1339 .and_then(|s| serde_json::from_str(&s).ok());
1340
1341 let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
1342 let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
1343 let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
1344 let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
1345
1346 Ok(MemoryChunk {
1347 id,
1348 content,
1349 tier,
1350 session_id,
1351 project_id,
1352 source,
1353 source_path,
1354 source_mtime,
1355 source_size,
1356 source_hash,
1357 created_at,
1358 token_count,
1359 metadata,
1360 })
1361}
1362
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a fresh database inside a temporary directory. The `TempDir`
    /// guard must outlive the database or the backing file disappears.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let dir = TempDir::new().unwrap();
        let db = MemoryDatabase::new(&dir.path().join("test_memory.db"))
            .await
            .unwrap();
        (db, dir)
    }

    #[tokio::test]
    async fn test_init_schema() {
        // A freshly created database reports an empty chunk store.
        let (db, _guard) = setup_test_db().await;
        assert_eq!(db.get_stats().await.unwrap().total_chunks, 0);
    }

    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _guard) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        // Store with a constant embedding, then read it back by session id.
        db.store_chunk(&chunk, &vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION])
            .await
            .unwrap();

        let found = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].content, "Test content");
    }

    #[tokio::test]
    async fn test_config_crud() {
        let (db, _guard) = setup_test_db().await;

        // First access creates the default config.
        let created = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(created.max_chunks, 10000);

        // An update is persisted and visible on the next read.
        db.update_config(
            "project-1",
            &MemoryConfig {
                max_chunks: 5000,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let reread = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(reread.max_chunks, 5000);
    }
}
1427}