1use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5 CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6 Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Builds the `VALUES` placeholder list for a multi-row SQL statement.
///
/// Produces `rows` parenthesised groups of `cols` numbered placeholders,
/// numbered consecutively from `?1` across the whole statement, e.g.
/// `(?1,?2),(?3,?4)` for `cols = 2, rows = 2`. Returns an empty string when
/// `rows` is zero.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // Placeholders are 1-based and continue across rows.
            let base = row * cols;
            let group = (1..=cols)
                .map(|offset| format!("?{}", base + offset))
                .collect::<Vec<_>>()
                .join(",");
            format!("({group})")
        })
        .collect::<Vec<_>>()
        .join(",")
}
30
/// Generates a trait method that forwards to the `Storage` associated
/// function of the same name.
///
/// Usage (inside an `impl` block for `Storage`):
/// `delegate_storage!(name(&self, arg: Type, ...) -> ReturnType);`
///
/// A single repetition-based arm accepts any number of `name: Type` arguments
/// after `&self`, so every arity (including ones the previous fixed arms did
/// not cover, such as four arguments) is supported. Existing invocations
/// expand to exactly the same method as before.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            // Arguments are passed through unchanged, in declaration order.
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
    // Single-memory CRUD. Each method below is generated by `delegate_storage!`
    // and forwards its arguments unchanged to the inherent `Storage` method of
    // the same name, returning that method's result.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74 fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76 Storage::delete_memory_cascade(self, id)
79 }
80
81 fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83 Storage::delete_memories_batch_cascade(self, ids)
84 }
85
    // Memory listing and counting. Generated by `delegate_storage!`; each method
    // forwards to the inherent `Storage` method of the same name.
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
91
92 fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
93 if ids.is_empty() {
94 return Ok(Vec::new());
95 }
96 let conn = self.conn()?;
97
98 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
99 let sql = format!(
100 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
101 placeholders.join(",")
102 );
103
104 let mut stmt = conn.prepare(&sql).storage_err()?;
105
106 let params: Vec<&dyn rusqlite::types::ToSql> = ids
107 .iter()
108 .map(|id| id as &dyn rusqlite::types::ToSql)
109 .collect();
110
111 let rows = stmt
112 .query_map(params.as_slice(), MemoryRow::from_row)
113 .storage_err()?;
114
115 let mut memories = Vec::new();
116 for row in rows {
117 let row = row.storage_err()?;
118 memories.push(row.into_memory_node()?);
119 }
120 Ok(memories)
121 }
122
    // Single-embedding store/load. Generated by `delegate_storage!`; each method
    // forwards to the inherent `Storage` method of the same name.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
127
128 fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
129 let conn = self.conn()?;
130 let deleted = conn
131 .execute(
132 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
133 [memory_id],
134 )
135 .storage_err()?;
136 Ok(deleted > 0)
137 }
138
139 fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
140 let conn = self.conn()?;
141 let mut stmt = conn
142 .prepare("SELECT memory_id, embedding FROM memory_embeddings")
143 .storage_err()?;
144 let rows = stmt
145 .query_map([], |row| {
146 let id: String = row.get(0)?;
147 let blob: Vec<u8> = row.get(1)?;
148 Ok((id, blob))
149 })
150 .storage_err()?;
151 let mut result = Vec::new();
152 for row in rows {
153 let (id, blob) = row.storage_err()?;
154 let floats: Vec<f32> = blob
155 .chunks_exact(4)
156 .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
157 .collect();
158 result.push((id, floats));
159 }
160 Ok(result)
161 }
162
    // Graph node/edge persistence and session start/end. Generated by
    // `delegate_storage!`; each method forwards to the inherent `Storage`
    // method of the same name.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
179
180 fn list_sessions(
181 &self,
182 namespace: Option<&str>,
183 limit: usize,
184 ) -> Result<Vec<Session>, CodememError> {
185 self.list_sessions_with_limit(namespace, limit)
186 }
187
    // Consolidation log access. Generated by `delegate_storage!`; each method
    // forwards to the inherent `Storage` method of the same name.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);

    // Usage-pattern queries, delegated the same way.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
199
200 fn decay_stale_memories(
203 &self,
204 threshold_ts: i64,
205 decay_factor: f64,
206 ) -> Result<usize, CodememError> {
207 let conn = self.conn()?;
208 let rows = conn
209 .execute(
210 "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
211 params![decay_factor, threshold_ts],
212 )
213 .storage_err()?;
214 Ok(rows)
215 }
216
217 fn list_memories_for_creative(
218 &self,
219 ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
220 let conn = self.conn()?;
221 let mut stmt = conn
222 .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
223 .storage_err()?;
224
225 let rows = stmt
226 .query_map([], |row| {
227 Ok((
228 row.get::<_, String>(0)?,
229 row.get::<_, String>(1)?,
230 row.get::<_, String>(2)?,
231 ))
232 })
233 .storage_err()?
234 .collect::<Result<Vec<_>, _>>()
235 .storage_err()?;
236
237 Ok(rows
238 .into_iter()
239 .map(|(id, mtype, tags_json)| {
240 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
241 (id, mtype, tags)
242 })
243 .collect())
244 }
245
246 fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
247 let conn = self.conn()?;
248 let mut stmt = conn
249 .prepare(
250 "SELECT a.id, b.id, 1.0 as similarity
251 FROM memories a
252 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
253 WHERE a.id < b.id",
254 )
255 .storage_err()?;
256
257 let rows = stmt
258 .query_map([], |row| {
259 Ok((
260 row.get::<_, String>(0)?,
261 row.get::<_, String>(1)?,
262 row.get::<_, f64>(2)?,
263 ))
264 })
265 .storage_err()?
266 .collect::<Result<Vec<_>, _>>()
267 .storage_err()?;
268
269 Ok(rows)
270 }
271
272 fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
273 let conn = self.conn()?;
274 let mut stmt = conn
275 .prepare(
276 "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
277 )
278 .storage_err()?;
279
280 let ids = stmt
281 .query_map(params![importance_threshold], |row| row.get(0))
282 .storage_err()?
283 .collect::<Result<Vec<String>, _>>()
284 .storage_err()?;
285
286 Ok(ids)
287 }
288
289 fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
292 if memories.is_empty() {
293 return Ok(());
294 }
295 let conn = self.conn()?;
296 let tx = conn.unchecked_transaction().storage_err()?;
297
298 const COLS: usize = 14;
299 const BATCH: usize = 999 / COLS; for chunk in memories.chunks(BATCH) {
302 let placeholders = multi_row_placeholders(COLS, chunk.len());
303 let sql = format!(
304 "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
305 );
306
307 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
308 Vec::with_capacity(chunk.len() * COLS);
309 for memory in chunk {
310 let tags_json = serde_json::to_string(&memory.tags)?;
311 let metadata_json = serde_json::to_string(&memory.metadata)?;
312 param_values.push(Box::new(memory.id.clone()));
313 param_values.push(Box::new(memory.content.clone()));
314 param_values.push(Box::new(memory.memory_type.to_string()));
315 param_values.push(Box::new(memory.importance));
316 param_values.push(Box::new(memory.confidence));
317 param_values.push(Box::new(memory.access_count as i64));
318 param_values.push(Box::new(memory.content_hash.clone()));
319 param_values.push(Box::new(tags_json));
320 param_values.push(Box::new(metadata_json));
321 param_values.push(Box::new(memory.namespace.clone()));
322 param_values.push(Box::new(memory.session_id.clone()));
323 param_values.push(Box::new(memory.created_at.timestamp()));
324 param_values.push(Box::new(memory.updated_at.timestamp()));
325 param_values.push(Box::new(memory.last_accessed_at.timestamp()));
326 }
327
328 let refs: Vec<&dyn rusqlite::types::ToSql> =
329 param_values.iter().map(|p| p.as_ref()).collect();
330 tx.execute(&sql, refs.as_slice()).storage_err()?;
331 }
332
333 tx.commit().storage_err()?;
334 Ok(())
335 }
336
337 fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
338 if items.is_empty() {
339 return Ok(());
340 }
341 let conn = self.conn()?;
342 let tx = conn.unchecked_transaction().storage_err()?;
343
344 const COLS: usize = 2;
345 const BATCH: usize = 999 / COLS; for chunk in items.chunks(BATCH) {
348 let placeholders = multi_row_placeholders(COLS, chunk.len());
349 let sql = format!(
350 "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
351 );
352
353 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
354 Vec::with_capacity(chunk.len() * COLS);
355 for (id, embedding) in chunk {
356 let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
357 param_values.push(Box::new(id.to_string()));
358 param_values.push(Box::new(blob));
359 }
360
361 let refs: Vec<&dyn rusqlite::types::ToSql> =
362 param_values.iter().map(|p| p.as_ref()).collect();
363 tx.execute(&sql, refs.as_slice()).storage_err()?;
364 }
365
366 tx.commit().storage_err()?;
367 Ok(())
368 }
369
370 fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
371 let conn = self.conn()?;
372 let mut stmt = conn
373 .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
374 .storage_err()?;
375
376 let rows = stmt
377 .query_map(params![namespace], |row| {
378 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
379 })
380 .storage_err()?
381 .collect::<Result<Vec<_>, _>>()
382 .storage_err()?;
383
384 Ok(rows.into_iter().collect())
385 }
386
387 fn save_file_hashes(
388 &self,
389 namespace: &str,
390 hashes: &HashMap<String, String>,
391 ) -> Result<(), CodememError> {
392 let conn = self.conn()?;
393 let tx = conn.unchecked_transaction().storage_err()?;
394
395 tx.execute(
396 "DELETE FROM file_hashes WHERE namespace = ?1",
397 params![namespace],
398 )
399 .storage_err()?;
400
401 for (path, hash) in hashes {
402 tx.execute(
403 "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
404 params![namespace, path, hash],
405 )
406 .storage_err()?;
407 }
408
409 tx.commit().storage_err()?;
410 Ok(())
411 }
412
413 fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
414 if nodes.is_empty() {
415 return Ok(());
416 }
417 let conn = self.conn()?;
418 let tx = conn.unchecked_transaction().storage_err()?;
419
420 const COLS: usize = 7;
421 const BATCH: usize = 999 / COLS; for chunk in nodes.chunks(BATCH) {
424 let placeholders = multi_row_placeholders(COLS, chunk.len());
425 let sql = format!(
426 "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
427 );
428
429 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
430 Vec::with_capacity(chunk.len() * COLS);
431 for node in chunk {
432 let payload_json =
433 serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
434 param_values.push(Box::new(node.id.clone()));
435 param_values.push(Box::new(node.kind.to_string()));
436 param_values.push(Box::new(node.label.clone()));
437 param_values.push(Box::new(payload_json));
438 param_values.push(Box::new(node.centrality));
439 param_values.push(Box::new(node.memory_id.clone()));
440 param_values.push(Box::new(node.namespace.clone()));
441 }
442
443 let refs: Vec<&dyn rusqlite::types::ToSql> =
444 param_values.iter().map(|p| p.as_ref()).collect();
445 tx.execute(&sql, refs.as_slice()).storage_err()?;
446 }
447
448 tx.commit().storage_err()?;
449 Ok(())
450 }
451
452 fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
453 if edges.is_empty() {
454 return Ok(());
455 }
456 let conn = self.conn()?;
457 let tx = conn.unchecked_transaction().storage_err()?;
458
459 const COLS: usize = 9;
460 const BATCH: usize = 999 / COLS; for chunk in edges.chunks(BATCH) {
463 let placeholders = multi_row_placeholders(COLS, chunk.len());
464 let sql = format!(
465 "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
466 );
467
468 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
469 Vec::with_capacity(chunk.len() * COLS);
470 for edge in chunk {
471 let props_json =
472 serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
473 param_values.push(Box::new(edge.id.clone()));
474 param_values.push(Box::new(edge.src.clone()));
475 param_values.push(Box::new(edge.dst.clone()));
476 param_values.push(Box::new(edge.relationship.to_string()));
477 param_values.push(Box::new(edge.weight));
478 param_values.push(Box::new(props_json));
479 param_values.push(Box::new(edge.created_at.timestamp()));
480 param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
481 param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
482 }
483
484 let refs: Vec<&dyn rusqlite::types::ToSql> =
485 param_values.iter().map(|p| p.as_ref()).collect();
486 tx.execute(&sql, refs.as_slice()).storage_err()?;
487 }
488
489 tx.commit().storage_err()?;
490 Ok(())
491 }
492
493 fn get_stale_memories_for_decay(
494 &self,
495 threshold_ts: i64,
496 ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
497 let conn = self.conn()?;
498 let mut stmt = conn
499 .prepare(
500 "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
501 )
502 .storage_err()?;
503
504 let rows = stmt
505 .query_map(params![threshold_ts], |row| {
506 Ok((
507 row.get::<_, String>(0)?,
508 row.get::<_, f64>(1)?,
509 row.get::<_, u32>(2)?,
510 row.get::<_, i64>(3)?,
511 ))
512 })
513 .storage_err()?
514 .collect::<Result<Vec<_>, _>>()
515 .storage_err()?;
516
517 Ok(rows)
518 }
519
520 fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
521 if updates.is_empty() {
522 return Ok(0);
523 }
524 let conn = self.conn()?;
525 let tx = conn.unchecked_transaction().storage_err()?;
526
527 let mut count = 0usize;
528 for (id, importance) in updates {
529 let rows = tx
530 .execute(
531 "UPDATE memories SET importance = ?1 WHERE id = ?2",
532 params![importance, id],
533 )
534 .storage_err()?;
535 count += rows;
536 }
537
538 tx.commit().storage_err()?;
539 Ok(count)
540 }
541
542 fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
543 let conn = self.conn()?;
544 let count: i64 = if let Some(ns) = namespace {
545 conn.query_row(
546 "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
547 params![ns],
548 |row| row.get(0),
549 )
550 .storage_err()?
551 } else {
552 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
553 .storage_err()?
554 };
555 Ok(count as usize)
556 }
557
558 fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
561 let conn = self.conn()?;
562 let mut stmt = conn
563 .prepare(
564 "SELECT m.id, m.content FROM memories m
565 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
566 WHERE me.memory_id IS NULL",
567 )
568 .storage_err()?;
569
570 let rows = stmt
571 .query_map([], |row| {
572 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
573 })
574 .storage_err()?
575 .collect::<Result<Vec<_>, _>>()
576 .storage_err()?;
577
578 Ok(rows)
579 }
580
581 fn search_graph_nodes(
582 &self,
583 query: &str,
584 namespace: Option<&str>,
585 limit: usize,
586 ) -> Result<Vec<GraphNode>, CodememError> {
587 let conn = self.conn()?;
588 let escaped = query
589 .to_lowercase()
590 .replace('\\', "\\\\")
591 .replace('%', "\\%")
592 .replace('_', "\\_");
593 let pattern = format!("%{escaped}%");
594
595 let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
596 if let Some(ns) = namespace {
597 (
598 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
599 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
600 ORDER BY centrality DESC LIMIT ?3"
601 .to_string(),
602 vec![
603 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
604 Box::new(ns.to_string()),
605 Box::new(limit as i64),
606 ],
607 )
608 } else {
609 (
610 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
611 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
612 ORDER BY centrality DESC LIMIT ?2"
613 .to_string(),
614 vec![
615 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
616 Box::new(limit as i64),
617 ],
618 )
619 };
620
621 let refs: Vec<&dyn rusqlite::types::ToSql> =
622 params_vec.iter().map(|p| p.as_ref()).collect();
623 let mut stmt = conn.prepare(&sql).storage_err()?;
624
625 let rows = stmt
626 .query_map(refs.as_slice(), |row| {
627 let kind_str: String = row.get(1)?;
628 let payload_str: String = row.get(3)?;
629 Ok(GraphNode {
630 id: row.get(0)?,
631 kind: kind_str.parse().unwrap_or(NodeKind::Memory),
632 label: row.get(2)?,
633 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
634 centrality: row.get(4)?,
635 memory_id: row.get(5)?,
636 namespace: row.get(6)?,
637 })
638 })
639 .storage_err()?
640 .collect::<Result<Vec<_>, _>>()
641 .storage_err()?;
642
643 Ok(rows)
644 }
645
646 fn list_memories_by_tag(
647 &self,
648 tag: &str,
649 namespace: Option<&str>,
650 limit: usize,
651 ) -> Result<Vec<MemoryNode>, CodememError> {
652 Storage::list_memories_by_tag(self, tag, namespace, limit)
653 }
654
655 fn list_memories_filtered(
656 &self,
657 namespace: Option<&str>,
658 memory_type: Option<&str>,
659 ) -> Result<Vec<MemoryNode>, CodememError> {
660 let conn = self.conn()?;
661 let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
662 content_hash, tags, metadata, namespace, session_id, created_at, updated_at, \
663 last_accessed_at FROM memories WHERE 1=1"
664 .to_string();
665 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
666
667 if let Some(ns) = namespace {
668 param_values.push(Box::new(ns.to_string()));
669 sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
670 }
671 if let Some(mt) = memory_type {
672 param_values.push(Box::new(mt.to_string()));
673 sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
674 }
675 sql.push_str(" ORDER BY created_at DESC");
676
677 let refs: Vec<&dyn rusqlite::types::ToSql> =
678 param_values.iter().map(|p| p.as_ref()).collect();
679 let mut stmt = conn.prepare(&sql).storage_err()?;
680
681 let rows = stmt
682 .query_map(refs.as_slice(), MemoryRow::from_row)
683 .storage_err()?;
684
685 let mut result = Vec::new();
686 for row in rows {
687 let mr = row.storage_err()?;
688 result.push(mr.into_memory_node()?);
689 }
690
691 Ok(result)
692 }
693
    // Session activity tracking. Generated by `delegate_storage!`; each method
    // forwards to the inherent `Storage` method of the same name.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);

    // Storage statistics, delegated the same way.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
707
    /// Opens an explicit transaction with `BEGIN IMMEDIATE` and then marks
    /// this storage as being inside a transaction.
    ///
    /// The `in_transaction` flag is set only after the `BEGIN` succeeds, so a
    /// failed begin leaves the flag untouched.
    fn begin_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
        // Publish the flag with Release ordering after the BEGIN has run.
        self.in_transaction
            .store(true, std::sync::atomic::Ordering::Release);
        Ok(())
    }
717
    /// Issues `COMMIT` and then clears the `in_transaction` flag.
    ///
    /// The flag is cleared only after the `COMMIT` succeeds; if obtaining the
    /// connection or the commit itself fails, the flag keeps its prior value.
    fn commit_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("COMMIT").storage_err()?;
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        Ok(())
    }
727
    /// Clears the `in_transaction` flag and then issues `ROLLBACK`.
    ///
    /// Unlike `commit_transaction`, the flag is cleared first, so it ends up
    /// `false` even when obtaining the connection or the `ROLLBACK` fails.
    fn rollback_transaction(&self) -> Result<(), CodememError> {
        // Clear the flag before touching the connection (see note above).
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        let conn = self.conn()?;
        conn.execute_batch("ROLLBACK").storage_err()?;
        Ok(())
    }
735
736 fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
739 Storage::list_repos(self)
740 }
741
742 fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
743 Storage::add_repo(self, repo)
744 }
745
746 fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
747 Storage::get_repo(self, id)
748 }
749
750 fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
751 Storage::remove_repo(self, id)
752 }
753
754 fn update_repo_status(
755 &self,
756 id: &str,
757 status: &str,
758 indexed_at: Option<&str>,
759 ) -> Result<(), CodememError> {
760 Storage::update_repo_status(self, id, status, indexed_at)
761 }
762
763 fn graph_edges_for_namespace_with_cross(
766 &self,
767 namespace: &str,
768 include_cross_namespace: bool,
769 ) -> Result<Vec<Edge>, CodememError> {
770 Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
771 }
772
773 fn upsert_package_registry(
774 &self,
775 package_name: &str,
776 namespace: &str,
777 version: &str,
778 manifest: &str,
779 ) -> Result<(), CodememError> {
780 Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
781 }
782
783 fn store_unresolved_ref(
784 &self,
785 source_qualified_name: &str,
786 target_name: &str,
787 source_namespace: &str,
788 file_path: &str,
789 line: usize,
790 ref_kind: &str,
791 package_hint: Option<&str>,
792 ) -> Result<(), CodememError> {
793 use crate::cross_repo::UnresolvedRefEntry;
794 let entry = UnresolvedRefEntry {
795 id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
796 namespace: source_namespace.to_string(),
797 source_node: source_qualified_name.to_string(),
798 target_name: target_name.to_string(),
799 package_hint: package_hint.map(|s| s.to_string()),
800 ref_kind: ref_kind.to_string(),
801 file_path: Some(file_path.to_string()),
802 line: Some(line as i64),
803 created_at: chrono::Utc::now().timestamp(),
804 };
805 Storage::insert_unresolved_ref(self, &entry)
806 }
807
808 fn store_unresolved_refs_batch(
809 &self,
810 refs: &[codemem_core::UnresolvedRefData],
811 ) -> Result<usize, CodememError> {
812 use crate::cross_repo::UnresolvedRefEntry;
813 let now = chrono::Utc::now().timestamp();
814 let entries: Vec<UnresolvedRefEntry> = refs
815 .iter()
816 .map(|r| UnresolvedRefEntry {
817 id: format!(
818 "uref:{}:{}:{}",
819 r.namespace, r.source_qualified_name, r.target_name
820 ),
821 namespace: r.namespace.clone(),
822 source_node: r.source_qualified_name.clone(),
823 target_name: r.target_name.clone(),
824 package_hint: r.package_hint.clone(),
825 ref_kind: r.ref_kind.clone(),
826 file_path: Some(r.file_path.clone()),
827 line: Some(r.line as i64),
828 created_at: now,
829 })
830 .collect();
831 let count = entries.len();
832 Storage::insert_unresolved_refs_batch(self, &entries)?;
833 Ok(count)
834 }
835
836 fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
837 let conn = self.conn()?;
838 let mut stmt = conn
839 .prepare("SELECT package_name, namespace, manifest FROM package_registry")
840 .map_err(|e| CodememError::Storage(e.to_string()))?;
841 let rows = stmt
842 .query_map([], |row| {
843 Ok((
844 row.get::<_, String>(0)?,
845 row.get::<_, String>(1)?,
846 row.get::<_, String>(2)?,
847 ))
848 })
849 .map_err(|e| CodememError::Storage(e.to_string()))?
850 .filter_map(|r| r.ok())
851 .collect();
852 Ok(rows)
853 }
854
855 fn list_pending_unresolved_refs(
856 &self,
857 ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
858 let conn = self.conn()?;
859 let mut stmt = conn
860 .prepare(
861 "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
862 FROM unresolved_refs",
863 )
864 .map_err(|e| CodememError::Storage(e.to_string()))?;
865 let rows = stmt
866 .query_map([], |row| {
867 Ok(codemem_core::PendingUnresolvedRef {
868 id: row.get::<_, String>(0)?,
869 source_node: row.get::<_, String>(1)?,
870 target_name: row.get::<_, String>(2)?,
871 namespace: row.get::<_, String>(3)?,
872 file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
873 line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
874 ref_kind: row.get::<_, String>(6)?,
875 package_hint: row.get::<_, Option<String>>(7)?,
876 })
877 })
878 .map_err(|e| CodememError::Storage(e.to_string()))?
879 .filter_map(|r| r.ok())
880 .collect();
881 Ok(rows)
882 }
883
884 fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
885 Storage::delete_unresolved_ref(self, id)
886 }
887
888 fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
889 let conn = self.conn()?;
890 let count: i64 = conn
891 .query_row(
892 "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
893 rusqlite::params![namespace],
894 |row| row.get(0),
895 )
896 .map_err(|e| CodememError::Storage(e.to_string()))?;
897 Ok(count as usize)
898 }
899
900 fn list_registered_packages_for_namespace(
901 &self,
902 namespace: &str,
903 ) -> Result<Vec<(String, String, String)>, CodememError> {
904 let entries = Storage::get_packages_for_namespace(self, namespace)?;
905 Ok(entries
906 .into_iter()
907 .map(|e| (e.package_name, e.namespace, e.manifest))
908 .collect())
909 }
910
911 fn store_api_endpoint(
912 &self,
913 method: &str,
914 path: &str,
915 handler_symbol: &str,
916 namespace: &str,
917 ) -> Result<(), CodememError> {
918 use crate::cross_repo::ApiEndpointEntry;
919 let entry = ApiEndpointEntry {
920 id: format!("ep:{}:{}:{}", namespace, method, path),
921 namespace: namespace.to_string(),
922 method: Some(method.to_string()),
923 path: path.to_string(),
924 handler: Some(handler_symbol.to_string()),
925 schema: "{}".to_string(),
926 };
927 Storage::upsert_api_endpoint(self, &entry)
928 }
929
930 fn store_api_client_call(
931 &self,
932 library: &str,
933 method: Option<&str>,
934 caller_symbol: &str,
935 namespace: &str,
936 ) -> Result<(), CodememError> {
937 let id = format!("client:{caller_symbol}:{library}");
938 Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
939 }
940
941 fn list_api_endpoints(
942 &self,
943 namespace: &str,
944 ) -> Result<Vec<(String, String, String, String)>, CodememError> {
945 let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
946 Ok(entries
947 .into_iter()
948 .map(|e| {
949 (
950 e.method.unwrap_or_default(),
951 e.path,
952 e.handler.unwrap_or_default(),
953 e.namespace,
954 )
955 })
956 .collect())
957 }
958}
959
// Test module: compiled only under `cfg(test)` and loaded from
// `tests/backend_tests.rs` via the `#[path]` attribute.
#[cfg(test)]
#[path = "tests/backend_tests.rs"]
mod tests;