1use crate::types::{
5 ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
6 MemoryChunk, MemoryConfig, MemoryError, MemoryResult, MemoryStats, MemoryTier,
7 ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
8};
9use chrono::{DateTime, Utc};
10use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
11use sqlite_vec::sqlite3_vec_init;
12use std::collections::HashSet;
13use std::path::Path;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
/// Row tuple for `project_index_status` lookups (columns after `project_id`):
/// (last_indexed_at, last_total_files, last_processed_files,
/// last_indexed_files, last_skipped_files, last_errors) — all nullable.
/// NOTE(review): matches the `project_index_status` DDL in `init_schema`;
/// the use site is outside this chunk — confirm the mapping there.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
26
/// Async handle to the SQLite-backed memory store.
///
/// All access goes through a single `Connection` guarded by a tokio `Mutex`,
/// so statements issued from concurrent tasks are serialized.
pub struct MemoryDatabase {
    // Single shared connection; the tokio Mutex serializes all SQL.
    conn: Arc<Mutex<Connection>>,
    // Filesystem location this database was opened from (set in `new`).
    db_path: std::path::PathBuf,
}
32
33impl MemoryDatabase {
    /// Opens (or creates) the memory database at `db_path` and prepares it
    /// for use: registers the sqlite-vec extension, enables WAL journaling,
    /// initializes/migrates the schema, self-heals corrupted vector tables,
    /// and runs a final integrity check.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        // SAFETY: the transmute only adapts `sqlite3_vec_init`'s function
        // pointer to the exact extern "C" entry-point signature that
        // sqlite3_auto_extension expects; no data is reinterpreted.
        // NOTE(review): auto-extension registration is process-global and
        // applies to every future connection — assumed intentional; confirm
        // nothing else registers a conflicting extension.
        unsafe {
            sqlite3_auto_extension(Some(std::mem::transmute::<
                *const (),
                unsafe extern "C" fn(
                    *mut rusqlite::ffi::sqlite3,
                    *mut *mut i8,
                    *const rusqlite::ffi::sqlite3_api_routines,
                ) -> i32,
            >(sqlite3_vec_init as *const ())));
        }

        let conn = Connection::open(db_path)?;
        // Wait up to 10s on a locked database instead of failing immediately.
        conn.busy_timeout(Duration::from_secs(10))?;

        // `PRAGMA journal_mode` returns a result row, so it must be issued
        // through query_row rather than execute.
        conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
        conn.execute("PRAGMA synchronous = NORMAL", [])?;

        let db = Self {
            conn: Arc::new(Mutex::new(conn)),
            db_path: db_path.to_path_buf(),
        };

        db.init_schema().await?;
        // Vector-table corruption is recoverable: drop and recreate the vec0
        // tables instead of failing startup. Any other validation error is
        // fatal here.
        if let Err(err) = db.validate_vector_tables().await {
            match &err {
                crate::types::MemoryError::Database(db_err)
                    if Self::is_vector_table_error(db_err) =>
                {
                    tracing::warn!(
                        "Detected vector table corruption during startup ({}). Recreating vector tables.",
                        db_err
                    );
                    db.recreate_vector_tables().await?;
                }
                _ => return Err(err),
            }
        }
        db.validate_integrity().await?;

        Ok(db)
    }
81
82 async fn validate_integrity(&self) -> MemoryResult<()> {
84 let conn = self.conn.lock().await;
85 let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
86 {
87 Ok(value) => value,
88 Err(err) => {
89 tracing::warn!(
93 "Skipping strict PRAGMA quick_check due to probe error: {}",
94 err
95 );
96 return Ok(());
97 }
98 };
99 if check.trim().eq_ignore_ascii_case("ok") {
100 return Ok(());
101 }
102
103 let lowered = check.to_lowercase();
104 if lowered.contains("malformed")
105 || lowered.contains("corrupt")
106 || lowered.contains("database disk image is malformed")
107 {
108 return Err(crate::types::MemoryError::InvalidConfig(format!(
109 "malformed database integrity check: {}",
110 check
111 )));
112 }
113
114 tracing::warn!(
115 "PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
116 check
117 );
118 Ok(())
119 }
120
121 async fn init_schema(&self) -> MemoryResult<()> {
123 let conn = self.conn.lock().await;
124
125 conn.execute(
129 "CREATE TABLE IF NOT EXISTS session_memory_chunks (
130 id TEXT PRIMARY KEY,
131 content TEXT NOT NULL,
132 session_id TEXT NOT NULL,
133 project_id TEXT,
134 source TEXT NOT NULL,
135 created_at TEXT NOT NULL,
136 token_count INTEGER NOT NULL DEFAULT 0,
137 metadata TEXT
138 )",
139 [],
140 )?;
141 let session_existing_cols: HashSet<String> = {
142 let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
143 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
144 rows.collect::<Result<HashSet<_>, _>>()?
145 };
146 if !session_existing_cols.contains("source_path") {
147 conn.execute(
148 "ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
149 [],
150 )?;
151 }
152 if !session_existing_cols.contains("source_mtime") {
153 conn.execute(
154 "ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
155 [],
156 )?;
157 }
158 if !session_existing_cols.contains("source_size") {
159 conn.execute(
160 "ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
161 [],
162 )?;
163 }
164 if !session_existing_cols.contains("source_hash") {
165 conn.execute(
166 "ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
167 [],
168 )?;
169 }
170
171 conn.execute(
173 &format!(
174 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
175 chunk_id TEXT PRIMARY KEY,
176 embedding float[{}]
177 )",
178 DEFAULT_EMBEDDING_DIMENSION
179 ),
180 [],
181 )?;
182
183 conn.execute(
185 "CREATE TABLE IF NOT EXISTS project_memory_chunks (
186 id TEXT PRIMARY KEY,
187 content TEXT NOT NULL,
188 project_id TEXT NOT NULL,
189 session_id TEXT,
190 source TEXT NOT NULL,
191 created_at TEXT NOT NULL,
192 token_count INTEGER NOT NULL DEFAULT 0,
193 metadata TEXT
194 )",
195 [],
196 )?;
197
198 let existing_cols: HashSet<String> = {
201 let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
202 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
203 rows.collect::<Result<HashSet<_>, _>>()?
204 };
205
206 if !existing_cols.contains("source_path") {
207 conn.execute(
208 "ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
209 [],
210 )?;
211 }
212 if !existing_cols.contains("source_mtime") {
213 conn.execute(
214 "ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
215 [],
216 )?;
217 }
218 if !existing_cols.contains("source_size") {
219 conn.execute(
220 "ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
221 [],
222 )?;
223 }
224 if !existing_cols.contains("source_hash") {
225 conn.execute(
226 "ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
227 [],
228 )?;
229 }
230
231 conn.execute(
233 &format!(
234 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
235 chunk_id TEXT PRIMARY KEY,
236 embedding float[{}]
237 )",
238 DEFAULT_EMBEDDING_DIMENSION
239 ),
240 [],
241 )?;
242
243 conn.execute(
245 "CREATE TABLE IF NOT EXISTS project_file_index (
246 project_id TEXT NOT NULL,
247 path TEXT NOT NULL,
248 mtime INTEGER NOT NULL,
249 size INTEGER NOT NULL,
250 hash TEXT NOT NULL,
251 indexed_at TEXT NOT NULL,
252 PRIMARY KEY(project_id, path)
253 )",
254 [],
255 )?;
256 conn.execute(
257 "CREATE TABLE IF NOT EXISTS session_file_index (
258 session_id TEXT NOT NULL,
259 path TEXT NOT NULL,
260 mtime INTEGER NOT NULL,
261 size INTEGER NOT NULL,
262 hash TEXT NOT NULL,
263 indexed_at TEXT NOT NULL,
264 PRIMARY KEY(session_id, path)
265 )",
266 [],
267 )?;
268
269 conn.execute(
270 "CREATE TABLE IF NOT EXISTS project_index_status (
271 project_id TEXT PRIMARY KEY,
272 last_indexed_at TEXT,
273 last_total_files INTEGER,
274 last_processed_files INTEGER,
275 last_indexed_files INTEGER,
276 last_skipped_files INTEGER,
277 last_errors INTEGER
278 )",
279 [],
280 )?;
281
282 conn.execute(
284 "CREATE TABLE IF NOT EXISTS global_memory_chunks (
285 id TEXT PRIMARY KEY,
286 content TEXT NOT NULL,
287 source TEXT NOT NULL,
288 created_at TEXT NOT NULL,
289 token_count INTEGER NOT NULL DEFAULT 0,
290 metadata TEXT
291 )",
292 [],
293 )?;
294 let global_existing_cols: HashSet<String> = {
295 let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
296 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
297 rows.collect::<Result<HashSet<_>, _>>()?
298 };
299 if !global_existing_cols.contains("source_path") {
300 conn.execute(
301 "ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
302 [],
303 )?;
304 }
305 if !global_existing_cols.contains("source_mtime") {
306 conn.execute(
307 "ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
308 [],
309 )?;
310 }
311 if !global_existing_cols.contains("source_size") {
312 conn.execute(
313 "ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
314 [],
315 )?;
316 }
317 if !global_existing_cols.contains("source_hash") {
318 conn.execute(
319 "ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
320 [],
321 )?;
322 }
323
324 conn.execute(
326 &format!(
327 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
328 chunk_id TEXT PRIMARY KEY,
329 embedding float[{}]
330 )",
331 DEFAULT_EMBEDDING_DIMENSION
332 ),
333 [],
334 )?;
335
336 conn.execute(
338 "CREATE TABLE IF NOT EXISTS memory_config (
339 project_id TEXT PRIMARY KEY,
340 max_chunks INTEGER NOT NULL DEFAULT 10000,
341 chunk_size INTEGER NOT NULL DEFAULT 512,
342 retrieval_k INTEGER NOT NULL DEFAULT 5,
343 auto_cleanup INTEGER NOT NULL DEFAULT 1,
344 session_retention_days INTEGER NOT NULL DEFAULT 30,
345 token_budget INTEGER NOT NULL DEFAULT 5000,
346 chunk_overlap INTEGER NOT NULL DEFAULT 64,
347 updated_at TEXT NOT NULL
348 )",
349 [],
350 )?;
351
352 conn.execute(
354 "CREATE TABLE IF NOT EXISTS memory_cleanup_log (
355 id TEXT PRIMARY KEY,
356 cleanup_type TEXT NOT NULL,
357 tier TEXT NOT NULL,
358 project_id TEXT,
359 session_id TEXT,
360 chunks_deleted INTEGER NOT NULL DEFAULT 0,
361 bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
362 created_at TEXT NOT NULL
363 )",
364 [],
365 )?;
366
367 conn.execute(
369 "CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
370 [],
371 )?;
372 conn.execute(
373 "CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
374 [],
375 )?;
376 conn.execute(
377 "CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
378 [],
379 )?;
380 conn.execute(
381 "CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
382 [],
383 )?;
384 conn.execute(
385 "CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
386 [],
387 )?;
388 conn.execute(
389 "CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
390 [],
391 )?;
392 conn.execute(
393 "CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
394 [],
395 )?;
396 conn.execute(
397 "CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
398 [],
399 )?;
400 conn.execute(
401 "CREATE TABLE IF NOT EXISTS global_file_index (
402 path TEXT PRIMARY KEY,
403 mtime INTEGER NOT NULL,
404 size INTEGER NOT NULL,
405 hash TEXT NOT NULL,
406 indexed_at TEXT NOT NULL
407 )",
408 [],
409 )?;
410
411 conn.execute(
413 "CREATE TABLE IF NOT EXISTS memory_records (
414 id TEXT PRIMARY KEY,
415 user_id TEXT NOT NULL,
416 source_type TEXT NOT NULL,
417 content TEXT NOT NULL,
418 content_hash TEXT NOT NULL,
419 run_id TEXT NOT NULL,
420 session_id TEXT,
421 message_id TEXT,
422 tool_name TEXT,
423 project_tag TEXT,
424 channel_tag TEXT,
425 host_tag TEXT,
426 metadata TEXT,
427 provenance TEXT,
428 redaction_status TEXT NOT NULL,
429 redaction_count INTEGER NOT NULL DEFAULT 0,
430 visibility TEXT NOT NULL DEFAULT 'private',
431 demoted INTEGER NOT NULL DEFAULT 0,
432 score_boost REAL NOT NULL DEFAULT 0.0,
433 created_at_ms INTEGER NOT NULL,
434 updated_at_ms INTEGER NOT NULL,
435 expires_at_ms INTEGER
436 )",
437 [],
438 )?;
439 conn.execute(
440 "CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
441 ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
442 [],
443 )?;
444 conn.execute(
445 "CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
446 ON memory_records(user_id, created_at_ms DESC)",
447 [],
448 )?;
449 conn.execute(
450 "CREATE INDEX IF NOT EXISTS idx_memory_records_run
451 ON memory_records(run_id)",
452 [],
453 )?;
454 conn.execute(
455 "CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
456 id UNINDEXED,
457 user_id UNINDEXED,
458 content
459 )",
460 [],
461 )?;
462 conn.execute(
463 "CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
464 INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
465 END",
466 [],
467 )?;
468 conn.execute(
469 "CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
470 DELETE FROM memory_records_fts WHERE id = old.id;
471 END",
472 [],
473 )?;
474 conn.execute(
475 "CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
476 DELETE FROM memory_records_fts WHERE id = old.id;
477 INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
478 END",
479 [],
480 )?;
481
482 conn.execute(
483 "CREATE TABLE IF NOT EXISTS memory_nodes (
484 id TEXT PRIMARY KEY,
485 uri TEXT NOT NULL UNIQUE,
486 parent_uri TEXT,
487 node_type TEXT NOT NULL,
488 created_at TEXT NOT NULL,
489 updated_at TEXT NOT NULL,
490 metadata TEXT
491 )",
492 [],
493 )?;
494 conn.execute(
495 "CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
496 [],
497 )?;
498 conn.execute(
499 "CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
500 [],
501 )?;
502
503 conn.execute(
504 "CREATE TABLE IF NOT EXISTS memory_layers (
505 id TEXT PRIMARY KEY,
506 node_id TEXT NOT NULL,
507 layer_type TEXT NOT NULL,
508 content TEXT NOT NULL,
509 token_count INTEGER NOT NULL,
510 embedding_id TEXT,
511 created_at TEXT NOT NULL,
512 source_chunk_id TEXT,
513 FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
514 )",
515 [],
516 )?;
517 conn.execute(
518 "CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
519 [],
520 )?;
521 conn.execute(
522 "CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
523 [],
524 )?;
525
526 conn.execute(
527 "CREATE TABLE IF NOT EXISTS memory_retrieval_state (
528 node_id TEXT PRIMARY KEY,
529 active_layer TEXT NOT NULL DEFAULT 'L0',
530 last_accessed TEXT,
531 access_count INTEGER DEFAULT 0,
532 FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
533 )",
534 [],
535 )?;
536
537 Ok(())
538 }
539
540 pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
543 let conn = self.conn.lock().await;
544 let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
545
546 for table in [
547 "session_memory_vectors",
548 "project_memory_vectors",
549 "global_memory_vectors",
550 ] {
551 let sql = format!("SELECT COUNT(*) FROM {}", table);
552 let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
553
554 if row_count > 0 {
557 let probe_sql = format!(
558 "SELECT chunk_id, distance
559 FROM {}
560 WHERE embedding MATCH ?1 AND k = 1",
561 table
562 );
563 let mut stmt = conn.prepare(&probe_sql)?;
564 let mut rows = stmt.query(params![probe_embedding.as_str()])?;
565 let _ = rows.next()?;
566 }
567 }
568 Ok(())
569 }
570
571 fn is_vector_table_error(err: &rusqlite::Error) -> bool {
572 let text = err.to_string().to_lowercase();
573 text.contains("vector blob")
574 || text.contains("chunks iter error")
575 || text.contains("chunks iter")
576 || text.contains("internal sqlite-vec error")
577 || text.contains("insert rowids id")
578 || text.contains("sql logic error")
579 || text.contains("database disk image is malformed")
580 || text.contains("session_memory_vectors")
581 || text.contains("project_memory_vectors")
582 || text.contains("global_memory_vectors")
583 || text.contains("vec0")
584 }
585
586 async fn recreate_vector_tables(&self) -> MemoryResult<()> {
587 let conn = self.conn.lock().await;
588
589 for base in [
590 "session_memory_vectors",
591 "project_memory_vectors",
592 "global_memory_vectors",
593 ] {
594 for name in [
596 base.to_string(),
597 format!("{}_chunks", base),
598 format!("{}_info", base),
599 format!("{}_rowids", base),
600 format!("{}_vector_chunks00", base),
601 ] {
602 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
603 conn.execute(&sql, [])?;
604 }
605
606 let like_pattern = format!("{base}_%");
608 let mut stmt = conn.prepare(
609 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
610 )?;
611 let table_names = stmt
612 .query_map(params![like_pattern], |row| row.get::<_, String>(0))?
613 .collect::<Result<Vec<_>, _>>()?;
614 drop(stmt);
615 for name in table_names {
616 let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
617 conn.execute(&sql, [])?;
618 }
619 }
620
621 conn.execute(
622 &format!(
623 "CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
624 chunk_id TEXT PRIMARY KEY,
625 embedding float[{}]
626 )",
627 DEFAULT_EMBEDDING_DIMENSION
628 ),
629 [],
630 )?;
631
632 conn.execute(
633 &format!(
634 "CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
635 chunk_id TEXT PRIMARY KEY,
636 embedding float[{}]
637 )",
638 DEFAULT_EMBEDDING_DIMENSION
639 ),
640 [],
641 )?;
642
643 conn.execute(
644 &format!(
645 "CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
646 chunk_id TEXT PRIMARY KEY,
647 embedding float[{}]
648 )",
649 DEFAULT_EMBEDDING_DIMENSION
650 ),
651 [],
652 )?;
653
654 Ok(())
655 }
656
657 pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
660 match self.validate_vector_tables().await {
661 Ok(()) => Ok(false),
662 Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
663 tracing::warn!(
664 "Memory vector tables appear corrupted ({}). Recreating vector tables.",
665 err
666 );
667 self.recreate_vector_tables().await?;
668 Ok(true)
669 }
670 Err(err) => Err(err),
671 }
672 }
673
674 pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
678 let table_names = {
679 let conn = self.conn.lock().await;
680 let mut stmt = conn.prepare(
681 "SELECT name FROM sqlite_master
682 WHERE type='table'
683 AND name NOT LIKE 'sqlite_%'
684 ORDER BY name",
685 )?;
686 let names = stmt
687 .query_map([], |row| row.get::<_, String>(0))?
688 .collect::<Result<Vec<_>, _>>()?;
689 names
690 };
691
692 {
693 let conn = self.conn.lock().await;
694 for table in table_names {
695 let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
696 let _ = conn.execute(&sql, []);
697 }
698 }
699
700 self.init_schema().await
701 }
702
703 pub async fn try_repair_after_error(
706 &self,
707 err: &crate::types::MemoryError,
708 ) -> MemoryResult<bool> {
709 match err {
710 crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
711 tracing::warn!(
712 "Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
713 db_err
714 );
715 self.recreate_vector_tables().await?;
716 Ok(true)
717 }
718 _ => Ok(false),
719 }
720 }
721
    /// Persists one chunk plus its embedding into the tables for its tier.
    ///
    /// NOTE(review): the chunk insert and the vector insert are two separate
    /// statements with no enclosing transaction; if the second fails, a
    /// chunk row is left without an embedding. Confirm whether the repair
    /// path (`try_repair_after_error`) relies on this before changing it.
    pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
        let conn = self.conn.lock().await;

        // Each tier has its own chunk table paired with a vec0 table.
        let (chunks_table, vectors_table) = match chunk.tier {
            MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
            MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
            MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
        };

        let created_at_str = chunk.created_at.to_rfc3339();
        // Absent metadata is stored as an empty string rather than NULL.
        let metadata_str = chunk
            .metadata
            .as_ref()
            .map(|m| m.to_string())
            .unwrap_or_default();

        // The three tiers differ in column layout: session chunks require a
        // session_id (empty string when missing), project chunks require a
        // project_id (same fallback), and global chunks carry neither.
        match chunk.tier {
            MemoryTier::Session => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                        id, content, session_id, project_id, source, created_at, token_count, metadata,
                        source_path, source_mtime, source_size, source_hash
                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.session_id.as_ref().unwrap_or(&String::new()),
                        chunk.project_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
            MemoryTier::Project => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                        id, content, project_id, session_id, source, created_at, token_count, metadata,
                        source_path, source_mtime, source_size, source_hash
                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.project_id.as_ref().unwrap_or(&String::new()),
                        chunk.session_id,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
            MemoryTier::Global => {
                conn.execute(
                    &format!(
                        "INSERT INTO {} (
                        id, content, source, created_at, token_count, metadata,
                        source_path, source_mtime, source_size, source_hash
                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
                        chunks_table
                    ),
                    params![
                        chunk.id,
                        chunk.content,
                        chunk.source,
                        created_at_str,
                        chunk.token_count,
                        metadata_str,
                        chunk.source_path.clone(),
                        chunk.source_mtime,
                        chunk.source_size,
                        chunk.source_hash.clone()
                    ],
                )?;
            }
        }

        // sqlite-vec accepts the embedding as a JSON array literal.
        let embedding_json = format!(
            "[{}]",
            embedding
                .iter()
                .map(|f| f.to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        conn.execute(
            &format!(
                "INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
                vectors_table
            ),
            params![chunk.id, embedding_json],
        )?;

        Ok(())
    }
835
836 pub async fn search_similar(
838 &self,
839 query_embedding: &[f32],
840 tier: MemoryTier,
841 project_id: Option<&str>,
842 session_id: Option<&str>,
843 limit: i64,
844 ) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
845 let conn = self.conn.lock().await;
846
847 let (chunks_table, vectors_table) = match tier {
848 MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
849 MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
850 MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
851 };
852
853 let embedding_json = format!(
854 "[{}]",
855 query_embedding
856 .iter()
857 .map(|f| f.to_string())
858 .collect::<Vec<_>>()
859 .join(",")
860 );
861
862 let results = match tier {
864 MemoryTier::Session => {
865 if let Some(sid) = session_id {
866 let sql = format!(
867 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
868 c.source_path, c.source_mtime, c.source_size, c.source_hash,
869 v.distance
870 FROM {} AS v
871 JOIN {} AS c ON v.chunk_id = c.id
872 WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
873 ORDER BY v.distance",
874 vectors_table, chunks_table
875 );
876 let mut stmt = conn.prepare(&sql)?;
877 let results = stmt
878 .query_map(params![sid, embedding_json, limit], |row| {
879 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
880 })?
881 .collect::<Result<Vec<_>, _>>()?;
882 results
883 } else if let Some(pid) = project_id {
884 let sql = format!(
885 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
886 c.source_path, c.source_mtime, c.source_size, c.source_hash,
887 v.distance
888 FROM {} AS v
889 JOIN {} AS c ON v.chunk_id = c.id
890 WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
891 ORDER BY v.distance",
892 vectors_table, chunks_table
893 );
894 let mut stmt = conn.prepare(&sql)?;
895 let results = stmt
896 .query_map(params![pid, embedding_json, limit], |row| {
897 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
898 })?
899 .collect::<Result<Vec<_>, _>>()?;
900 results
901 } else {
902 let sql = format!(
903 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
904 c.source_path, c.source_mtime, c.source_size, c.source_hash,
905 v.distance
906 FROM {} AS v
907 JOIN {} AS c ON v.chunk_id = c.id
908 WHERE v.embedding MATCH ?1 AND k = ?2
909 ORDER BY v.distance",
910 vectors_table, chunks_table
911 );
912 let mut stmt = conn.prepare(&sql)?;
913 let results = stmt
914 .query_map(params![embedding_json, limit], |row| {
915 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
916 })?
917 .collect::<Result<Vec<_>, _>>()?;
918 results
919 }
920 }
921 MemoryTier::Project => {
922 if let Some(pid) = project_id {
923 let sql = format!(
924 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
925 c.source_path, c.source_mtime, c.source_size, c.source_hash,
926 v.distance
927 FROM {} AS v
928 JOIN {} AS c ON v.chunk_id = c.id
929 WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
930 ORDER BY v.distance",
931 vectors_table, chunks_table
932 );
933 let mut stmt = conn.prepare(&sql)?;
934 let results = stmt
935 .query_map(params![pid, embedding_json, limit], |row| {
936 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
937 })?
938 .collect::<Result<Vec<_>, _>>()?;
939 results
940 } else {
941 let sql = format!(
942 "SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
943 c.source_path, c.source_mtime, c.source_size, c.source_hash,
944 v.distance
945 FROM {} AS v
946 JOIN {} AS c ON v.chunk_id = c.id
947 WHERE v.embedding MATCH ?1 AND k = ?2
948 ORDER BY v.distance",
949 vectors_table, chunks_table
950 );
951 let mut stmt = conn.prepare(&sql)?;
952 let results = stmt
953 .query_map(params![embedding_json, limit], |row| {
954 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
955 })?
956 .collect::<Result<Vec<_>, _>>()?;
957 results
958 }
959 }
960 MemoryTier::Global => {
961 let sql = format!(
962 "SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
963 c.source_path, c.source_mtime, c.source_size, c.source_hash,
964 v.distance
965 FROM {} AS v
966 JOIN {} AS c ON v.chunk_id = c.id
967 WHERE v.embedding MATCH ?1 AND k = ?2
968 ORDER BY v.distance",
969 vectors_table, chunks_table
970 );
971 let mut stmt = conn.prepare(&sql)?;
972 let results = stmt
973 .query_map(params![embedding_json, limit], |row| {
974 Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
975 })?
976 .collect::<Result<Vec<_>, _>>()?;
977 results
978 }
979 };
980
981 Ok(results)
982 }
983
984 pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
986 let conn = self.conn.lock().await;
987
988 let mut stmt = conn.prepare(
989 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
990 source_path, source_mtime, source_size, source_hash
991 FROM session_memory_chunks
992 WHERE session_id = ?1
993 ORDER BY created_at DESC",
994 )?;
995
996 let chunks = stmt
997 .query_map(params![session_id], |row| {
998 row_to_chunk(row, MemoryTier::Session)
999 })?
1000 .collect::<Result<Vec<_>, _>>()?;
1001
1002 Ok(chunks)
1003 }
1004
1005 pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
1007 let conn = self.conn.lock().await;
1008
1009 let mut stmt = conn.prepare(
1010 "SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
1011 source_path, source_mtime, source_size, source_hash
1012 FROM project_memory_chunks
1013 WHERE project_id = ?1
1014 ORDER BY created_at DESC",
1015 )?;
1016
1017 let chunks = stmt
1018 .query_map(params![project_id], |row| {
1019 row_to_chunk(row, MemoryTier::Project)
1020 })?
1021 .collect::<Result<Vec<_>, _>>()?;
1022
1023 Ok(chunks)
1024 }
1025
1026 pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
1028 let conn = self.conn.lock().await;
1029
1030 let mut stmt = conn.prepare(
1031 "SELECT id, content, source, created_at, token_count, metadata,
1032 source_path, source_mtime, source_size, source_hash
1033 FROM global_memory_chunks
1034 ORDER BY created_at DESC
1035 LIMIT ?1",
1036 )?;
1037
1038 let chunks = stmt
1039 .query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
1040 .collect::<Result<Vec<_>, _>>()?;
1041
1042 Ok(chunks)
1043 }
1044
1045 pub async fn global_chunk_exists_by_source_hash(
1046 &self,
1047 source_hash: &str,
1048 ) -> MemoryResult<bool> {
1049 let conn = self.conn.lock().await;
1050 let exists = conn
1051 .query_row(
1052 "SELECT 1 FROM global_memory_chunks WHERE source_hash = ?1 LIMIT 1",
1053 params![source_hash],
1054 |_row| Ok(()),
1055 )
1056 .optional()?
1057 .is_some();
1058 Ok(exists)
1059 }
1060
1061 pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
1063 let conn = self.conn.lock().await;
1064
1065 let count: i64 = conn.query_row(
1067 "SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
1068 params![session_id],
1069 |row| row.get(0),
1070 )?;
1071
1072 conn.execute(
1074 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1075 (SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
1076 params![session_id],
1077 )?;
1078
1079 conn.execute(
1081 "DELETE FROM session_memory_chunks WHERE session_id = ?1",
1082 params![session_id],
1083 )?;
1084
1085 Ok(count as u64)
1086 }
1087
1088 pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
1090 let conn = self.conn.lock().await;
1091
1092 let count: i64 = conn.query_row(
1094 "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
1095 params![project_id],
1096 |row| row.get(0),
1097 )?;
1098
1099 conn.execute(
1101 "DELETE FROM project_memory_vectors WHERE chunk_id IN
1102 (SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
1103 params![project_id],
1104 )?;
1105
1106 conn.execute(
1108 "DELETE FROM project_memory_chunks WHERE project_id = ?1",
1109 params![project_id],
1110 )?;
1111
1112 Ok(count as u64)
1113 }
1114
1115 pub async fn clear_global_memory_by_source_prefix(
1117 &self,
1118 source_prefix: &str,
1119 ) -> MemoryResult<u64> {
1120 let conn = self.conn.lock().await;
1121 let like = format!("{}%", source_prefix);
1122
1123 let count: i64 = conn.query_row(
1124 "SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
1125 params![like],
1126 |row| row.get(0),
1127 )?;
1128
1129 conn.execute(
1130 "DELETE FROM global_memory_vectors WHERE chunk_id IN
1131 (SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
1132 params![like],
1133 )?;
1134
1135 conn.execute(
1136 "DELETE FROM global_memory_chunks WHERE source LIKE ?1",
1137 params![like],
1138 )?;
1139
1140 Ok(count as u64)
1141 }
1142
    /// Deletes a single memory chunk (and its companion embedding-vector row)
    /// by id.
    ///
    /// Scope requirements per tier:
    /// - `Session`: `session_id` must be provided.
    /// - `Project`: `project_id` must be provided.
    /// - `Global`: no scope id is needed.
    ///
    /// Returns the number of chunk rows removed (0 when the id did not match
    /// within the given scope).
    ///
    /// # Errors
    /// `MemoryError::InvalidConfig` when the tier's required scope id is
    /// missing, or any database error from the delete statements.
    pub async fn delete_chunk(
        &self,
        tier: MemoryTier,
        chunk_id: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
    ) -> MemoryResult<u64> {
        let conn = self.conn.lock().await;

        let deleted = match tier {
            MemoryTier::Session => {
                let Some(session_id) = session_id else {
                    return Err(MemoryError::InvalidConfig(
                        "session_id is required to delete session memory chunks".to_string(),
                    ));
                };
                // Vectors are resolved via a subquery on the chunk table, so
                // they must be removed while the chunk row still exists.
                conn.execute(
                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2)",
                    params![chunk_id, session_id],
                )?;
                conn.execute(
                    "DELETE FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2",
                    params![chunk_id, session_id],
                )?
            }
            MemoryTier::Project => {
                let Some(project_id) = project_id else {
                    return Err(MemoryError::InvalidConfig(
                        "project_id is required to delete project memory chunks".to_string(),
                    ));
                };
                conn.execute(
                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2)",
                    params![chunk_id, project_id],
                )?;
                conn.execute(
                    "DELETE FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2",
                    params![chunk_id, project_id],
                )?
            }
            MemoryTier::Global => {
                conn.execute(
                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM global_memory_chunks WHERE id = ?1)",
                    params![chunk_id],
                )?;
                conn.execute(
                    "DELETE FROM global_memory_chunks WHERE id = ?1",
                    params![chunk_id],
                )?
            }
        };

        Ok(deleted as u64)
    }
1201
1202 pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
1204 let conn = self.conn.lock().await;
1205
1206 let cutoff = Utc::now() - chrono::Duration::days(retention_days);
1207 let cutoff_str = cutoff.to_rfc3339();
1208
1209 let count: i64 = conn.query_row(
1211 "SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
1212 params![cutoff_str],
1213 |row| row.get(0),
1214 )?;
1215
1216 conn.execute(
1218 "DELETE FROM session_memory_vectors WHERE chunk_id IN
1219 (SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
1220 params![cutoff_str],
1221 )?;
1222
1223 conn.execute(
1225 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1226 params![cutoff_str],
1227 )?;
1228
1229 Ok(count as u64)
1230 }
1231
1232 pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1234 let conn = self.conn.lock().await;
1235
1236 let result: Option<MemoryConfig> = conn
1237 .query_row(
1238 "SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
1239 session_retention_days, token_budget, chunk_overlap
1240 FROM memory_config WHERE project_id = ?1",
1241 params![project_id],
1242 |row| {
1243 Ok(MemoryConfig {
1244 max_chunks: row.get(0)?,
1245 chunk_size: row.get(1)?,
1246 retrieval_k: row.get(2)?,
1247 auto_cleanup: row.get::<_, i64>(3)? != 0,
1248 session_retention_days: row.get(4)?,
1249 token_budget: row.get(5)?,
1250 chunk_overlap: row.get(6)?,
1251 })
1252 },
1253 )
1254 .optional()?;
1255
1256 match result {
1257 Some(config) => Ok(config),
1258 None => {
1259 let config = MemoryConfig::default();
1261 let updated_at = Utc::now().to_rfc3339();
1262
1263 conn.execute(
1264 "INSERT INTO memory_config
1265 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1266 session_retention_days, token_budget, chunk_overlap, updated_at)
1267 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1268 params![
1269 project_id,
1270 config.max_chunks,
1271 config.chunk_size,
1272 config.retrieval_k,
1273 config.auto_cleanup as i64,
1274 config.session_retention_days,
1275 config.token_budget,
1276 config.chunk_overlap,
1277 updated_at
1278 ],
1279 )?;
1280
1281 Ok(config)
1282 }
1283 }
1284 }
1285
1286 pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1288 let conn = self.conn.lock().await;
1289
1290 let updated_at = Utc::now().to_rfc3339();
1291
1292 conn.execute(
1293 "INSERT OR REPLACE INTO memory_config
1294 (project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
1295 session_retention_days, token_budget, chunk_overlap, updated_at)
1296 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1297 params![
1298 project_id,
1299 config.max_chunks,
1300 config.chunk_size,
1301 config.retrieval_k,
1302 config.auto_cleanup as i64,
1303 config.session_retention_days,
1304 config.token_budget,
1305 config.chunk_overlap,
1306 updated_at
1307 ],
1308 )?;
1309
1310 Ok(())
1311 }
1312
1313 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1315 let conn = self.conn.lock().await;
1316
1317 let session_chunks: i64 =
1319 conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
1320 row.get(0)
1321 })?;
1322
1323 let project_chunks: i64 =
1324 conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
1325 row.get(0)
1326 })?;
1327
1328 let global_chunks: i64 =
1329 conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
1330 row.get(0)
1331 })?;
1332
1333 let session_bytes: i64 = conn.query_row(
1335 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
1336 [],
1337 |row| row.get(0),
1338 )?;
1339
1340 let project_bytes: i64 = conn.query_row(
1341 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
1342 [],
1343 |row| row.get(0),
1344 )?;
1345
1346 let global_bytes: i64 = conn.query_row(
1347 "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
1348 [],
1349 |row| row.get(0),
1350 )?;
1351
1352 let last_cleanup: Option<String> = conn
1354 .query_row(
1355 "SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
1356 [],
1357 |row| row.get(0),
1358 )
1359 .optional()?;
1360
1361 let last_cleanup = last_cleanup.and_then(|s| {
1362 DateTime::parse_from_rfc3339(&s)
1363 .ok()
1364 .map(|dt| dt.with_timezone(&Utc))
1365 });
1366
1367 let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
1369
1370 Ok(MemoryStats {
1371 total_chunks: session_chunks + project_chunks + global_chunks,
1372 session_chunks,
1373 project_chunks,
1374 global_chunks,
1375 total_bytes: session_bytes + project_bytes + global_bytes,
1376 session_bytes,
1377 project_bytes,
1378 global_bytes,
1379 file_size,
1380 last_cleanup,
1381 })
1382 }
1383
1384 pub async fn log_cleanup(
1386 &self,
1387 cleanup_type: &str,
1388 tier: MemoryTier,
1389 project_id: Option<&str>,
1390 session_id: Option<&str>,
1391 chunks_deleted: i64,
1392 bytes_reclaimed: i64,
1393 ) -> MemoryResult<()> {
1394 let conn = self.conn.lock().await;
1395
1396 let id = uuid::Uuid::new_v4().to_string();
1397 let created_at = Utc::now().to_rfc3339();
1398
1399 conn.execute(
1400 "INSERT INTO memory_cleanup_log
1401 (id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
1402 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1403 params![
1404 id,
1405 cleanup_type,
1406 tier.to_string(),
1407 project_id,
1408 session_id,
1409 chunks_deleted,
1410 bytes_reclaimed,
1411 created_at
1412 ],
1413 )?;
1414
1415 Ok(())
1416 }
1417
1418 pub async fn vacuum(&self) -> MemoryResult<()> {
1420 let conn = self.conn.lock().await;
1421 conn.execute("VACUUM", [])?;
1422 Ok(())
1423 }
1424
1425 pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
1430 let conn = self.conn.lock().await;
1431 let n: i64 = conn.query_row(
1432 "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
1433 params![project_id],
1434 |row| row.get(0),
1435 )?;
1436 Ok(n)
1437 }
1438
1439 pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
1440 let conn = self.conn.lock().await;
1441 let exists: Option<i64> = conn
1442 .query_row(
1443 "SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
1444 params![project_id],
1445 |row| row.get(0),
1446 )
1447 .optional()?;
1448 Ok(exists.is_some())
1449 }
1450
1451 pub async fn get_file_index_entry(
1452 &self,
1453 project_id: &str,
1454 path: &str,
1455 ) -> MemoryResult<Option<(i64, i64, String)>> {
1456 let conn = self.conn.lock().await;
1457 let row: Option<(i64, i64, String)> = conn
1458 .query_row(
1459 "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1460 params![project_id, path],
1461 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1462 )
1463 .optional()?;
1464 Ok(row)
1465 }
1466
1467 pub async fn upsert_file_index_entry(
1468 &self,
1469 project_id: &str,
1470 path: &str,
1471 mtime: i64,
1472 size: i64,
1473 hash: &str,
1474 ) -> MemoryResult<()> {
1475 let conn = self.conn.lock().await;
1476 let indexed_at = Utc::now().to_rfc3339();
1477 conn.execute(
1478 "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
1479 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1480 ON CONFLICT(project_id, path) DO UPDATE SET
1481 mtime = excluded.mtime,
1482 size = excluded.size,
1483 hash = excluded.hash,
1484 indexed_at = excluded.indexed_at",
1485 params![project_id, path, mtime, size, hash, indexed_at],
1486 )?;
1487 Ok(())
1488 }
1489
1490 pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
1491 let conn = self.conn.lock().await;
1492 conn.execute(
1493 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1494 params![project_id, path],
1495 )?;
1496 Ok(())
1497 }
1498
1499 pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
1500 let conn = self.conn.lock().await;
1501 let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1502 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1503 Ok(rows.collect::<Result<Vec<_>, _>>()?)
1504 }
1505
    /// Removes all file-sourced chunks for one `source_path` inside a project,
    /// returning `(chunks_deleted, bytes_estimated)`.
    ///
    /// Both figures are measured with SELECTs *before* the deletes run; the
    /// byte total is an estimate from content lengths, not on-disk pages.
    pub async fn delete_project_file_chunks_by_path(
        &self,
        project_id: &str,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
             WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
            |row| row.get(0),
        )?;

        // Vector rows are resolved through a subquery on the chunk table, so
        // they must be removed while the chunk rows still exist.
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
            params![project_id, source_path],
        )?;

        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
            params![project_id, source_path],
        )?;

        Ok((chunks_deleted, bytes_estimated))
    }
1541
    /// Looks up the `(mtime, size, hash)` recorded for an imported file in the
    /// tier-appropriate index table (session / project / global), or `None`
    /// when the path has never been indexed in that scope.
    ///
    /// # Errors
    /// Fails via `require_scope_id` when the tier needs a scope id that was
    /// not supplied, or on any database error.
    pub async fn get_import_index_entry(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        path: &str,
    ) -> MemoryResult<Option<(i64, i64, String)>> {
        let conn = self.conn.lock().await;
        let row = match tier {
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                conn.query_row(
                    "SELECT mtime, size, hash FROM session_file_index WHERE session_id = ?1 AND path = ?2",
                    params![session_id, path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?
            }
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                conn.query_row(
                    "SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
                    params![project_id, path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?
            }
            // Global entries are keyed by path alone — no scope column.
            MemoryTier::Global => conn
                .query_row(
                    "SELECT mtime, size, hash FROM global_file_index WHERE path = ?1",
                    params![path],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
                )
                .optional()?,
        };
        Ok(row)
    }
1579
    /// Inserts or refreshes the import-index row for a file in the
    /// tier-appropriate index table, stamping it with the current time.
    ///
    /// The session and project tables are keyed on (scope id, path); the
    /// global table on path alone.
    ///
    /// # Errors
    /// Fails via `require_scope_id` when the tier needs a scope id that was
    /// not supplied, or on any database error.
    pub async fn upsert_import_index_entry(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        path: &str,
        mtime: i64,
        size: i64,
        hash: &str,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        let indexed_at = Utc::now().to_rfc3339();
        match tier {
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                conn.execute(
                    "INSERT INTO session_file_index (session_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(session_id, path) DO UPDATE SET
                         mtime = excluded.mtime,
                         size = excluded.size,
                         hash = excluded.hash,
                         indexed_at = excluded.indexed_at",
                    params![session_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                conn.execute(
                    "INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                     ON CONFLICT(project_id, path) DO UPDATE SET
                         mtime = excluded.mtime,
                         size = excluded.size,
                         hash = excluded.hash,
                         indexed_at = excluded.indexed_at",
                    params![project_id, path, mtime, size, hash, indexed_at],
                )?;
            }
            MemoryTier::Global => {
                conn.execute(
                    "INSERT INTO global_file_index (path, mtime, size, hash, indexed_at)
                     VALUES (?1, ?2, ?3, ?4, ?5)
                     ON CONFLICT(path) DO UPDATE SET
                         mtime = excluded.mtime,
                         size = excluded.size,
                         hash = excluded.hash,
                         indexed_at = excluded.indexed_at",
                    params![path, mtime, size, hash, indexed_at],
                )?;
            }
        }
        Ok(())
    }
1634
1635 pub async fn list_import_index_paths(
1636 &self,
1637 tier: MemoryTier,
1638 session_id: Option<&str>,
1639 project_id: Option<&str>,
1640 ) -> MemoryResult<Vec<String>> {
1641 let conn = self.conn.lock().await;
1642 let rows = match tier {
1643 MemoryTier::Session => {
1644 let session_id = require_scope_id(tier, session_id)?;
1645 let mut stmt =
1646 conn.prepare("SELECT path FROM session_file_index WHERE session_id = ?1")?;
1647 let rows = stmt.query_map(params![session_id], |row| row.get::<_, String>(0))?;
1648 rows.collect::<Result<Vec<_>, _>>()?
1649 }
1650 MemoryTier::Project => {
1651 let project_id = require_scope_id(tier, project_id)?;
1652 let mut stmt =
1653 conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
1654 let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
1655 rows.collect::<Result<Vec<_>, _>>()?
1656 }
1657 MemoryTier::Global => {
1658 let mut stmt = conn.prepare("SELECT path FROM global_file_index")?;
1659 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
1660 rows.collect::<Result<Vec<_>, _>>()?
1661 }
1662 };
1663 Ok(rows)
1664 }
1665
1666 pub async fn delete_import_index_entry(
1667 &self,
1668 tier: MemoryTier,
1669 session_id: Option<&str>,
1670 project_id: Option<&str>,
1671 path: &str,
1672 ) -> MemoryResult<()> {
1673 let conn = self.conn.lock().await;
1674 match tier {
1675 MemoryTier::Session => {
1676 let session_id = require_scope_id(tier, session_id)?;
1677 conn.execute(
1678 "DELETE FROM session_file_index WHERE session_id = ?1 AND path = ?2",
1679 params![session_id, path],
1680 )?;
1681 }
1682 MemoryTier::Project => {
1683 let project_id = require_scope_id(tier, project_id)?;
1684 conn.execute(
1685 "DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
1686 params![project_id, path],
1687 )?;
1688 }
1689 MemoryTier::Global => {
1690 conn.execute(
1691 "DELETE FROM global_file_index WHERE path = ?1",
1692 params![path],
1693 )?;
1694 }
1695 }
1696 Ok(())
1697 }
1698
    /// Removes all file-sourced chunks (and their vectors) matching
    /// `source_path` in the given tier, returning
    /// `(chunks_deleted, bytes_estimated)`.
    ///
    /// Figures are measured with SELECTs before the deletes; the byte total is
    /// an estimate from content lengths. In every tier the vector rows are
    /// deleted first because they are resolved via a subquery on the chunk
    /// table.
    ///
    /// # Errors
    /// Fails via `require_scope_id` when the tier needs a scope id that was
    /// not supplied, or on any database error.
    pub async fn delete_file_chunks_by_path(
        &self,
        tier: MemoryTier,
        session_id: Option<&str>,
        project_id: Option<&str>,
        source_path: &str,
    ) -> MemoryResult<(i64, i64)> {
        let conn = self.conn.lock().await;
        let result = match tier {
            MemoryTier::Session => {
                let session_id = require_scope_id(tier, session_id)?;
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM session_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM session_memory_chunks WHERE session_id = ?1 AND source = 'file' AND source_path = ?2)",
                    params![session_id, source_path],
                )?;
                conn.execute(
                    "DELETE FROM session_memory_chunks
                     WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![session_id, source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
            MemoryTier::Project => {
                let project_id = require_scope_id(tier, project_id)?;
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM project_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
                    params![project_id, source_path],
                )?;
                conn.execute(
                    "DELETE FROM project_memory_chunks
                     WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
                    params![project_id, source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
            MemoryTier::Global => {
                let chunks_deleted: i64 = conn.query_row(
                    "SELECT COUNT(*) FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                    |row| row.get(0),
                )?;
                let bytes_estimated: i64 = conn.query_row(
                    "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                    |row| row.get(0),
                )?;
                conn.execute(
                    "DELETE FROM global_memory_vectors WHERE chunk_id IN
                     (SELECT id FROM global_memory_chunks WHERE source = 'file' AND source_path = ?1)",
                    params![source_path],
                )?;
                conn.execute(
                    "DELETE FROM global_memory_chunks
                     WHERE source = 'file' AND source_path = ?1",
                    params![source_path],
                )?;
                (chunks_deleted, bytes_estimated)
            }
        };
        Ok(result)
    }
1788
    /// Records (or overwrites) the summary of the most recent indexing run for
    /// `project_id`, stamping it with the current time.
    pub async fn upsert_project_index_status(
        &self,
        project_id: &str,
        total_files: i64,
        processed_files: i64,
        indexed_files: i64,
        skipped_files: i64,
        errors: i64,
    ) -> MemoryResult<()> {
        let conn = self.conn.lock().await;
        let last_indexed_at = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO project_index_status (
                 project_id, last_indexed_at, last_total_files, last_processed_files,
                 last_indexed_files, last_skipped_files, last_errors
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
             ON CONFLICT(project_id) DO UPDATE SET
                 last_indexed_at = excluded.last_indexed_at,
                 last_total_files = excluded.last_total_files,
                 last_processed_files = excluded.last_processed_files,
                 last_indexed_files = excluded.last_indexed_files,
                 last_skipped_files = excluded.last_skipped_files,
                 last_errors = excluded.last_errors",
            params![
                project_id,
                last_indexed_at,
                total_files,
                processed_files,
                indexed_files,
                skipped_files,
                errors
            ],
        )?;
        Ok(())
    }
1824
    /// Collects per-project memory statistics: chunk/byte totals (overall and
    /// for file-sourced chunks), the number of indexed files, and the summary
    /// of the last indexing run from `project_index_status` (all `None` when
    /// no run has been recorded).
    pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
        let conn = self.conn.lock().await;

        let project_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        let project_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        let file_index_chunks: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let file_index_bytes: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let indexed_files: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
            params![project_id],
            |row| row.get(0),
        )?;

        // All six status columns come back in one row (or not at all).
        let status_row: Option<ProjectIndexStatusRow> =
            conn
                .query_row(
                    "SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
                     FROM project_index_status WHERE project_id = ?1",
                    params![project_id],
                    |row| {
                        Ok((
                            row.get(0)?,
                            row.get(1)?,
                            row.get(2)?,
                            row.get(3)?,
                            row.get(4)?,
                            row.get(5)?,
                        ))
                    },
                )
                .optional()?;

        let (
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        ) = status_row.unwrap_or((None, None, None, None, None, None));

        // Unparseable timestamps degrade to None rather than erroring.
        let last_indexed_at = last_indexed_at.and_then(|s| {
            DateTime::parse_from_rfc3339(&s)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        Ok(ProjectMemoryStats {
            project_id: project_id.to_string(),
            project_chunks,
            project_bytes,
            file_index_chunks,
            file_index_bytes,
            indexed_files,
            last_indexed_at,
            last_total_files,
            last_processed_files,
            last_indexed_files,
            last_skipped_files,
            last_errors,
        })
    }
1907
    /// Wipes everything the file indexer produced for `project_id`: the
    /// file-sourced chunks, their vectors, the file index, and the index-run
    /// status row. Optionally runs VACUUM afterwards.
    ///
    /// Returns the chunk count and estimated bytes (content length) measured
    /// before deletion. Note the deletes are not wrapped in a transaction, so
    /// a failure mid-way can leave partial state.
    pub async fn clear_project_file_index(
        &self,
        project_id: &str,
        vacuum: bool,
    ) -> MemoryResult<ClearFileIndexResult> {
        let conn = self.conn.lock().await;

        let chunks_deleted: i64 = conn.query_row(
            "SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        let bytes_estimated: i64 = conn.query_row(
            "SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
            |row| row.get(0),
        )?;

        // Vector rows are resolved via a subquery on the chunk table, so they
        // must be removed first.
        conn.execute(
            "DELETE FROM project_memory_vectors WHERE chunk_id IN
             (SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
            params![project_id],
        )?;

        conn.execute(
            "DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
            params![project_id],
        )?;

        conn.execute(
            "DELETE FROM project_file_index WHERE project_id = ?1",
            params![project_id],
        )?;
        conn.execute(
            "DELETE FROM project_index_status WHERE project_id = ?1",
            params![project_id],
        )?;

        // The guard must be released before calling self.vacuum(), which locks
        // the same tokio Mutex — holding it across the call would deadlock.
        drop(conn); if vacuum {
            self.vacuum().await?;
        }

        Ok(ClearFileIndexResult {
            chunks_deleted,
            bytes_estimated,
            did_vacuum: vacuum,
        })
    }
1962
1963 pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
1975 if retention_days == 0 {
1976 return Ok(0);
1977 }
1978
1979 let conn = self.conn.lock().await;
1980
1981 let cutoff =
1983 (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
1984
1985 conn.execute(
1987 "DELETE FROM session_memory_vectors
1988 WHERE chunk_id IN (
1989 SELECT id FROM session_memory_chunks WHERE created_at < ?1
1990 )",
1991 params![cutoff],
1992 )?;
1993
1994 let deleted = conn.execute(
1995 "DELETE FROM session_memory_chunks WHERE created_at < ?1",
1996 params![cutoff],
1997 )?;
1998
1999 if deleted > 0 {
2000 tracing::info!(
2001 retention_days,
2002 deleted,
2003 "memory hygiene: pruned old session chunks"
2004 );
2005 }
2006
2007 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2008 Ok(deleted as u64)
2009 }
2010
2011 pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
2017 let retention_days = if env_override_days > 0 {
2019 env_override_days
2020 } else {
2021 let conn = self.conn.lock().await;
2023 let days: Option<i64> = conn
2024 .query_row(
2025 "SELECT session_retention_days FROM memory_config
2026 WHERE project_id = '__global__' LIMIT 1",
2027 [],
2028 |row| row.get(0),
2029 )
2030 .ok();
2031 drop(conn);
2032 days.unwrap_or(30) as u32
2033 };
2034
2035 self.prune_old_session_chunks(retention_days).await
2036 }
2037
    /// Stores a global memory record, deduplicating against an existing row.
    ///
    /// A record is considered a duplicate when user, source type, content
    /// hash, run id, and the (nullable) session/message/tool identifiers all
    /// match — `IFNULL(..., '')` makes NULL compare equal to NULL. On a hit
    /// the existing id is returned with `deduped: true` and nothing is
    /// written; otherwise the record is inserted as-is.
    pub async fn put_global_memory_record(
        &self,
        record: &GlobalMemoryRecord,
    ) -> MemoryResult<GlobalMemoryWriteResult> {
        let conn = self.conn.lock().await;

        let existing: Option<String> = conn
            .query_row(
                "SELECT id FROM memory_records
                 WHERE user_id = ?1
                   AND source_type = ?2
                   AND content_hash = ?3
                   AND run_id = ?4
                   AND IFNULL(session_id, '') = IFNULL(?5, '')
                   AND IFNULL(message_id, '') = IFNULL(?6, '')
                   AND IFNULL(tool_name, '') = IFNULL(?7, '')
                 LIMIT 1",
                params![
                    record.user_id,
                    record.source_type,
                    record.content_hash,
                    record.run_id,
                    record.session_id,
                    record.message_id,
                    record.tool_name
                ],
                |row| row.get(0),
            )
            .optional()?;

        if let Some(id) = existing {
            return Ok(GlobalMemoryWriteResult {
                id,
                stored: false,
                deduped: true,
            });
        }

        // Absent metadata/provenance are stored as empty strings, not NULL.
        let metadata = record
            .metadata
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        let provenance = record
            .provenance
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_default();
        conn.execute(
            "INSERT INTO memory_records(
                 id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
                 project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
                 visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
             ) VALUES (
                 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
                 ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                 ?17, ?18, ?19, ?20, ?21, ?22
             )",
            params![
                record.id,
                record.user_id,
                record.source_type,
                record.content,
                record.content_hash,
                record.run_id,
                record.session_id,
                record.message_id,
                record.tool_name,
                record.project_tag,
                record.channel_tag,
                record.host_tag,
                metadata,
                provenance,
                record.redaction_status,
                i64::from(record.redaction_count),
                record.visibility,
                if record.demoted { 1i64 } else { 0i64 },
                record.score_boost,
                // NOTE(review): timestamps appear to be unsigned in the record
                // type; the `as i64` casts assume they fit — confirm.
                record.created_at_ms as i64,
                record.updated_at_ms as i64,
                record.expires_at_ms.map(|v| v as i64),
            ],
        )?;

        Ok(GlobalMemoryWriteResult {
            id: record.id.clone(),
            stored: true,
            deduped: false,
        })
    }
2128
    /// Searches a user's global memory records, preferring FTS5 full-text
    /// matching and falling back to a LIKE substring scan.
    ///
    /// FTS hits are scored `1 / (1 + bm25 rank)`; fallback hits get a flat
    /// 0.25. Demoted and expired records are excluded in both paths, and the
    /// optional project/channel/host tags filter when non-NULL. `limit` is
    /// clamped to 1..=100.
    #[allow(clippy::too_many_arguments)]
    pub async fn search_global_memory(
        &self,
        user_id: &str,
        query: &str,
        limit: i64,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        host_tag: Option<&str>,
    ) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
        let conn = self.conn.lock().await;
        let now_ms = chrono::Utc::now().timestamp_millis();
        let mut hits = Vec::new();

        let fts_query = build_fts_query(query);
        let search_limit = limit.clamp(1, 100);
        let maybe_rows = conn.prepare(
            "SELECT
                 m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
                 m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
                 m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
                 m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
                 bm25(memory_records_fts) AS rank
             FROM memory_records_fts
             JOIN memory_records m ON m.id = memory_records_fts.id
             WHERE memory_records_fts MATCH ?1
               AND m.user_id = ?2
               AND m.demoted = 0
               AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
               AND (?4 IS NULL OR m.project_tag = ?4)
               AND (?5 IS NULL OR m.channel_tag = ?5)
               AND (?6 IS NULL OR m.host_tag = ?6)
             ORDER BY rank ASC
             LIMIT ?7"
        );

        // Deliberate best-effort: if the FTS statement cannot be prepared
        // (e.g. the FTS table is unavailable), fall through to the LIKE scan
        // below instead of erroring.
        if let Ok(mut stmt) = maybe_rows {
            let rows = stmt.query_map(
                params![
                    fts_query,
                    user_id,
                    now_ms,
                    project_tag,
                    channel_tag,
                    host_tag,
                    search_limit
                ],
                |row| {
                    let record = row_to_global_record(row)?;
                    // bm25: lower rank = better match; map onto (0, 1].
                    let rank = row.get::<_, f64>(22)?;
                    let score = 1.0 / (1.0 + rank.max(0.0));
                    Ok(GlobalMemorySearchHit { record, score })
                },
            )?;
            for row in rows {
                hits.push(row?);
            }
        }

        if !hits.is_empty() {
            return Ok(hits);
        }

        // Fallback: plain substring match on content. An empty query (after
        // trim) disables the LIKE filter and returns the most recent records.
        let like = format!("%{}%", query.trim());
        let mut stmt = conn.prepare(
            "SELECT
                 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                 redaction_status, redaction_count, visibility, demoted, score_boost,
                 created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND demoted = 0
               AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
               AND (?3 IS NULL OR project_tag = ?3)
               AND (?4 IS NULL OR channel_tag = ?4)
               AND (?5 IS NULL OR host_tag = ?5)
               AND (?6 = '' OR content LIKE ?7)
             ORDER BY created_at_ms DESC
             LIMIT ?8",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                now_ms,
                project_tag,
                channel_tag,
                host_tag,
                query.trim(),
                like,
                search_limit
            ],
            |row| {
                let record = row_to_global_record(row)?;
                Ok(GlobalMemorySearchHit {
                    record,
                    score: 0.25,
                })
            },
        )?;
        for row in rows {
            hits.push(row?);
        }

        Ok(hits)
    }
2235
    /// Lists a user's global memory records, newest first, with optional
    /// substring filter `q` (matched against content, source type and run id)
    /// and optional project/channel tag filters.
    ///
    /// `limit` is clamped to 1..=1000 and `offset` floored at 0. Unlike
    /// search, this does not exclude demoted or expired records.
    pub async fn list_global_memory(
        &self,
        user_id: &str,
        q: Option<&str>,
        project_tag: Option<&str>,
        channel_tag: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> MemoryResult<Vec<GlobalMemoryRecord>> {
        let conn = self.conn.lock().await;
        let query = q.unwrap_or("").trim();
        let like = format!("%{}%", query);
        let mut stmt = conn.prepare(
            "SELECT
                 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
                 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
                 redaction_status, redaction_count, visibility, demoted, score_boost,
                 created_at_ms, updated_at_ms, expires_at_ms
             FROM memory_records
             WHERE user_id = ?1
               AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
               AND (?4 IS NULL OR project_tag = ?4)
               AND (?5 IS NULL OR channel_tag = ?5)
             ORDER BY created_at_ms DESC
             LIMIT ?6 OFFSET ?7",
        )?;
        let rows = stmt.query_map(
            params![
                user_id,
                query,
                like,
                project_tag,
                channel_tag,
                limit.clamp(1, 1000),
                offset.max(0)
            ],
            row_to_global_record,
        )?;
        let mut out = Vec::new();
        for row in rows {
            out.push(row?);
        }
        Ok(out)
    }
2280
2281 pub async fn set_global_memory_visibility(
2282 &self,
2283 id: &str,
2284 visibility: &str,
2285 demoted: bool,
2286 ) -> MemoryResult<bool> {
2287 let conn = self.conn.lock().await;
2288 let now_ms = chrono::Utc::now().timestamp_millis();
2289 let changed = conn.execute(
2290 "UPDATE memory_records
2291 SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
2292 WHERE id = ?1",
2293 params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
2294 )?;
2295 Ok(changed > 0)
2296 }
2297
2298 pub async fn update_global_memory_context(
2299 &self,
2300 id: &str,
2301 visibility: &str,
2302 demoted: bool,
2303 metadata: Option<&serde_json::Value>,
2304 provenance: Option<&serde_json::Value>,
2305 ) -> MemoryResult<bool> {
2306 let conn = self.conn.lock().await;
2307 let now_ms = chrono::Utc::now().timestamp_millis();
2308 let metadata = metadata.map(ToString::to_string).unwrap_or_default();
2309 let provenance = provenance.map(ToString::to_string).unwrap_or_default();
2310 let changed = conn.execute(
2311 "UPDATE memory_records
2312 SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
2313 WHERE id = ?1",
2314 params![
2315 id,
2316 visibility,
2317 if demoted { 1i64 } else { 0i64 },
2318 metadata,
2319 provenance,
2320 now_ms,
2321 ],
2322 )?;
2323 Ok(changed > 0)
2324 }
2325
2326 pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
2327 let conn = self.conn.lock().await;
2328 let mut stmt = conn.prepare(
2329 "SELECT
2330 id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
2331 tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
2332 redaction_status, redaction_count, visibility, demoted, score_boost,
2333 created_at_ms, updated_at_ms, expires_at_ms
2334 FROM memory_records
2335 WHERE id = ?1
2336 LIMIT 1",
2337 )?;
2338 let record = stmt
2339 .query_row(params![id], row_to_global_record)
2340 .optional()?;
2341 Ok(record)
2342 }
2343
2344 pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
2345 let conn = self.conn.lock().await;
2346 let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
2347 Ok(changed > 0)
2348 }
2349}
2350
2351fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
2353 let id: String = row.get(0)?;
2354 let content: String = row.get(1)?;
2355 let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
2356 match tier {
2357 MemoryTier::Session => (
2358 Some(row.get(2)?),
2359 row.get(3)?,
2360 4usize,
2361 5usize,
2362 6usize,
2363 7usize,
2364 ),
2365 MemoryTier::Project => (
2366 row.get(2)?,
2367 Some(row.get(3)?),
2368 4usize,
2369 5usize,
2370 6usize,
2371 7usize,
2372 ),
2373 MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
2374 };
2375
2376 let source: String = row.get(source_idx)?;
2377 let created_at_str: String = row.get(created_at_idx)?;
2378 let token_count: i64 = row.get(token_count_idx)?;
2379 let metadata_str: Option<String> = row.get(metadata_idx)?;
2380
2381 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
2382 .map_err(|e| {
2383 rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
2384 })?
2385 .with_timezone(&Utc);
2386
2387 let metadata = metadata_str
2388 .filter(|s| !s.is_empty())
2389 .and_then(|s| serde_json::from_str(&s).ok());
2390
2391 let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
2392 let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
2393 let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
2394 let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
2395
2396 Ok(MemoryChunk {
2397 id,
2398 content,
2399 tier,
2400 session_id,
2401 project_id,
2402 source,
2403 source_path,
2404 source_mtime,
2405 source_size,
2406 source_hash,
2407 created_at,
2408 token_count,
2409 metadata,
2410 })
2411}
2412
2413fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
2414 scope
2415 .filter(|value| !value.trim().is_empty())
2416 .ok_or_else(|| {
2417 crate::types::MemoryError::InvalidConfig(match tier {
2418 MemoryTier::Session => "tier=session requires session_id".to_string(),
2419 MemoryTier::Project => "tier=project requires project_id".to_string(),
2420 MemoryTier::Global => "tier=global does not require a scope id".to_string(),
2421 })
2422 })
2423}
2424
/// Convert a SQLite result row into a [`GlobalMemoryRecord`].
///
/// The column indices below must match the SELECT column order used by the
/// global-memory queries (id, user_id, source_type, content, content_hash,
/// run_id, session_id, message_id, tool_name, project_tag, channel_tag,
/// host_tag, metadata, provenance, redaction_status, redaction_count,
/// visibility, demoted, score_boost, created_at_ms, updated_at_ms,
/// expires_at_ms) exactly.
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
    // JSON payloads are stored as text; empty string means "absent".
    let metadata_str: Option<String> = row.get(12)?;
    let provenance_str: Option<String> = row.get(13)?;
    Ok(GlobalMemoryRecord {
        id: row.get(0)?,
        user_id: row.get(1)?,
        source_type: row.get(2)?,
        content: row.get(3)?,
        content_hash: row.get(4)?,
        run_id: row.get(5)?,
        session_id: row.get(6)?,
        message_id: row.get(7)?,
        tool_name: row.get(8)?,
        project_tag: row.get(9)?,
        channel_tag: row.get(10)?,
        host_tag: row.get(11)?,
        // Invalid JSON is silently dropped rather than failing the row.
        metadata: metadata_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        provenance: provenance_str
            .filter(|s| !s.is_empty())
            .and_then(|s| serde_json::from_str(&s).ok()),
        redaction_status: row.get(14)?,
        redaction_count: row.get::<_, i64>(15)? as u32,
        visibility: row.get(16)?,
        // Stored as integer 0/1.
        demoted: row.get::<_, i64>(17)? != 0,
        score_boost: row.get(18)?,
        created_at_ms: row.get::<_, i64>(19)? as u64,
        updated_at_ms: row.get::<_, i64>(20)? as u64,
        expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
    })
}
2457
2458impl MemoryDatabase {
2459 pub async fn get_node_by_uri(
2460 &self,
2461 uri: &str,
2462 ) -> MemoryResult<Option<crate::types::MemoryNode>> {
2463 let conn = self.conn.lock().await;
2464 let mut stmt = conn.prepare(
2465 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2466 FROM memory_nodes WHERE uri = ?1",
2467 )?;
2468
2469 let result = stmt.query_row(params![uri], |row| {
2470 let node_type_str: String = row.get(3)?;
2471 let node_type = node_type_str
2472 .parse()
2473 .unwrap_or(crate::types::NodeType::File);
2474 let metadata_str: Option<String> = row.get(6)?;
2475 Ok(crate::types::MemoryNode {
2476 id: row.get(0)?,
2477 uri: row.get(1)?,
2478 parent_uri: row.get(2)?,
2479 node_type,
2480 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2481 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2482 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2483 })
2484 });
2485
2486 match result {
2487 Ok(node) => Ok(Some(node)),
2488 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2489 Err(e) => Err(MemoryError::Database(e)),
2490 }
2491 }
2492
2493 pub async fn create_node(
2494 &self,
2495 uri: &str,
2496 parent_uri: Option<&str>,
2497 node_type: crate::types::NodeType,
2498 metadata: Option<&serde_json::Value>,
2499 ) -> MemoryResult<String> {
2500 let id = uuid::Uuid::new_v4().to_string();
2501 let now = Utc::now().to_rfc3339();
2502 let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
2503
2504 let conn = self.conn.lock().await;
2505 conn.execute(
2506 "INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
2507 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2508 params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
2509 )?;
2510
2511 Ok(id)
2512 }
2513
2514 pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
2515 let conn = self.conn.lock().await;
2516 let mut stmt = conn.prepare(
2517 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
2518 FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
2519 )?;
2520
2521 let rows = stmt.query_map(params![uri], |row| {
2522 let node_type_str: String = row.get(3)?;
2523 let node_type = node_type_str
2524 .parse()
2525 .unwrap_or(crate::types::NodeType::File);
2526 let metadata_str: Option<String> = row.get(6)?;
2527 Ok(crate::types::MemoryNode {
2528 id: row.get(0)?,
2529 uri: row.get(1)?,
2530 parent_uri: row.get(2)?,
2531 node_type,
2532 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
2533 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
2534 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
2535 })
2536 })?;
2537
2538 rows.collect::<Result<Vec<_>, _>>()
2539 .map_err(MemoryError::Database)
2540 }
2541
2542 pub async fn get_layer(
2543 &self,
2544 node_id: &str,
2545 layer_type: crate::types::LayerType,
2546 ) -> MemoryResult<Option<crate::types::MemoryLayer>> {
2547 let conn = self.conn.lock().await;
2548 let mut stmt = conn.prepare(
2549 "SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
2550 FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
2551 )?;
2552
2553 let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
2554 let layer_type_str: String = row.get(2)?;
2555 let layer_type = layer_type_str
2556 .parse()
2557 .unwrap_or(crate::types::LayerType::L2);
2558 Ok(crate::types::MemoryLayer {
2559 id: row.get(0)?,
2560 node_id: row.get(1)?,
2561 layer_type,
2562 content: row.get(3)?,
2563 token_count: row.get(4)?,
2564 embedding_id: row.get(5)?,
2565 created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
2566 source_chunk_id: row.get(7)?,
2567 })
2568 });
2569
2570 match result {
2571 Ok(layer) => Ok(Some(layer)),
2572 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2573 Err(e) => Err(MemoryError::Database(e)),
2574 }
2575 }
2576
2577 pub async fn create_layer(
2578 &self,
2579 node_id: &str,
2580 layer_type: crate::types::LayerType,
2581 content: &str,
2582 token_count: i64,
2583 source_chunk_id: Option<&str>,
2584 ) -> MemoryResult<String> {
2585 let id = uuid::Uuid::new_v4().to_string();
2586 let now = Utc::now().to_rfc3339();
2587
2588 let conn = self.conn.lock().await;
2589 conn.execute(
2590 "INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
2591 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2592 params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
2593 )?;
2594
2595 Ok(id)
2596 }
2597
2598 pub async fn get_children_tree(
2599 &self,
2600 parent_uri: &str,
2601 max_depth: usize,
2602 ) -> MemoryResult<Vec<crate::types::TreeNode>> {
2603 if max_depth == 0 {
2604 return Ok(Vec::new());
2605 }
2606
2607 let children = self.list_directory(parent_uri).await?;
2608 let mut tree_nodes = Vec::new();
2609
2610 for child in children {
2611 let layer_summary = self.get_layer_summary(&child.id).await?;
2612
2613 let grandchildren = if child.node_type == crate::types::NodeType::Directory {
2614 Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
2615 } else {
2616 Vec::new()
2617 };
2618
2619 tree_nodes.push(crate::types::TreeNode {
2620 node: child,
2621 children: grandchildren,
2622 layer_summary,
2623 });
2624 }
2625
2626 Ok(tree_nodes)
2627 }
2628
2629 async fn get_layer_summary(
2630 &self,
2631 node_id: &str,
2632 ) -> MemoryResult<Option<crate::types::LayerSummary>> {
2633 let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
2634 let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
2635 let has_l2 = self
2636 .get_layer(node_id, crate::types::LayerType::L2)
2637 .await?
2638 .is_some();
2639
2640 if l0.is_none() && l1.is_none() && !has_l2 {
2641 return Ok(None);
2642 }
2643
2644 Ok(Some(crate::types::LayerSummary {
2645 l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
2646 l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
2647 has_l2,
2648 }))
2649 }
2650
2651 pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
2652 let conn = self.conn.lock().await;
2653 let count: i64 = conn.query_row(
2654 "SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
2655 params![uri],
2656 |row| row.get(0),
2657 )?;
2658 Ok(count > 0)
2659 }
2660}
2661
/// Truncate `s` to at most `max_len` bytes, appending "..." when shortened.
///
/// Bug fix: the previous version sliced at a raw byte offset, which panics
/// when `max_len - 3` lands inside a multi-byte UTF-8 character. The cut is
/// now floored to the nearest char boundary, so the result is always valid
/// UTF-8 and never exceeds `max_len` bytes. ASCII behavior is unchanged.
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    let mut cut = max_len.saturating_sub(3);
    // Walk back to a char boundary so the slice cannot panic.
    while cut > 0 && !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &s[..cut])
}
2669
/// Turn free-form user text into a safe FTS5 MATCH expression.
///
/// Each whitespace-separated token is trimmed of leading/trailing
/// punctuation, quoted as an FTS5 string literal, and the tokens are OR-ed
/// together. An input with no usable tokens yields `""` (matches nothing).
///
/// Bug fix: `trim_matches` only strips boundary characters, so a token with
/// an *embedded* double quote (e.g. `a"b`) previously produced the malformed
/// literal `"a"b"`. Embedded quotes are now escaped by doubling them, per
/// FTS5 string syntax.
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Inside an FTS5 string literal, `""` denotes a literal `"`.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
2689
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // Create a fresh database in a temp directory. The TempDir must be
    // returned alongside the db so the directory outlives the test.
    async fn setup_test_db() -> (MemoryDatabase, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let db = MemoryDatabase::new(&db_path).await.unwrap();
        (db, temp_dir)
    }

    // Schema creation succeeds and a fresh database reports zero chunks.
    #[tokio::test]
    async fn test_init_schema() {
        let (db, _temp) = setup_test_db().await;
        let stats = db.get_stats().await.unwrap();
        assert_eq!(stats.total_chunks, 0);
    }

    // Round-trip: a session-tier chunk stored with an embedding is
    // retrievable via its session id.
    #[tokio::test]
    async fn test_store_and_retrieve_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "test-1".to_string(),
            content: "Test content".to_string(),
            tier: MemoryTier::Session,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 10,
            metadata: None,
        };

        let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let chunks = db.get_session_chunks("session-1").await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Test content");
    }

    // Global-tier chunks have no session/project scope and are listed via
    // get_global_chunks; all stored fields round-trip intact.
    #[tokio::test]
    async fn test_store_and_retrieve_global_chunk() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "global-1".to_string(),
            content: "Global note".to_string(),
            tier: MemoryTier::Global,
            session_id: None,
            project_id: None,
            source: "agent_note".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            created_at: Utc::now(),
            token_count: 7,
            metadata: Some(serde_json::json!({"kind":"test"})),
        };

        let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        let chunks = db.get_global_chunks(10).await.unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].content, "Global note");
        assert_eq!(chunks[0].source, "agent_note");
        assert_eq!(chunks[0].token_count, 7);
        assert_eq!(chunks[0].tier, MemoryTier::Global);
    }

    // source_hash acts as a dedup key for global chunks: the existence check
    // is positive for a stored hash and negative for an unknown one.
    #[tokio::test]
    async fn test_global_chunk_exists_by_source_hash() {
        let (db, _temp) = setup_test_db().await;

        let chunk = MemoryChunk {
            id: "global-hash".to_string(),
            content: "Global hash note".to_string(),
            tier: MemoryTier::Global,
            session_id: None,
            project_id: None,
            source: "chat_exchange".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: Some("hash-123".to_string()),
            created_at: Utc::now(),
            token_count: 5,
            metadata: None,
        };

        let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
        db.store_chunk(&chunk, &embedding).await.unwrap();

        assert!(db
            .global_chunk_exists_by_source_hash("hash-123")
            .await
            .unwrap());
        assert!(!db
            .global_chunk_exists_by_source_hash("missing-hash")
            .await
            .unwrap());
    }

    // Config CRUD: get_or_create returns the default (max_chunks = 10000),
    // and an update is visible on the next read.
    #[tokio::test]
    async fn test_config_crud() {
        let (db, _temp) = setup_test_db().await;

        let config = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(config.max_chunks, 10000);

        let new_config = MemoryConfig {
            max_chunks: 5000,
            ..Default::default()
        };
        db.update_config("project-1", &new_config).await.unwrap();

        let updated = db.get_or_create_config("project-1").await.unwrap();
        assert_eq!(updated.max_chunks, 5000);
    }

    // Global memory records: the first put stores, a second identical put
    // (same content_hash) dedupes, and the record is findable via text
    // search scoped to user + project tag.
    #[tokio::test]
    async fn test_global_memory_put_search_and_dedup() {
        let (db, _temp) = setup_test_db().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;
        let record = GlobalMemoryRecord {
            id: "gm-1".to_string(),
            user_id: "user-a".to_string(),
            source_type: "user_message".to_string(),
            content: "remember rust workspace layout".to_string(),
            content_hash: "h1".to_string(),
            run_id: "run-1".to_string(),
            session_id: Some("s1".to_string()),
            message_id: Some("m1".to_string()),
            tool_name: None,
            project_tag: Some("proj-x".to_string()),
            channel_tag: None,
            host_tag: None,
            metadata: None,
            provenance: None,
            redaction_status: "passed".to_string(),
            redaction_count: 0,
            visibility: "private".to_string(),
            demoted: false,
            score_boost: 0.0,
            created_at_ms: now,
            updated_at_ms: now,
            expires_at_ms: None,
        };
        let first = db.put_global_memory_record(&record).await.unwrap();
        assert!(first.stored);
        let second = db.put_global_memory_record(&record).await.unwrap();
        assert!(second.deduped);

        let hits = db
            .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());
        assert_eq!(hits[0].record.id, "gm-1");
    }
}