1use crate::types::{
5 ClearFileIndexResult, MemoryChunk, MemoryConfig, MemoryResult, MemoryStats, MemoryTier,
6 ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
7};
8use chrono::{DateTime, Utc};
9use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
10use sqlite_vec::sqlite3_vec_init;
11use std::collections::HashSet;
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16
/// Row shape read back from the `project_index_status` table (all columns are
/// nullable): presumably (last_indexed_at, last_total_files,
/// last_processed_files, last_indexed_files, last_skipped_files, last_errors)
/// — not consumed in this chunk; verify against the status query elsewhere in
/// the file.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
25
/// SQLite-backed store for tiered memory chunks (session / project / global)
/// and their sqlite-vec embedding tables.
pub struct MemoryDatabase {
    // tokio Mutex (not std) so the guard can be held across .await points.
    conn: Arc<Mutex<Connection>>,
    // Kept so get_stats() can report the on-disk file size via fs::metadata.
    db_path: std::path::PathBuf,
}
31
32impl MemoryDatabase {
    /// Opens (or creates) the memory database at `db_path` and prepares it:
    /// registers the sqlite-vec extension, enables WAL + busy timeout, builds
    /// the schema, validates the vec0 vector tables (recreating them on
    /// corruption signals), then runs a quick integrity check.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // SAFETY: re-types the sqlite3_vec_init function pointer to the
        // sqlite3 auto-extension entry-point signature so it runs for every
        // new connection. The transmute changes only the pointer's type, not
        // its value; soundness relies on sqlite_vec exporting a compatible
        // entry point (which is that crate's documented contract).
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;
        conn.busy_timeout(Duration::from_secs(10))?;

        // journal_mode returns a row ("wal"), so it must go through query_row.
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        db.init_schema().await?;
        // Vector-table corruption is recoverable (embeddings are derived data),
        // so recreate rather than fail startup; any other error is fatal here.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
80
81 async fn validate_integrity(&self) -> MemoryResult<()> {
83 let conn = self.conn.lock().await;
84 let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
85 {
86 Ok(value) => value,
87 Err(err) => {
88 tracing::warn!(
92 "Skipping strict PRAGMA quick_check due to probe error: {}",
93 err
94 );
95 return Ok(());
96 }
97 };
98 if check.trim().eq_ignore_ascii_case("ok") {
99 return Ok(());
100 }
101
102 let lowered = check.to_lowercase();
103 if lowered.contains("malformed")
104 || lowered.contains("corrupt")
105 || lowered.contains("database disk image is malformed")
106 {
107 return Err(crate::types::MemoryError::InvalidConfig(format!(
108 "malformed database integrity check: {}",
109 check
110 )));
111 }
112
113 tracing::warn!(
114 "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
115 check
116 );
117 Ok(())
118 }
119
    /// Creates all tables and indexes if they do not exist, and migrates the
    /// `project_memory_chunks` table in place by adding any missing
    /// `source_*` columns. Idempotent; safe to call on every startup.
    async fn init_schema(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // --- Session tier: chunks + vec0 embedding table ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS session_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                session_id TEXT NOT NULL,
                project_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Project tier: chunks + vec0 embedding table ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                project_id TEXT NOT NULL,
                session_id TEXT,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        // In-place migration: older databases lack the source_* columns, and
        // SQLite's ALTER TABLE ADD COLUMN is not idempotent, so probe
        // table_info first. Column 1 of PRAGMA table_info is the column name.
        let existing_cols: HashSet<String> = {
            let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
            let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
            rows.collect::<Result<HashSet<_>, _>>()?
        };

        if !existing_cols.contains("source_path") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
                [],
            )?;
        }
        if !existing_cols.contains("source_mtime") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_size") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
                [],
            )?;
        }
        if !existing_cols.contains("source_hash") {
            conn.execute(
                "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
                [],
            )?;
        }

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- File-indexing bookkeeping (per project) ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_file_index (
                project_id TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL,
                hash TEXT NOT NULL,
                indexed_at TEXT NOT NULL,
                PRIMARY KEY(project_id, path)
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS project_index_status (
                project_id TEXT PRIMARY KEY,
                last_indexed_at TEXT,
                last_total_files INTEGER,
                last_processed_files INTEGER,
                last_indexed_files INTEGER,
                last_skipped_files INTEGER,
                last_errors INTEGER
            )",
            [],
        )?;

        // --- Global tier: chunks + vec0 embedding table ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS global_memory_chunks (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL,
                token_count INTEGER NOT NULL DEFAULT 0,
                metadata TEXT
            )",
            [],
        )?;

        conn.execute(
            &format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
                chunk_id TEXT PRIMARY KEY,
                embedding float[{}]
            )",
                DEFAULT_EMBEDDING_DIMENSION
            ),
            [],
        )?;

        // --- Per-project configuration and cleanup audit log ---
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_config (
                project_id TEXT PRIMARY KEY,
                max_chunks INTEGER NOT NULL DEFAULT 10000,
                chunk_size INTEGER NOT NULL DEFAULT 512,
                retrieval_k INTEGER NOT NULL DEFAULT 5,
                auto_cleanup INTEGER NOT NULL DEFAULT 1,
                session_retention_days INTEGER NOT NULL DEFAULT 30,
                token_budget INTEGER NOT NULL DEFAULT 5000,
                chunk_overlap INTEGER NOT NULL DEFAULT 64,
                updated_at TEXT NOT NULL
            )",
            [],
        )?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
                id TEXT PRIMARY KEY,
                cleanup_type TEXT NOT NULL,
                tier TEXT NOT NULL,
                project_id TEXT,
                session_id TEXT,
                chunks_deleted INTEGER NOT NULL DEFAULT 0,
                bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )",
            [],
        )?;

        // --- Secondary indexes for the common lookup paths ---
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
            [],
        )?;

        Ok(())
    }
324
325 pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
328 let conn = self.conn.lock().await;
329 let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
330
331 for table in [
332 "session_memory_vectors",
333 "project_memory_vectors",
334 "global_memory_vectors",
335 ] {
336 let sql = format!("SELECT COUNT(*) FROM {}", table);
337 let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
338
339 if row_count > 0 {
342 let probe_sql = format!(
343 "SELECT chunk_id, distance
344 FROM {}
345 WHERE embedding MATCH ?1 AND k = 1",
346 table
347 );
348 let mut stmt = conn.prepare(&probe_sql)?;
349 let mut rows = stmt.query(params![probe_embedding.as_str()])?;
350 let _ = rows.next()?;
351 }
352 }
353 Ok(())
354 }
355
356 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
357 let text = err.to_string().to_lowercase();
358 text.contains("vector blob")
359 || text.contains("chunks iter error")
360 || text.contains("chunks iter")
361 || text.contains("internal sqlite-vec error")
362 || text.contains("insert rowids id")
363 || text.contains("sql logic error")
364 || text.contains("database disk image is malformed")
365 || text.contains("session_memory_vectors")
366 || text.contains("project_memory_vectors")
367 || text.contains("global_memory_vectors")
368 || text.contains("vec0")
369 }
370
371 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
372 let conn = self.conn.lock().await;
373
374 for base in [
375 "session_memory_vectors",
376 "project_memory_vectors",
377 "global_memory_vectors",
378 ] {
379 for name in [
381 base.to_string(),
382 format!("{}_chunks", base),
383 format!("{}_info", base),
384 format!("{}_rowids", base),
385 format!("{}_vector_chunks00", base),
386 ] {
387 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
388 conn.execute(&sql, [])?;
389 }
390
391 let like_pattern = format!("{base}_%");
393 let mut stmt = conn.prepare(
394 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
395 )?;
396 let table_names = stmt
397 .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
398 .collect::<Result<Vec<_>, _>>()?;
399 drop(stmt);
400 for name in table_names {
401 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
402 conn.execute(&sql, [])?;
403 }
404 }
405
406 conn.execute(
407 &format!(
408 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
409 chunk_id TEXT PRIMARY KEY,
410 embedding float[{}]
411 )",
412 DEFAULT_EMBEDDING_DIMENSION
413 ),
414 [],
415 )?;
416
417 conn.execute(
418 &format!(
419 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
420 chunk_id TEXT PRIMARY KEY,
421 embedding float[{}]
422 )",
423 DEFAULT_EMBEDDING_DIMENSION
424 ),
425 [],
426 )?;
427
428 conn.execute(
429 &format!(
430 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
431 chunk_id TEXT PRIMARY KEY,
432 embedding float[{}]
433 )",
434 DEFAULT_EMBEDDING_DIMENSION
435 ),
436 [],
437 )?;
438
439 Ok(())
440 }
441
442 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
445 match self.validate_vector_tables().await {
446 Ok(()) => Ok(false),
447 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
448 tracing::warn!(
449 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
450 err
451 );
452 self.recreate_vector_tables().await?;
453 Ok(true)
454 }
455 Err(err) => Err(err),
456 }
457 }
458
459 pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
463 let table_names = {
464 let conn = self.conn.lock().await;
465 let mut stmt = conn.prepare(
466 "SELECT name FROM sqlite_master
467 WHERE type='table'
468 AND name NOT LIKE 'sqlite_%'
469 ORDER BY name",
470 )?;
471 let names = stmt
472 .query_map([], |row| row.get::<_, String>(0))?
473 .collect::<Result<Vec<_>, _>>()?;
474 names
475 };
476
477 {
478 let conn = self.conn.lock().await;
479 for table in table_names {
480 let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
481 let _ = conn.execute(&sql, []);
482 }
483 }
484
485 self.init_schema().await
486 }
487
488 pub async fn try_repair_after_error(
491 &self,
492 err: &crate::types::MemoryError,
493 ) -> MemoryResult<bool> {
494 match err {
495 crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
496 tracing::warn!(
497 "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
498 db_err
499 );
500 self.recreate_vector_tables().await?;
501 Ok(true)
502 }
503 _ => Ok(false),
504 }
505 }
506
    /// Inserts a chunk row into its tier's chunk table, then inserts the
    /// embedding (JSON-encoded) into the matching vec0 table keyed by chunk id.
    /// NOTE(review): the two inserts are not wrapped in a transaction — if the
    /// vector insert fails, the chunk row remains without an embedding.
    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        let (chunks_table, vectors_table) = match chunk.tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        let created_at_str = chunk.created_at.to_rfc3339();
        // Missing metadata is stored as an empty string, not NULL.
        let metadata_str = chunk
            .metadata
            .as_ref()
            .map(|m| m.to_string())
            .unwrap_or_default();

        // Each tier has a different column set, so the INSERT differs per arm.
        match chunk.tier {
            MemoryTier::Session => {
                // session_id is NOT NULL in the schema; a missing id becomes "".
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, session_id, project_id, source, created_at, token_count, metadata)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.session_id.as_ref().unwrap_or(&String::new()),
                        chunk.project_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
            MemoryTier::Project => {
                // Project chunks additionally carry file-provenance columns.
                // project_id is NOT NULL in the schema; a missing id becomes "".
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                        id, content, project_id, session_id, source, created_at, token_count, metadata,
                        source_path, source_mtime, source_size, source_hash
                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.project_id.as_ref().unwrap_or(&String::new()),
                        chunk.session_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
            MemoryTier::Global => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (id, content, source, created_at, token_count, metadata)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str
                    ],
                )?;
            }
        }

        // sqlite-vec accepts a JSON array string as the embedding value.
        let embedding_json = format!(
            "[{}]",
            embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        conn.execute(
            &format!(
                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
                vectors_table
            ),
            params![chunk.id, embedding_json],
        )?;

        Ok(())
    }
608
609 pub async fn search_similar(
611 &self,
612 query_embedding: &[f32],
613 tier: MemoryTier,
614 project_id: Option<&str>,
615 session_id: Option<&str>,
616 limit: i64,
617 ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
618 let conn = self.conn.lock().await;
619
620 let (chunks_table, vectors_table) = match tier {
621 MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
622 MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
623 MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
624 };
625
626 let embedding_json = format!(
627 "[{}]",
628 query_embedding
629 .iter()
630 .map(|f| f.to_string())
631 .collect::<Vec<_>>()
632 .join(",")
633 );
634
635 let results = match tier {
637 MemoryTier::Session => {
638 if let Some(sid) = session_id {
639 let sql = format!(
640 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
641 v.distance
642 FROM {} AS v
643 JOIN {} AS c ON v.chunk_id = c.id
644 WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
645 ORDER BY v.distance",
646 vectors_table, chunks_table
647 );
648 let mut stmt = conn.prepare(&sql)?;
649 let results = stmt
650 .query_map(params![sid, embedding_json, limit], |row| {
651 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
652 })?
653 .collect::<Result<Vec<_>, _>>()?;
654 results
655 } else if let Some(pid) = project_id {
656 let sql = format!(
657 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
658 v.distance
659 FROM {} AS v
660 JOIN {} AS c ON v.chunk_id = c.id
661 WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
662 ORDER BY v.distance",
663 vectors_table, chunks_table
664 );
665 let mut stmt = conn.prepare(&sql)?;
666 let results = stmt
667 .query_map(params![pid, embedding_json, limit], |row| {
668 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
669 })?
670 .collect::<Result<Vec<_>, _>>()?;
671 results
672 } else {
673 let sql = format!(
674 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
675 v.distance
676 FROM {} AS v
677 JOIN {} AS c ON v.chunk_id = c.id
678 WHERE v.embedding MATCH ?1 AND k = ?2
679 ORDER BY v.distance",
680 vectors_table, chunks_table
681 );
682 let mut stmt = conn.prepare(&sql)?;
683 let results = stmt
684 .query_map(params![embedding_json, limit], |row| {
685 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
686 })?
687 .collect::<Result<Vec<_>, _>>()?;
688 results
689 }
690 }
691 MemoryTier::Project => {
692 if let Some(pid) = project_id {
693 let sql = format!(
694 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
695 c.source_path, c.source_mtime, c.source_size, c.source_hash,
696 v.distance
697 FROM {} AS v
698 JOIN {} AS c ON v.chunk_id = c.id
699 WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
700 ORDER BY v.distance",
701 vectors_table, chunks_table
702 );
703 let mut stmt = conn.prepare(&sql)?;
704 let results = stmt
705 .query_map(params![pid, embedding_json, limit], |row| {
706 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
707 })?
708 .collect::<Result<Vec<_>, _>>()?;
709 results
710 } else {
711 let sql = format!(
712 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
713 c.source_path, c.source_mtime, c.source_size, c.source_hash,
714 v.distance
715 FROM {} AS v
716 JOIN {} AS c ON v.chunk_id = c.id
717 WHERE v.embedding MATCH ?1 AND k = ?2
718 ORDER BY v.distance",
719 vectors_table, chunks_table
720 );
721 let mut stmt = conn.prepare(&sql)?;
722 let results = stmt
723 .query_map(params![embedding_json, limit], |row| {
724 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
725 })?
726 .collect::<Result<Vec<_>, _>>()?;
727 results
728 }
729 }
730 MemoryTier::Global => {
731 let sql = format!(
732 "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
733 v.distance
734 FROM {} AS v
735 JOIN {} AS c ON v.chunk_id = c.id
736 WHERE v.embedding MATCH ?1 AND k = ?2
737 ORDER BY v.distance",
738 vectors_table, chunks_table
739 );
740 let mut stmt = conn.prepare(&sql)?;
741 let results = stmt
742 .query_map(params![embedding_json, limit], |row| {
743 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(8)?))
744 })?
745 .collect::<Result<Vec<_>, _>>()?;
746 results
747 }
748 };
749
750 Ok(results)
751 }
752
753 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
755 let conn = self.conn.lock().await;
756
757 let mut stmt = conn.prepare(
758 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata
759 FROM session_memory_chunks
760 WHERE session_id = ?1
761 ORDER BY created_at DESC",
762 )?;
763
764 let chunks = stmt
765 .query_map(params![session_id], |row| {
766 row_to_chunk(row, MemoryTier::Session)
767 })?
768 .collect::<Result<Vec<_>, _>>()?;
769
770 Ok(chunks)
771 }
772
773 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
775 let conn = self.conn.lock().await;
776
777 let mut stmt = conn.prepare(
778 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
779 source_path, source_mtime, source_size, source_hash
780 FROM project_memory_chunks
781 WHERE project_id = ?1
782 ORDER BY created_at DESC",
783 )?;
784
785 let chunks = stmt
786 .query_map(params![project_id], |row| {
787 row_to_chunk(row, MemoryTier::Project)
788 })?
789 .collect::<Result<Vec<_>, _>>()?;
790
791 Ok(chunks)
792 }
793
794 pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
796 let conn = self.conn.lock().await;
797
798 let mut stmt = conn.prepare(
799 "SELECT id, content, source, created_at, token_count, metadata
800 FROM global_memory_chunks
801 ORDER BY created_at DESC
802 LIMIT ?1",
803 )?;
804
805 let chunks = stmt
806 .query_map(params![limit], |row| {
807 let id: String = row.get(0)?;
808 let content: String = row.get(1)?;
809 let source: String = row.get(2)?;
810 let created_at_str: String = row.get(3)?;
811 let token_count: i64 = row.get(4)?;
812 let metadata_str: Option<String> = row.get(5)?;
813
814 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
815 .map_err(|e| {
816 rusqlite::Error::FromSqlConversionFailure(
817 3,
818 rusqlite::types::Type::Text,
819 Box::new(e),
820 )
821 })?
822 .with_timezone(&Utc);
823
824 let metadata = metadata_str
825 .filter(|s| !s.is_empty())
826 .and_then(|s| serde_json::from_str(&s).ok());
827
828 Ok(MemoryChunk {
829 id,
830 content,
831 tier: MemoryTier::Global,
832 session_id: None,
833 project_id: None,
834 source,
835 source_path: None,
836 source_mtime: None,
837 source_size: None,
838 source_hash: None,
839 created_at,
840 token_count,
841 metadata,
842 })
843 })?
844 .collect::<Result<Vec<_>, _>>()?;
845
846 Ok(chunks)
847 }
848
849 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
851 let conn = self.conn.lock().await;
852
853 let count: i64 = conn.query_row(
855 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
856 params![session_id],
857 |row| row.get(0),
858 )?;
859
860 conn.execute(
862 "DELETE FROM session_memory_vectors WHERE chunk_id IN
863 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
864 params![session_id],
865 )?;
866
867 conn.execute(
869 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
870 params![session_id],
871 )?;
872
873 Ok(count as u64)
874 }
875
876 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
878 let conn = self.conn.lock().await;
879
880 let count: i64 = conn.query_row(
882 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
883 params![project_id],
884 |row| row.get(0),
885 )?;
886
887 conn.execute(
889 "DELETE FROM project_memory_vectors WHERE chunk_id IN
890 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
891 params![project_id],
892 )?;
893
894 conn.execute(
896 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
897 params![project_id],
898 )?;
899
900 Ok(count as u64)
901 }
902
903 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
905 let conn = self.conn.lock().await;
906
907 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
908 let cutoff_str = cutoff.to_rfc3339();
909
910 let count: i64 = conn.query_row(
912 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
913 params![cutoff_str],
914 |row| row.get(0),
915 )?;
916
917 conn.execute(
919 "DELETE FROM session_memory_vectors WHERE chunk_id IN
920 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
921 params![cutoff_str],
922 )?;
923
924 conn.execute(
926 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
927 params![cutoff_str],
928 )?;
929
930 Ok(count as u64)
931 }
932
933 pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
935 let conn = self.conn.lock().await;
936
937 let result: Option<MemoryConfig> = conn
938 .query_row(
939 "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
940 session_retention_days, token_budget, chunk_overlap
941 FROM memory_config WHERE project_id = ?1",
942 params![project_id],
943 |row| {
944 Ok(MemoryConfig {
945 max_chunks: row.get(0)?,
946 chunk_size: row.get(1)?,
947 retrieval_k: row.get(2)?,
948 auto_cleanup: row.get::<_, i64>(3)? != 0,
949 session_retention_days: row.get(4)?,
950 token_budget: row.get(5)?,
951 chunk_overlap: row.get(6)?,
952 })
953 },
954 )
955 .optional()?;
956
957 match result {
958 Some(config) => Ok(config),
959 None => {
960 let config = MemoryConfig::default();
962 let updated_at = Utc::now().to_rfc3339();
963
964 conn.execute(
965 "INSERT INTO memory_config
966 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
967 session_retention_days, token_budget, chunk_overlap, updated_at)
968 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
969 params![
970 project_id,
971 config.max_chunks,
972 config.chunk_size,
973 config.retrieval_k,
974 config.auto_cleanup as i64,
975 config.session_retention_days,
976 config.token_budget,
977 config.chunk_overlap,
978 updated_at
979 ],
980 )?;
981
982 Ok(config)
983 }
984 }
985 }
986
987 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
989 let conn = self.conn.lock().await;
990
991 let updated_at = Utc::now().to_rfc3339();
992
993 conn.execute(
994 "INSERT OR REPLACE INTO memory_config
995 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
996 session_retention_days, token_budget, chunk_overlap, updated_at)
997 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
998 params![
999 project_id,
1000 config.max_chunks,
1001 config.chunk_size,
1002 config.retrieval_k,
1003 config.auto_cleanup as i64,
1004 config.session_retention_days,
1005 config.token_budget,
1006 config.chunk_overlap,
1007 updated_at
1008 ],
1009 )?;
1010
1011 Ok(())
1012 }
1013
1014 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1016 let conn = self.conn.lock().await;
1017
1018 let session_chunks: i64 =
1020 conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1021 row.get(0)
1022 })?;
1023
1024 let project_chunks: i64 =
1025 conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1026 row.get(0)
1027 })?;
1028
1029 let global_chunks: i64 =
1030 conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1031 row.get(0)
1032 })?;
1033
1034 let session_bytes: i64 = conn.query_row(
1036 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1037 [],
1038 |row| row.get(0),
1039 )?;
1040
1041 let project_bytes: i64 = conn.query_row(
1042 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1043 [],
1044 |row| row.get(0),
1045 )?;
1046
1047 let global_bytes: i64 = conn.query_row(
1048 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1049 [],
1050 |row| row.get(0),
1051 )?;
1052
1053 let last_cleanup: Option<String> = conn
1055 .query_row(
1056 "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1057 [],
1058 |row| row.get(0),
1059 )
1060 .optional()?;
1061
1062 let last_cleanup = last_cleanup.and_then(|s| {
1063 DateTime::parse_from_rfc3339(&s)
1064 .ok()
1065 .map(|dt| dt.with_timezone(&Utc))
1066 });
1067
1068 let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1070
1071 Ok(MemoryStats {
1072 total_chunks: session_chunks + project_chunks + global_chunks,
1073 session_chunks,
1074 project_chunks,
1075 global_chunks,
1076 total_bytes: session_bytes + project_bytes + global_bytes,
1077 session_bytes,
1078 project_bytes,
1079 global_bytes,
1080 file_size,
1081 last_cleanup,
1082 })
1083 }
1084
1085 pub async fn log_cleanup(
1087 &self,
1088 cleanup_type: &str,
1089 tier: MemoryTier,
1090 project_id: Option<&str>,
1091 session_id: Option<&str>,
1092 chunks_deleted: i64,
1093 bytes_reclaimed: i64,
1094 ) -> MemoryResult<()> {
1095 let conn = self.conn.lock().await;
1096
1097 let id = uuid::Uuid::new_v4().to_string();
1098 let created_at = Utc::now().to_rfc3339();
1099
1100 conn.execute(
1101 "INSERT INTO memory_cleanup_log
1102 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1103 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1104 params![
1105 id,
1106 cleanup_type,
1107 tier.to_string(),
1108 project_id,
1109 session_id,
1110 chunks_deleted,
1111 bytes_reclaimed,
1112 created_at
1113 ],
1114 )?;
1115
1116 Ok(())
1117 }
1118
    /// Runs SQLite `VACUUM` to rebuild the database file and reclaim free
    /// pages (e.g. after large deletions).
    pub async fn vacuum(&self) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        conn.execute("VACUUM", [])?;
        Ok(())
    }
1125
1126 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1131 let conn = self.conn.lock().await;
1132 let n: i64 = conn.query_row(
1133 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1134 params![project_id],
1135 |row| row.get(0),
1136 )?;
1137 Ok(n)
1138 }
1139
1140 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1141 let conn = self.conn.lock().await;
1142 let exists: Option<i64> = conn
1143 .query_row(
1144 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1145 params![project_id],
1146 |row| row.get(0),
1147 )
1148 .optional()?;
1149 Ok(exists.is_some())
1150 }
1151
1152 pub async fn get_file_index_entry(
1153 &self,
1154 project_id: &str,
1155 path: &str,
1156 ) -> MemoryResult<Option<(i64, i64, String)>> {
1157 let conn = self.conn.lock().await;
1158 let row: Option<(i64, i64, String)> = conn
1159 .query_row(
1160 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1161 params![project_id, path],
1162 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1163 )
1164 .optional()?;
1165 Ok(row)
1166 }
1167
1168 pub async fn upsert_file_index_entry(
1169 &self,
1170 project_id: &str,
1171 path: &str,
1172 mtime: i64,
1173 size: i64,
1174 hash: &str,
1175 ) -> MemoryResult<()> {
1176 let conn = self.conn.lock().await;
1177 let indexed_at = Utc::now().to_rfc3339();
1178 conn.execute(
1179 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1180 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1181 ON CONFLICT(project_id, path) DO UPDATE SET
1182 mtime = excluded.mtime,
1183 size = excluded.size,
1184 hash = excluded.hash,
1185 indexed_at = excluded.indexed_at",
1186 params![project_id, path, mtime, size, hash, indexed_at],
1187 )?;
1188 Ok(())
1189 }
1190
1191 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1192 let conn = self.conn.lock().await;
1193 conn.execute(
1194 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1195 params![project_id, path],
1196 )?;
1197 Ok(())
1198 }
1199
1200 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1201 let conn = self.conn.lock().await;
1202 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1203 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1204 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1205 }
1206
1207 pub async fn delete_project_file_chunks_by_path(
1208 &self,
1209 project_id: &str,
1210 source_path: &str,
1211 ) -> MemoryResult<(i64, i64)> {
1212 let conn = self.conn.lock().await;
1213
1214 let chunks_deleted: i64 = conn.query_row(
1215 "SELECT COUNT(*) FROM project_memory_chunks
1216 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1217 params![project_id, source_path],
1218 |row| row.get(0),
1219 )?;
1220
1221 let bytes_estimated: i64 = conn.query_row(
1222 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
1223 WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1224 params![project_id, source_path],
1225 |row| row.get(0),
1226 )?;
1227
1228 conn.execute(
1230 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1231 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
1232 params![project_id, source_path],
1233 )?;
1234
1235 conn.execute(
1236 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
1237 params![project_id, source_path],
1238 )?;
1239
1240 Ok((chunks_deleted, bytes_estimated))
1241 }
1242
1243 pub async fn upsert_project_index_status(
1244 &self,
1245 project_id: &str,
1246 total_files: i64,
1247 processed_files: i64,
1248 indexed_files: i64,
1249 skipped_files: i64,
1250 errors: i64,
1251 ) -> MemoryResult<()> {
1252 let conn = self.conn.lock().await;
1253 let last_indexed_at = Utc::now().to_rfc3339();
1254 conn.execute(
1255 "INSERT INTO project_index_status (
1256 project_id, last_indexed_at, last_total_files, last_processed_files,
1257 last_indexed_files, last_skipped_files, last_errors
1258 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1259 ON CONFLICT(project_id) DO UPDATE SET
1260 last_indexed_at = excluded.last_indexed_at,
1261 last_total_files = excluded.last_total_files,
1262 last_processed_files = excluded.last_processed_files,
1263 last_indexed_files = excluded.last_indexed_files,
1264 last_skipped_files = excluded.last_skipped_files,
1265 last_errors = excluded.last_errors",
1266 params![
1267 project_id,
1268 last_indexed_at,
1269 total_files,
1270 processed_files,
1271 indexed_files,
1272 skipped_files,
1273 errors
1274 ],
1275 )?;
1276 Ok(())
1277 }
1278
1279 pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
1280 let conn = self.conn.lock().await;
1281
1282 let project_chunks: i64 = conn.query_row(
1283 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1284 params![project_id],
1285 |row| row.get(0),
1286 )?;
1287
1288 let project_bytes: i64 = conn.query_row(
1289 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
1290 params![project_id],
1291 |row| row.get(0),
1292 )?;
1293
1294 let file_index_chunks: i64 = conn.query_row(
1295 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1296 params![project_id],
1297 |row| row.get(0),
1298 )?;
1299
1300 let file_index_bytes: i64 = conn.query_row(
1301 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1302 params![project_id],
1303 |row| row.get(0),
1304 )?;
1305
1306 let indexed_files: i64 = conn.query_row(
1307 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1308 params![project_id],
1309 |row| row.get(0),
1310 )?;
1311
1312 let status_row: Option<ProjectIndexStatusRow> =
1313 conn
1314 .query_row(
1315 "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
1316 FROM project_index_status WHERE project_id = ?1",
1317 params![project_id],
1318 |row| {
1319 Ok((
1320 row.get(0)?,
1321 row.get(1)?,
1322 row.get(2)?,
1323 row.get(3)?,
1324 row.get(4)?,
1325 row.get(5)?,
1326 ))
1327 },
1328 )
1329 .optional()?;
1330
1331 let (
1332 last_indexed_at,
1333 last_total_files,
1334 last_processed_files,
1335 last_indexed_files,
1336 last_skipped_files,
1337 last_errors,
1338 ) = status_row.unwrap_or((None, None, None, None, None, None));
1339
1340 let last_indexed_at = last_indexed_at.and_then(|s| {
1341 DateTime::parse_from_rfc3339(&s)
1342 .ok()
1343 .map(|dt| dt.with_timezone(&Utc))
1344 });
1345
1346 Ok(ProjectMemoryStats {
1347 project_id: project_id.to_string(),
1348 project_chunks,
1349 project_bytes,
1350 file_index_chunks,
1351 file_index_bytes,
1352 indexed_files,
1353 last_indexed_at,
1354 last_total_files,
1355 last_processed_files,
1356 last_indexed_files,
1357 last_skipped_files,
1358 last_errors,
1359 })
1360 }
1361
1362 pub async fn clear_project_file_index(
1363 &self,
1364 project_id: &str,
1365 vacuum: bool,
1366 ) -> MemoryResult<ClearFileIndexResult> {
1367 let conn = self.conn.lock().await;
1368
1369 let chunks_deleted: i64 = conn.query_row(
1370 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1371 params![project_id],
1372 |row| row.get(0),
1373 )?;
1374
1375 let bytes_estimated: i64 = conn.query_row(
1376 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1377 params![project_id],
1378 |row| row.get(0),
1379 )?;
1380
1381 conn.execute(
1383 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1384 (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
1385 params![project_id],
1386 )?;
1387
1388 conn.execute(
1390 "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
1391 params![project_id],
1392 )?;
1393
1394 conn.execute(
1396 "DELETE FROM project_file_index WHERE project_id = ?1",
1397 params![project_id],
1398 )?;
1399 conn.execute(
1400 "DELETE FROM project_index_status WHERE project_id = ?1",
1401 params![project_id],
1402 )?;
1403
1404 drop(conn); if vacuum {
1407 self.vacuum().await?;
1408 }
1409
1410 Ok(ClearFileIndexResult {
1411 chunks_deleted,
1412 bytes_estimated,
1413 did_vacuum: vacuum,
1414 })
1415 }
1416
1417 pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1429 if retention_days == 0 {
1430 return Ok(0);
1431 }
1432
1433 let conn = self.conn.lock().await;
1434
1435 let cutoff =
1437 (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1438
1439 conn.execute(
1441 "DELETE FROM session_memory_vectors
1442 WHERE chunk_id IN (
1443 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1444 )",
1445 params![cutoff],
1446 )?;
1447
1448 let deleted = conn.execute(
1449 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1450 params![cutoff],
1451 )?;
1452
1453 if deleted > 0 {
1454 tracing::info!(
1455 retention_days,
1456 deleted,
1457 "memory hygiene: pruned old session chunks"
1458 );
1459 }
1460
1461 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1462 Ok(deleted as u64)
1463 }
1464
1465 pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
1471 let retention_days = if env_override_days > 0 {
1473 env_override_days
1474 } else {
1475 let conn = self.conn.lock().await;
1477 let days: Option<i64> = conn
1478 .query_row(
1479 "SELECT session_retention_days FROM memory_config
1480 WHERE project_id = '__global__' LIMIT 1",
1481 [],
1482 |row| row.get(0),
1483 )
1484 .ok();
1485 drop(conn);
1486 days.unwrap_or(30) as u32
1487 };
1488
1489 self.prune_old_session_chunks(retention_days).await
1490 }
1491}
1492
1493fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
1495 let id: String = row.get(0)?;
1496 let content: String = row.get(1)?;
1497
1498 let session_id: Option<String> = match tier {
1499 MemoryTier::Session => Some(row.get(2)?),
1500 MemoryTier::Project => row.get(2)?,
1501 MemoryTier::Global => None,
1502 };
1503
1504 let project_id: Option<String> = match tier {
1505 MemoryTier::Session => row.get(3)?,
1506 MemoryTier::Project => Some(row.get(3)?),
1507 MemoryTier::Global => None,
1508 };
1509
1510 let source: String = row.get(4)?;
1511 let created_at_str: String = row.get(5)?;
1512 let token_count: i64 = row.get(6)?;
1513 let metadata_str: Option<String> = row.get(7)?;
1514
1515 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
1516 .map_err(|e| {
1517 rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
1518 })?
1519 .with_timezone(&Utc);
1520
1521 let metadata = metadata_str
1522 .filter(|s| !s.is_empty())
1523 .and_then(|s| serde_json::from_str(&s).ok());
1524
1525 let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
1526 let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
1527 let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
1528 let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
1529
1530 Ok(MemoryChunk {
1531 id,
1532 content,
1533 tier,
1534 session_id,
1535 project_id,
1536 source,
1537 source_path,
1538 source_mtime,
1539 source_size,
1540 source_hash,
1541 created_at,
1542 token_count,
1543 metadata,
1544 })
1545}
1546
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // Create a fresh database in a temp directory; the TempDir must be kept
    // alive by the caller so the underlying file is not removed mid-test.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let db = MemoryDatabase::new(&db_path).await.unwrap();
        (db, temp_dir)
    }

    // A freshly initialized database should report zero stored chunks.
    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        let stats = db.get_stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
    }

    // Round-trip: a stored session chunk is retrievable by its session id
    // with its content intact.
    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let chunks = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Test content");
    }

    // Config lifecycle: default creation, update, and re-read of max_chunks.
    #[tokio::test]
    async fn test_config_crud() {
        let (db, _temp) = setup_test_db().await;

        let config = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(config.max_chunks, 10000);

        let new_config = MemoryConfig {
            max_chunks: 5000,
            ..Default::default()
        };
        db.update_config("project-1", &new_config).await.unwrap();

        let updated = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(updated.max_chunks, 5000);
    }
}