1use crate::{MemoryRow, Storage};
4use codemem_core::{
5 CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Session,
6 StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Builds the `VALUES` placeholder list for a multi-row SQL statement.
///
/// Produces `rows` parenthesised groups of `cols` numbered placeholders each,
/// numbered consecutively starting at `?1`. For example `(2, 2)` yields
/// `(?1,?2),(?3,?4)`. Zero rows yields an empty string.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // Placeholder numbers are 1-based and continue across rows.
            let first = row * cols + 1;
            let group: Vec<String> = (first..first + cols).map(|n| format!("?{n}")).collect();
            format!("({})", group.join(","))
        })
        .collect::<Vec<_>>()
        .join(",")
}
30
/// Generates a method that forwards to the inherent `Storage` method of the
/// same name.
///
/// Invocation form: `delegate_storage!(name(&self, arg: Type, ...) -> Ret);`.
/// Any number of `arg: Type` pairs is accepted (including none), so a new
/// arity never needs a new macro arm. The generated method passes `self` and
/// every argument through unchanged and returns the callee's result as-is.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            // `Storage::name` resolves to the inherent method, not back to the
            // trait method being defined here.
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
66 delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
69 delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
70 delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
71 delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
72 delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74 fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76 Storage::delete_memory_cascade(self, id)
79 }
80
81 fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83 Storage::delete_memories_batch_cascade(self, ids)
84 }
85
86 delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
87 delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
88 delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
89 delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
90 delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
91
92 fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
93 if ids.is_empty() {
94 return Ok(Vec::new());
95 }
96 let conn = self.conn()?;
97
98 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
99 let sql = format!(
100 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
101 placeholders.join(",")
102 );
103
104 let mut stmt = conn
105 .prepare(&sql)
106 .map_err(|e| CodememError::Storage(e.to_string()))?;
107
108 let params: Vec<&dyn rusqlite::types::ToSql> = ids
109 .iter()
110 .map(|id| id as &dyn rusqlite::types::ToSql)
111 .collect();
112
113 let rows = stmt
114 .query_map(params.as_slice(), |row| {
115 Ok(MemoryRow {
116 id: row.get(0)?,
117 content: row.get(1)?,
118 memory_type: row.get(2)?,
119 importance: row.get(3)?,
120 confidence: row.get(4)?,
121 access_count: row.get(5)?,
122 content_hash: row.get(6)?,
123 tags: row.get(7)?,
124 metadata: row.get(8)?,
125 namespace: row.get(9)?,
126 session_id: row.get(10)?,
127 created_at: row.get(11)?,
128 updated_at: row.get(12)?,
129 last_accessed_at: row.get(13)?,
130 })
131 })
132 .map_err(|e| CodememError::Storage(e.to_string()))?;
133
134 let mut memories = Vec::new();
135 for row in rows {
136 let row = row.map_err(|e| CodememError::Storage(e.to_string()))?;
137 memories.push(row.into_memory_node()?);
138 }
139 Ok(memories)
140 }
141
142 delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
145 delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
146
147 fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
148 let conn = self.conn()?;
149 let deleted = conn
150 .execute(
151 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
152 [memory_id],
153 )
154 .map_err(|e| CodememError::Storage(e.to_string()))?;
155 Ok(deleted > 0)
156 }
157
158 fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
159 let conn = self.conn()?;
160 let mut stmt = conn
161 .prepare("SELECT memory_id, embedding FROM memory_embeddings")
162 .map_err(|e| CodememError::Storage(e.to_string()))?;
163 let rows = stmt
164 .query_map([], |row| {
165 let id: String = row.get(0)?;
166 let blob: Vec<u8> = row.get(1)?;
167 Ok((id, blob))
168 })
169 .map_err(|e| CodememError::Storage(e.to_string()))?;
170 let mut result = Vec::new();
171 for row in rows {
172 let (id, blob) = row.map_err(|e| CodememError::Storage(e.to_string()))?;
173 let floats: Vec<f32> = blob
174 .chunks_exact(4)
175 .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
176 .collect();
177 result.push((id, floats));
178 }
179 Ok(result)
180 }
181
182 delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
185 delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
186 delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
187 delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
188 delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
189 delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
190 delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
191 delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
192 delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
193
194 delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
197 delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
198
199 fn list_sessions(
200 &self,
201 namespace: Option<&str>,
202 limit: usize,
203 ) -> Result<Vec<Session>, CodememError> {
204 self.list_sessions_with_limit(namespace, limit)
205 }
206
207 delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
210 delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
211
212 delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
215 delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
216 delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
217 delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
218
219 fn decay_stale_memories(
222 &self,
223 threshold_ts: i64,
224 decay_factor: f64,
225 ) -> Result<usize, CodememError> {
226 let conn = self.conn()?;
227 let rows = conn
228 .execute(
229 "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
230 params![decay_factor, threshold_ts],
231 )
232 .map_err(|e| CodememError::Storage(e.to_string()))?;
233 Ok(rows)
234 }
235
236 fn list_memories_for_creative(
237 &self,
238 ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
239 let conn = self.conn()?;
240 let mut stmt = conn
241 .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
242 .map_err(|e| CodememError::Storage(e.to_string()))?;
243
244 let rows = stmt
245 .query_map([], |row| {
246 Ok((
247 row.get::<_, String>(0)?,
248 row.get::<_, String>(1)?,
249 row.get::<_, String>(2)?,
250 ))
251 })
252 .map_err(|e| CodememError::Storage(e.to_string()))?
253 .collect::<Result<Vec<_>, _>>()
254 .map_err(|e| CodememError::Storage(e.to_string()))?;
255
256 Ok(rows
257 .into_iter()
258 .map(|(id, mtype, tags_json)| {
259 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
260 (id, mtype, tags)
261 })
262 .collect())
263 }
264
265 fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
266 let conn = self.conn()?;
267 let mut stmt = conn
268 .prepare(
269 "SELECT a.id, b.id, 1.0 as similarity
270 FROM memories a
271 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
272 WHERE a.id < b.id",
273 )
274 .map_err(|e| CodememError::Storage(e.to_string()))?;
275
276 let rows = stmt
277 .query_map([], |row| {
278 Ok((
279 row.get::<_, String>(0)?,
280 row.get::<_, String>(1)?,
281 row.get::<_, f64>(2)?,
282 ))
283 })
284 .map_err(|e| CodememError::Storage(e.to_string()))?
285 .collect::<Result<Vec<_>, _>>()
286 .map_err(|e| CodememError::Storage(e.to_string()))?;
287
288 Ok(rows)
289 }
290
291 fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
292 let conn = self.conn()?;
293 let mut stmt = conn
294 .prepare(
295 "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
296 )
297 .map_err(|e| CodememError::Storage(e.to_string()))?;
298
299 let ids = stmt
300 .query_map(params![importance_threshold], |row| row.get(0))
301 .map_err(|e| CodememError::Storage(e.to_string()))?
302 .collect::<Result<Vec<String>, _>>()
303 .map_err(|e| CodememError::Storage(e.to_string()))?;
304
305 Ok(ids)
306 }
307
308 fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
311 if memories.is_empty() {
312 return Ok(());
313 }
314 let conn = self.conn()?;
315 let tx = conn
316 .unchecked_transaction()
317 .map_err(|e| CodememError::Storage(e.to_string()))?;
318
319 const COLS: usize = 14;
320 const BATCH: usize = 999 / COLS; for chunk in memories.chunks(BATCH) {
323 let placeholders = multi_row_placeholders(COLS, chunk.len());
324 let sql = format!(
325 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
326 );
327
328 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
329 Vec::with_capacity(chunk.len() * COLS);
330 for memory in chunk {
331 let tags_json = serde_json::to_string(&memory.tags)?;
332 let metadata_json = serde_json::to_string(&memory.metadata)?;
333 param_values.push(Box::new(memory.id.clone()));
334 param_values.push(Box::new(memory.content.clone()));
335 param_values.push(Box::new(memory.memory_type.to_string()));
336 param_values.push(Box::new(memory.importance));
337 param_values.push(Box::new(memory.confidence));
338 param_values.push(Box::new(memory.access_count as i64));
339 param_values.push(Box::new(memory.content_hash.clone()));
340 param_values.push(Box::new(tags_json));
341 param_values.push(Box::new(metadata_json));
342 param_values.push(Box::new(memory.namespace.clone()));
343 param_values.push(Box::new(memory.session_id.clone()));
344 param_values.push(Box::new(memory.created_at.timestamp()));
345 param_values.push(Box::new(memory.updated_at.timestamp()));
346 param_values.push(Box::new(memory.last_accessed_at.timestamp()));
347 }
348
349 let refs: Vec<&dyn rusqlite::types::ToSql> =
350 param_values.iter().map(|p| p.as_ref()).collect();
351 tx.execute(&sql, refs.as_slice())
352 .map_err(|e| CodememError::Storage(e.to_string()))?;
353 }
354
355 tx.commit()
356 .map_err(|e| CodememError::Storage(e.to_string()))?;
357 Ok(())
358 }
359
360 fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
361 if items.is_empty() {
362 return Ok(());
363 }
364 let conn = self.conn()?;
365 let tx = conn
366 .unchecked_transaction()
367 .map_err(|e| CodememError::Storage(e.to_string()))?;
368
369 const COLS: usize = 2;
370 const BATCH: usize = 999 / COLS; for chunk in items.chunks(BATCH) {
373 let placeholders = multi_row_placeholders(COLS, chunk.len());
374 let sql = format!(
375 "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
376 );
377
378 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
379 Vec::with_capacity(chunk.len() * COLS);
380 for (id, embedding) in chunk {
381 let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
382 param_values.push(Box::new(id.to_string()));
383 param_values.push(Box::new(blob));
384 }
385
386 let refs: Vec<&dyn rusqlite::types::ToSql> =
387 param_values.iter().map(|p| p.as_ref()).collect();
388 tx.execute(&sql, refs.as_slice())
389 .map_err(|e| CodememError::Storage(e.to_string()))?;
390 }
391
392 tx.commit()
393 .map_err(|e| CodememError::Storage(e.to_string()))?;
394 Ok(())
395 }
396
397 fn load_file_hashes(&self) -> Result<HashMap<String, String>, CodememError> {
398 let conn = self.conn()?;
399 let mut stmt = conn
400 .prepare("SELECT file_path, content_hash FROM file_hashes")
401 .map_err(|e| CodememError::Storage(e.to_string()))?;
402
403 let rows = stmt
404 .query_map([], |row| {
405 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
406 })
407 .map_err(|e| CodememError::Storage(e.to_string()))?
408 .collect::<Result<Vec<_>, _>>()
409 .map_err(|e| CodememError::Storage(e.to_string()))?;
410
411 Ok(rows.into_iter().collect())
412 }
413
414 fn save_file_hashes(&self, hashes: &HashMap<String, String>) -> Result<(), CodememError> {
415 let conn = self.conn()?;
416 let tx = conn
417 .unchecked_transaction()
418 .map_err(|e| CodememError::Storage(e.to_string()))?;
419
420 tx.execute("DELETE FROM file_hashes", [])
421 .map_err(|e| CodememError::Storage(e.to_string()))?;
422
423 for (path, hash) in hashes {
424 tx.execute(
425 "INSERT INTO file_hashes (file_path, content_hash) VALUES (?1, ?2)",
426 params![path, hash],
427 )
428 .map_err(|e| CodememError::Storage(e.to_string()))?;
429 }
430
431 tx.commit()
432 .map_err(|e| CodememError::Storage(e.to_string()))?;
433 Ok(())
434 }
435
436 fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
437 if nodes.is_empty() {
438 return Ok(());
439 }
440 let conn = self.conn()?;
441 let tx = conn
442 .unchecked_transaction()
443 .map_err(|e| CodememError::Storage(e.to_string()))?;
444
445 const COLS: usize = 7;
446 const BATCH: usize = 999 / COLS; for chunk in nodes.chunks(BATCH) {
449 let placeholders = multi_row_placeholders(COLS, chunk.len());
450 let sql = format!(
451 "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
452 );
453
454 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
455 Vec::with_capacity(chunk.len() * COLS);
456 for node in chunk {
457 let payload_json =
458 serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
459 param_values.push(Box::new(node.id.clone()));
460 param_values.push(Box::new(node.kind.to_string()));
461 param_values.push(Box::new(node.label.clone()));
462 param_values.push(Box::new(payload_json));
463 param_values.push(Box::new(node.centrality));
464 param_values.push(Box::new(node.memory_id.clone()));
465 param_values.push(Box::new(node.namespace.clone()));
466 }
467
468 let refs: Vec<&dyn rusqlite::types::ToSql> =
469 param_values.iter().map(|p| p.as_ref()).collect();
470 tx.execute(&sql, refs.as_slice())
471 .map_err(|e| CodememError::Storage(e.to_string()))?;
472 }
473
474 tx.commit()
475 .map_err(|e| CodememError::Storage(e.to_string()))?;
476 Ok(())
477 }
478
479 fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
480 if edges.is_empty() {
481 return Ok(());
482 }
483 let conn = self.conn()?;
484 let tx = conn
485 .unchecked_transaction()
486 .map_err(|e| CodememError::Storage(e.to_string()))?;
487
488 const COLS: usize = 9;
489 const BATCH: usize = 999 / COLS; for chunk in edges.chunks(BATCH) {
492 let placeholders = multi_row_placeholders(COLS, chunk.len());
493 let sql = format!(
494 "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
495 );
496
497 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
498 Vec::with_capacity(chunk.len() * COLS);
499 for edge in chunk {
500 let props_json =
501 serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
502 param_values.push(Box::new(edge.id.clone()));
503 param_values.push(Box::new(edge.src.clone()));
504 param_values.push(Box::new(edge.dst.clone()));
505 param_values.push(Box::new(edge.relationship.to_string()));
506 param_values.push(Box::new(edge.weight));
507 param_values.push(Box::new(props_json));
508 param_values.push(Box::new(edge.created_at.timestamp()));
509 param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
510 param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
511 }
512
513 let refs: Vec<&dyn rusqlite::types::ToSql> =
514 param_values.iter().map(|p| p.as_ref()).collect();
515 tx.execute(&sql, refs.as_slice())
516 .map_err(|e| CodememError::Storage(e.to_string()))?;
517 }
518
519 tx.commit()
520 .map_err(|e| CodememError::Storage(e.to_string()))?;
521 Ok(())
522 }
523
524 fn get_stale_memories_for_decay(
525 &self,
526 threshold_ts: i64,
527 ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
528 let conn = self.conn()?;
529 let mut stmt = conn
530 .prepare(
531 "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
532 )
533 .map_err(|e| CodememError::Storage(e.to_string()))?;
534
535 let rows = stmt
536 .query_map(params![threshold_ts], |row| {
537 Ok((
538 row.get::<_, String>(0)?,
539 row.get::<_, f64>(1)?,
540 row.get::<_, u32>(2)?,
541 row.get::<_, i64>(3)?,
542 ))
543 })
544 .map_err(|e| CodememError::Storage(e.to_string()))?
545 .collect::<Result<Vec<_>, _>>()
546 .map_err(|e| CodememError::Storage(e.to_string()))?;
547
548 Ok(rows)
549 }
550
551 fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
552 if updates.is_empty() {
553 return Ok(0);
554 }
555 let conn = self.conn()?;
556 let tx = conn
557 .unchecked_transaction()
558 .map_err(|e| CodememError::Storage(e.to_string()))?;
559
560 let mut count = 0usize;
561 for (id, importance) in updates {
562 let rows = tx
563 .execute(
564 "UPDATE memories SET importance = ?1 WHERE id = ?2",
565 params![importance, id],
566 )
567 .map_err(|e| CodememError::Storage(e.to_string()))?;
568 count += rows;
569 }
570
571 tx.commit()
572 .map_err(|e| CodememError::Storage(e.to_string()))?;
573 Ok(count)
574 }
575
576 fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
577 let conn = self.conn()?;
578 let count: i64 = if let Some(ns) = namespace {
579 conn.query_row(
580 "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
581 params![ns],
582 |row| row.get(0),
583 )
584 .map_err(|e| CodememError::Storage(e.to_string()))?
585 } else {
586 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
587 .map_err(|e| CodememError::Storage(e.to_string()))?
588 };
589 Ok(count as usize)
590 }
591
592 fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
595 let conn = self.conn()?;
596 let mut stmt = conn
597 .prepare(
598 "SELECT m.id, m.content FROM memories m
599 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
600 WHERE me.memory_id IS NULL",
601 )
602 .map_err(|e| CodememError::Storage(e.to_string()))?;
603
604 let rows = stmt
605 .query_map([], |row| {
606 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
607 })
608 .map_err(|e| CodememError::Storage(e.to_string()))?
609 .collect::<Result<Vec<_>, _>>()
610 .map_err(|e| CodememError::Storage(e.to_string()))?;
611
612 Ok(rows)
613 }
614
615 fn search_graph_nodes(
616 &self,
617 query: &str,
618 namespace: Option<&str>,
619 limit: usize,
620 ) -> Result<Vec<GraphNode>, CodememError> {
621 let conn = self.conn()?;
622 let escaped = query
623 .to_lowercase()
624 .replace('\\', "\\\\")
625 .replace('%', "\\%")
626 .replace('_', "\\_");
627 let pattern = format!("%{escaped}%");
628
629 let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
630 if let Some(ns) = namespace {
631 (
632 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
633 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
634 ORDER BY centrality DESC LIMIT ?3"
635 .to_string(),
636 vec![
637 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
638 Box::new(ns.to_string()),
639 Box::new(limit as i64),
640 ],
641 )
642 } else {
643 (
644 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
645 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
646 ORDER BY centrality DESC LIMIT ?2"
647 .to_string(),
648 vec![
649 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
650 Box::new(limit as i64),
651 ],
652 )
653 };
654
655 let refs: Vec<&dyn rusqlite::types::ToSql> =
656 params_vec.iter().map(|p| p.as_ref()).collect();
657 let mut stmt = conn
658 .prepare(&sql)
659 .map_err(|e| CodememError::Storage(e.to_string()))?;
660
661 let rows = stmt
662 .query_map(refs.as_slice(), |row| {
663 let kind_str: String = row.get(1)?;
664 let payload_str: String = row.get(3)?;
665 Ok(GraphNode {
666 id: row.get(0)?,
667 kind: kind_str.parse().unwrap_or(NodeKind::Memory),
668 label: row.get(2)?,
669 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
670 centrality: row.get(4)?,
671 memory_id: row.get(5)?,
672 namespace: row.get(6)?,
673 })
674 })
675 .map_err(|e| CodememError::Storage(e.to_string()))?
676 .collect::<Result<Vec<_>, _>>()
677 .map_err(|e| CodememError::Storage(e.to_string()))?;
678
679 Ok(rows)
680 }
681
682 fn list_memories_filtered(
683 &self,
684 namespace: Option<&str>,
685 memory_type: Option<&str>,
686 ) -> Result<Vec<MemoryNode>, CodememError> {
687 let conn = self.conn()?;
688 let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
689 content_hash, tags, metadata, namespace, session_id, created_at, updated_at, \
690 last_accessed_at FROM memories WHERE 1=1"
691 .to_string();
692 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
693
694 if let Some(ns) = namespace {
695 param_values.push(Box::new(ns.to_string()));
696 sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
697 }
698 if let Some(mt) = memory_type {
699 param_values.push(Box::new(mt.to_string()));
700 sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
701 }
702 sql.push_str(" ORDER BY created_at DESC");
703
704 let refs: Vec<&dyn rusqlite::types::ToSql> =
705 param_values.iter().map(|p| p.as_ref()).collect();
706 let mut stmt = conn
707 .prepare(&sql)
708 .map_err(|e| CodememError::Storage(e.to_string()))?;
709
710 let rows = stmt
711 .query_map(refs.as_slice(), |row| {
712 Ok(MemoryRow {
713 id: row.get(0)?,
714 content: row.get(1)?,
715 memory_type: row.get(2)?,
716 importance: row.get(3)?,
717 confidence: row.get(4)?,
718 access_count: row.get(5)?,
719 content_hash: row.get(6)?,
720 tags: row.get(7)?,
721 metadata: row.get(8)?,
722 namespace: row.get(9)?,
723 session_id: row.get(10)?,
724 created_at: row.get(11)?,
725 updated_at: row.get(12)?,
726 last_accessed_at: row.get(13)?,
727 })
728 })
729 .map_err(|e| CodememError::Storage(e.to_string()))?;
730
731 let mut result = Vec::new();
732 for row in rows {
733 let mr = row.map_err(|e| CodememError::Storage(e.to_string()))?;
734 result.push(mr.into_memory_node()?);
735 }
736
737 Ok(result)
738 }
739
740 delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
743 delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
744 delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
745 delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
746 delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
747 delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
748 delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
749
750 delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
753
754 fn begin_transaction(&self) -> Result<(), CodememError> {
757 let conn = self.conn()?;
758 conn.execute_batch("BEGIN IMMEDIATE")
759 .map_err(|e| CodememError::Storage(e.to_string()))?;
760 self.in_transaction
761 .store(true, std::sync::atomic::Ordering::Release);
762 Ok(())
763 }
764
765 fn commit_transaction(&self) -> Result<(), CodememError> {
766 let conn = self.conn()?;
767 conn.execute_batch("COMMIT")
768 .map_err(|e| CodememError::Storage(e.to_string()))?;
769 self.in_transaction
772 .store(false, std::sync::atomic::Ordering::Release);
773 Ok(())
774 }
775
776 fn rollback_transaction(&self) -> Result<(), CodememError> {
777 self.in_transaction
778 .store(false, std::sync::atomic::Ordering::Release);
779 let conn = self.conn()?;
780 conn.execute_batch("ROLLBACK")
781 .map_err(|e| CodememError::Storage(e.to_string()))?;
782 Ok(())
783 }
784}
785
786#[cfg(test)]
787#[path = "tests/backend_tests.rs"]
788mod tests;