1use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5 CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6 Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Builds the `VALUES` placeholder list for a multi-row statement.
///
/// Produces `rows` parenthesised groups of `cols` numbered placeholders each,
/// numbered consecutively from `?1`, e.g. `(?1,?2),(?3,?4)` for two columns and
/// two rows. Returns an empty string when `rows` is zero.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    let mut sql = String::new();
    // Placeholders are numbered consecutively across rows, so a running counter
    // is enough to produce each index.
    let mut next_index = 1usize;
    for row in 0..rows {
        sql.push_str(if row == 0 { "(" } else { ",(" });
        for col in 0..cols {
            if col != 0 {
                sql.push(',');
            }
            sql.push_str(&format!("?{next_index}"));
            next_index += 1;
        }
        sql.push(')');
    }
    sql
}
30
/// Generates a trait method that forwards to the `Storage` associated function of
/// the same name.
///
/// Invocation shape: `delegate_storage!(name(&self, arg: Type, ...) -> ReturnType);`
///
/// The expansion is `fn name(&self, arg: Type, ...) -> ReturnType` whose body calls
/// `Storage::name(self, arg, ...)`. A single repetition arm handles any number of
/// arguments (including none), so no arity needs its own hand-written arm.
/// A trailing comma after the last argument is not accepted.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
    // Single-memory CRUD. Each `delegate_storage!` line expands to a trait method
    // that passes its arguments unchanged to `Storage::<same name>`.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
    /// Forwards to `Storage::delete_memory_cascade` with the same `id` and returns
    /// its result unchanged.
    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
        Storage::delete_memory_cascade(self, id)
    }
80
    /// Forwards to `Storage::delete_memories_batch_cascade` with the same `ids` and
    /// returns its result unchanged.
    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
        Storage::delete_memories_batch_cascade(self, ids)
    }
85
    // Expiry, id listing and counting. Each line expands to a trait method that
    // passes its arguments unchanged to `Storage::<same name>`.
    delegate_storage!(delete_expired_memories(&self) -> Result<usize, CodememError>);
    delegate_storage!(expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError>);
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
93
94 fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
95 if ids.is_empty() {
96 return Ok(Vec::new());
97 }
98 let conn = self.conn()?;
99
100 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
101 let sql = format!(
102 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
103 placeholders.join(",")
104 );
105
106 let mut stmt = conn.prepare(&sql).storage_err()?;
107
108 let params: Vec<&dyn rusqlite::types::ToSql> = ids
109 .iter()
110 .map(|id| id as &dyn rusqlite::types::ToSql)
111 .collect();
112
113 let rows = stmt
114 .query_map(params.as_slice(), MemoryRow::from_row)
115 .storage_err()?;
116
117 let mut memories = Vec::new();
118 for row in rows {
119 let row = row.storage_err()?;
120 memories.push(row.into_memory_node()?);
121 }
122 Ok(memories)
123 }
124
    // Single-embedding write and read. Each line expands to a trait method that
    // passes its arguments unchanged to `Storage::<same name>`.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
129
130 fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
131 let conn = self.conn()?;
132 let deleted = conn
133 .execute(
134 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
135 [memory_id],
136 )
137 .storage_err()?;
138 Ok(deleted > 0)
139 }
140
141 fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
142 let conn = self.conn()?;
143 let mut stmt = conn
144 .prepare("SELECT memory_id, embedding FROM memory_embeddings")
145 .storage_err()?;
146 let rows = stmt
147 .query_map([], |row| {
148 let id: String = row.get(0)?;
149 let blob: Vec<u8> = row.get(1)?;
150 Ok((id, blob))
151 })
152 .storage_err()?;
153 let mut result = Vec::new();
154 for row in rows {
155 let (id, blob) = row.storage_err()?;
156 let floats: Vec<f32> = blob
157 .chunks_exact(4)
158 .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
159 .collect();
160 result.push((id, floats));
161 }
162 Ok(result)
163 }
164
    // Graph node/edge operations and session start/end. Each line expands to a
    // trait method that passes its arguments unchanged to `Storage::<same name>`.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
181
    /// Forwards to `list_sessions_with_limit` with the same `namespace` and
    /// `limit`, returning its result unchanged.
    fn list_sessions(
        &self,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Session>, CodememError> {
        self.list_sessions_with_limit(namespace, limit)
    }
189
    // Consolidation log. Each line expands to a trait method that passes its
    // arguments unchanged to `Storage::<same name>`.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);

    // Usage-pattern queries, forwarded the same way.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
201
202 fn decay_stale_memories(
205 &self,
206 threshold_ts: i64,
207 decay_factor: f64,
208 ) -> Result<usize, CodememError> {
209 let conn = self.conn()?;
210 let rows = conn
211 .execute(
212 "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
213 params![decay_factor, threshold_ts],
214 )
215 .storage_err()?;
216 Ok(rows)
217 }
218
219 fn list_memories_for_creative(
220 &self,
221 ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
222 let conn = self.conn()?;
223 let mut stmt = conn
224 .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
225 .storage_err()?;
226
227 let rows = stmt
228 .query_map([], |row| {
229 Ok((
230 row.get::<_, String>(0)?,
231 row.get::<_, String>(1)?,
232 row.get::<_, String>(2)?,
233 ))
234 })
235 .storage_err()?
236 .collect::<Result<Vec<_>, _>>()
237 .storage_err()?;
238
239 Ok(rows
240 .into_iter()
241 .map(|(id, mtype, tags_json)| {
242 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
243 (id, mtype, tags)
244 })
245 .collect())
246 }
247
248 fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
249 let conn = self.conn()?;
250 let mut stmt = conn
251 .prepare(
252 "SELECT a.id, b.id, 1.0 as similarity
253 FROM memories a
254 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
255 WHERE a.id < b.id",
256 )
257 .storage_err()?;
258
259 let rows = stmt
260 .query_map([], |row| {
261 Ok((
262 row.get::<_, String>(0)?,
263 row.get::<_, String>(1)?,
264 row.get::<_, f64>(2)?,
265 ))
266 })
267 .storage_err()?
268 .collect::<Result<Vec<_>, _>>()
269 .storage_err()?;
270
271 Ok(rows)
272 }
273
274 fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
275 let conn = self.conn()?;
276 let mut stmt = conn
277 .prepare(
278 "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
279 )
280 .storage_err()?;
281
282 let ids = stmt
283 .query_map(params![importance_threshold], |row| row.get(0))
284 .storage_err()?
285 .collect::<Result<Vec<String>, _>>()
286 .storage_err()?;
287
288 Ok(ids)
289 }
290
    /// Inserts `memories` with multi-row `INSERT OR IGNORE` statements inside one
    /// transaction.
    ///
    /// Returns immediately with `Ok(())` for an empty slice. Tags and metadata are
    /// stored as JSON text and the timestamp fields as `timestamp()` integers.
    ///
    /// # Errors
    /// Returns an error if tags or metadata fail to serialize, or if any database
    /// step fails; in either case the function returns before `commit` is reached.
    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
        if memories.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One bound parameter per column per row.
        const COLS: usize = 17;
        // Rows per statement so that a statement binds at most 999 parameters
        // (presumably SQLite's historical default variable limit — confirm).
        const BATCH: usize = 999 / COLS;
        for chunk in memories.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
            );

            // Values are pushed in exactly the column order of the INSERT above;
            // the placeholders are positional, so this order must not change.
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for memory in chunk {
                let tags_json = serde_json::to_string(&memory.tags)?;
                let metadata_json = serde_json::to_string(&memory.metadata)?;
                param_values.push(Box::new(memory.id.clone()));
                param_values.push(Box::new(memory.content.clone()));
                param_values.push(Box::new(memory.memory_type.to_string()));
                param_values.push(Box::new(memory.importance));
                param_values.push(Box::new(memory.confidence));
                param_values.push(Box::new(memory.access_count as i64));
                param_values.push(Box::new(memory.content_hash.clone()));
                param_values.push(Box::new(tags_json));
                param_values.push(Box::new(metadata_json));
                param_values.push(Box::new(memory.namespace.clone()));
                param_values.push(Box::new(memory.session_id.clone()));
                param_values.push(Box::new(memory.repo.clone()));
                param_values.push(Box::new(memory.git_ref.clone()));
                param_values.push(Box::new(memory.expires_at.map(|dt| dt.timestamp())));
                param_values.push(Box::new(memory.created_at.timestamp()));
                param_values.push(Box::new(memory.updated_at.timestamp()));
                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
            }

            // `execute` takes borrowed trait objects, so borrow each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
341
342 fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
343 if items.is_empty() {
344 return Ok(());
345 }
346 let conn = self.conn()?;
347 let tx = conn.unchecked_transaction().storage_err()?;
348
349 const COLS: usize = 2;
350 const BATCH: usize = 999 / COLS; for chunk in items.chunks(BATCH) {
353 let placeholders = multi_row_placeholders(COLS, chunk.len());
354 let sql = format!(
355 "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
356 );
357
358 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
359 Vec::with_capacity(chunk.len() * COLS);
360 for (id, embedding) in chunk {
361 let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
362 param_values.push(Box::new(id.to_string()));
363 param_values.push(Box::new(blob));
364 }
365
366 let refs: Vec<&dyn rusqlite::types::ToSql> =
367 param_values.iter().map(|p| p.as_ref()).collect();
368 tx.execute(&sql, refs.as_slice()).storage_err()?;
369 }
370
371 tx.commit().storage_err()?;
372 Ok(())
373 }
374
375 fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
376 let conn = self.conn()?;
377 let mut stmt = conn
378 .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
379 .storage_err()?;
380
381 let rows = stmt
382 .query_map(params![namespace], |row| {
383 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
384 })
385 .storage_err()?
386 .collect::<Result<Vec<_>, _>>()
387 .storage_err()?;
388
389 Ok(rows.into_iter().collect())
390 }
391
392 fn save_file_hashes(
393 &self,
394 namespace: &str,
395 hashes: &HashMap<String, String>,
396 ) -> Result<(), CodememError> {
397 let conn = self.conn()?;
398 let tx = conn.unchecked_transaction().storage_err()?;
399
400 tx.execute(
401 "DELETE FROM file_hashes WHERE namespace = ?1",
402 params![namespace],
403 )
404 .storage_err()?;
405
406 for (path, hash) in hashes {
407 tx.execute(
408 "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
409 params![namespace, path, hash],
410 )
411 .storage_err()?;
412 }
413
414 tx.commit().storage_err()?;
415 Ok(())
416 }
417
    /// Inserts or replaces `nodes` with multi-row `INSERT OR REPLACE` statements
    /// inside one transaction.
    ///
    /// Returns immediately with `Ok(())` for an empty slice. A payload that fails
    /// to serialize is stored as the JSON text `{}` rather than causing an error.
    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
        if nodes.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One bound parameter per column per row.
        const COLS: usize = 7;
        // Rows per statement so that a statement binds at most 999 parameters.
        const BATCH: usize = 999 / COLS;
        for chunk in nodes.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
            );

            // Values are pushed in exactly the column order of the INSERT above.
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for node in chunk {
                let payload_json =
                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
                param_values.push(Box::new(node.id.clone()));
                param_values.push(Box::new(node.kind.to_string()));
                param_values.push(Box::new(node.label.clone()));
                param_values.push(Box::new(payload_json));
                param_values.push(Box::new(node.centrality));
                param_values.push(Box::new(node.memory_id.clone()));
                param_values.push(Box::new(node.namespace.clone()));
            }

            // `execute` takes borrowed trait objects, so borrow each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
456
    /// Inserts or replaces `edges` with multi-row `INSERT OR REPLACE` statements
    /// inside one transaction.
    ///
    /// Returns immediately with `Ok(())` for an empty slice. Edge properties that
    /// fail to serialize are stored as the JSON text `{}`; `created_at`,
    /// `valid_from` and `valid_to` are stored as `timestamp()` integers.
    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
        if edges.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One bound parameter per column per row.
        const COLS: usize = 9;
        // Rows per statement so that a statement binds at most 999 parameters.
        const BATCH: usize = 999 / COLS;
        for chunk in edges.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
            );

            // Values are pushed in exactly the column order of the INSERT above.
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for edge in chunk {
                let props_json =
                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
                param_values.push(Box::new(edge.id.clone()));
                param_values.push(Box::new(edge.src.clone()));
                param_values.push(Box::new(edge.dst.clone()));
                param_values.push(Box::new(edge.relationship.to_string()));
                param_values.push(Box::new(edge.weight));
                param_values.push(Box::new(props_json));
                param_values.push(Box::new(edge.created_at.timestamp()));
                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
            }

            // `execute` takes borrowed trait objects, so borrow each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
497
498 fn get_stale_memories_for_decay(
499 &self,
500 threshold_ts: i64,
501 ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
502 let conn = self.conn()?;
503 let mut stmt = conn
504 .prepare(
505 "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
506 )
507 .storage_err()?;
508
509 let rows = stmt
510 .query_map(params![threshold_ts], |row| {
511 Ok((
512 row.get::<_, String>(0)?,
513 row.get::<_, f64>(1)?,
514 row.get::<_, u32>(2)?,
515 row.get::<_, i64>(3)?,
516 ))
517 })
518 .storage_err()?
519 .collect::<Result<Vec<_>, _>>()
520 .storage_err()?;
521
522 Ok(rows)
523 }
524
525 fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
526 if updates.is_empty() {
527 return Ok(0);
528 }
529 let conn = self.conn()?;
530 let tx = conn.unchecked_transaction().storage_err()?;
531
532 let mut count = 0usize;
533 for (id, importance) in updates {
534 let rows = tx
535 .execute(
536 "UPDATE memories SET importance = ?1 WHERE id = ?2",
537 params![importance, id],
538 )
539 .storage_err()?;
540 count += rows;
541 }
542
543 tx.commit().storage_err()?;
544 Ok(count)
545 }
546
547 fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
548 let conn = self.conn()?;
549 let count: i64 = if let Some(ns) = namespace {
550 conn.query_row(
551 "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
552 params![ns],
553 |row| row.get(0),
554 )
555 .storage_err()?
556 } else {
557 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
558 .storage_err()?
559 };
560 Ok(count as usize)
561 }
562
563 fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
566 let conn = self.conn()?;
567 let mut stmt = conn
568 .prepare(
569 "SELECT m.id, m.content FROM memories m
570 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
571 WHERE me.memory_id IS NULL",
572 )
573 .storage_err()?;
574
575 let rows = stmt
576 .query_map([], |row| {
577 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
578 })
579 .storage_err()?
580 .collect::<Result<Vec<_>, _>>()
581 .storage_err()?;
582
583 Ok(rows)
584 }
585
586 fn search_graph_nodes(
587 &self,
588 query: &str,
589 namespace: Option<&str>,
590 limit: usize,
591 ) -> Result<Vec<GraphNode>, CodememError> {
592 let conn = self.conn()?;
593 let escaped = query
594 .to_lowercase()
595 .replace('\\', "\\\\")
596 .replace('%', "\\%")
597 .replace('_', "\\_");
598 let pattern = format!("%{escaped}%");
599
600 let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
601 if let Some(ns) = namespace {
602 (
603 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
604 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
605 ORDER BY centrality DESC LIMIT ?3"
606 .to_string(),
607 vec![
608 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
609 Box::new(ns.to_string()),
610 Box::new(limit as i64),
611 ],
612 )
613 } else {
614 (
615 "SELECT id, kind, label, payload, centrality, memory_id, namespace \
616 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
617 ORDER BY centrality DESC LIMIT ?2"
618 .to_string(),
619 vec![
620 Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
621 Box::new(limit as i64),
622 ],
623 )
624 };
625
626 let refs: Vec<&dyn rusqlite::types::ToSql> =
627 params_vec.iter().map(|p| p.as_ref()).collect();
628 let mut stmt = conn.prepare(&sql).storage_err()?;
629
630 let rows = stmt
631 .query_map(refs.as_slice(), |row| {
632 let kind_str: String = row.get(1)?;
633 let payload_str: String = row.get(3)?;
634 Ok(GraphNode {
635 id: row.get(0)?,
636 kind: kind_str.parse().unwrap_or(NodeKind::Memory),
637 label: row.get(2)?,
638 payload: serde_json::from_str(&payload_str).unwrap_or_default(),
639 centrality: row.get(4)?,
640 memory_id: row.get(5)?,
641 namespace: row.get(6)?,
642 })
643 })
644 .storage_err()?
645 .collect::<Result<Vec<_>, _>>()
646 .storage_err()?;
647
648 Ok(rows)
649 }
650
    /// Forwards to `Storage::list_memories_by_tag` with the same `tag`,
    /// `namespace` and `limit`, returning its result unchanged.
    fn list_memories_by_tag(
        &self,
        tag: &str,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MemoryNode>, CodememError> {
        Storage::list_memories_by_tag(self, tag, namespace, limit)
    }
659
660 fn list_memories_filtered(
661 &self,
662 namespace: Option<&str>,
663 memory_type: Option<&str>,
664 ) -> Result<Vec<MemoryNode>, CodememError> {
665 let conn = self.conn()?;
666 let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
667 content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, \
668 last_accessed_at FROM memories WHERE (expires_at IS NULL OR expires_at > ?1)"
669 .to_string();
670 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
671 param_values.push(Box::new(chrono::Utc::now().timestamp()));
673
674 if let Some(ns) = namespace {
675 param_values.push(Box::new(ns.to_string()));
676 sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
677 }
678 if let Some(mt) = memory_type {
679 param_values.push(Box::new(mt.to_string()));
680 sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
681 }
682 sql.push_str(" ORDER BY created_at DESC LIMIT 10000");
683
684 let refs: Vec<&dyn rusqlite::types::ToSql> =
685 param_values.iter().map(|p| p.as_ref()).collect();
686 let mut stmt = conn.prepare(&sql).storage_err()?;
687
688 let rows = stmt
689 .query_map(refs.as_slice(), MemoryRow::from_row)
690 .storage_err()?;
691
692 let mut result = Vec::new();
693 for row in rows {
694 let mr = row.storage_err()?;
695 result.push(mr.into_memory_node()?);
696 }
697
698 Ok(result)
699 }
700
    // Session activity tracking. Each line expands to a trait method that passes
    // its arguments unchanged to `Storage::<same name>`.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);

    // Storage statistics, forwarded the same way.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
714
    /// Executes `BEGIN IMMEDIATE` on the connection and then sets the
    /// `in_transaction` flag to `true`.
    ///
    /// If the statement fails, the error is returned and the flag is left
    /// untouched.
    fn begin_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
        // Only mark the transaction as open once BEGIN has succeeded.
        self.in_transaction
            .store(true, std::sync::atomic::Ordering::Release);
        Ok(())
    }
724
    /// Executes `COMMIT` on the connection and then clears the `in_transaction`
    /// flag.
    ///
    /// If the statement fails, the error is returned and the flag keeps its
    /// previous value.
    fn commit_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("COMMIT").storage_err()?;
        // Reached only when COMMIT succeeded.
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        Ok(())
    }
734
    /// Executes `ROLLBACK` on the connection and then clears the `in_transaction`
    /// flag.
    ///
    /// If the statement fails, the error is returned and the flag keeps its
    /// previous value.
    fn rollback_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("ROLLBACK").storage_err()?;
        // Reached only when ROLLBACK succeeded.
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        Ok(())
    }
745
    /// Forwards to `Storage::list_repos` and returns its result unchanged.
    fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
        Storage::list_repos(self)
    }
751
    /// Forwards to `Storage::add_repo` with the same `repo` and returns its result
    /// unchanged.
    fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
        Storage::add_repo(self, repo)
    }
755
    /// Forwards to `Storage::get_repo` with the same `id` and returns its result
    /// unchanged.
    fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
        Storage::get_repo(self, id)
    }
759
    /// Forwards to `Storage::remove_repo` with the same `id` and returns its result
    /// unchanged.
    fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
        Storage::remove_repo(self, id)
    }
763
    /// Forwards to `Storage::update_repo_status` with the same `id`, `status` and
    /// `indexed_at`, returning its result unchanged.
    fn update_repo_status(
        &self,
        id: &str,
        status: &str,
        indexed_at: Option<&str>,
    ) -> Result<(), CodememError> {
        Storage::update_repo_status(self, id, status, indexed_at)
    }
772
    /// Forwards to `Storage::graph_edges_for_namespace_with_cross` with the same
    /// `namespace` and `include_cross_namespace`, returning its result unchanged.
    fn graph_edges_for_namespace_with_cross(
        &self,
        namespace: &str,
        include_cross_namespace: bool,
    ) -> Result<Vec<Edge>, CodememError> {
        Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
    }
782
    /// Forwards to `Storage::upsert_package_registry` with the same arguments in
    /// the same order, returning its result unchanged.
    fn upsert_package_registry(
        &self,
        package_name: &str,
        namespace: &str,
        version: &str,
        manifest: &str,
    ) -> Result<(), CodememError> {
        Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
    }
792
793 fn store_unresolved_ref(
794 &self,
795 source_qualified_name: &str,
796 target_name: &str,
797 source_namespace: &str,
798 file_path: &str,
799 line: usize,
800 ref_kind: &str,
801 package_hint: Option<&str>,
802 ) -> Result<(), CodememError> {
803 use crate::cross_repo::UnresolvedRefEntry;
804 let entry = UnresolvedRefEntry {
805 id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
806 namespace: source_namespace.to_string(),
807 source_node: source_qualified_name.to_string(),
808 target_name: target_name.to_string(),
809 package_hint: package_hint.map(|s| s.to_string()),
810 ref_kind: ref_kind.to_string(),
811 file_path: Some(file_path.to_string()),
812 line: Some(line as i64),
813 created_at: chrono::Utc::now().timestamp(),
814 };
815 Storage::insert_unresolved_ref(self, &entry)
816 }
817
818 fn store_unresolved_refs_batch(
819 &self,
820 refs: &[codemem_core::UnresolvedRefData],
821 ) -> Result<usize, CodememError> {
822 use crate::cross_repo::UnresolvedRefEntry;
823 let now = chrono::Utc::now().timestamp();
824 let entries: Vec<UnresolvedRefEntry> = refs
825 .iter()
826 .map(|r| UnresolvedRefEntry {
827 id: format!(
828 "uref:{}:{}:{}",
829 r.namespace, r.source_qualified_name, r.target_name
830 ),
831 namespace: r.namespace.clone(),
832 source_node: r.source_qualified_name.clone(),
833 target_name: r.target_name.clone(),
834 package_hint: r.package_hint.clone(),
835 ref_kind: r.ref_kind.clone(),
836 file_path: Some(r.file_path.clone()),
837 line: Some(r.line as i64),
838 created_at: now,
839 })
840 .collect();
841 let count = entries.len();
842 Storage::insert_unresolved_refs_batch(self, &entries)?;
843 Ok(count)
844 }
845
846 fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
847 let conn = self.conn()?;
848 let mut stmt = conn
849 .prepare("SELECT package_name, namespace, manifest FROM package_registry")
850 .map_err(|e| CodememError::Storage(e.to_string()))?;
851 let rows = stmt
852 .query_map([], |row| {
853 Ok((
854 row.get::<_, String>(0)?,
855 row.get::<_, String>(1)?,
856 row.get::<_, String>(2)?,
857 ))
858 })
859 .map_err(|e| CodememError::Storage(e.to_string()))?
860 .collect::<Result<Vec<_>, _>>()
861 .map_err(|e| CodememError::Storage(e.to_string()))?;
862 Ok(rows)
863 }
864
865 fn list_pending_unresolved_refs(
866 &self,
867 ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
868 let conn = self.conn()?;
869 let mut stmt = conn
870 .prepare(
871 "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
872 FROM unresolved_refs",
873 )
874 .map_err(|e| CodememError::Storage(e.to_string()))?;
875 let rows = stmt
876 .query_map([], |row| {
877 Ok(codemem_core::PendingUnresolvedRef {
878 id: row.get::<_, String>(0)?,
879 source_node: row.get::<_, String>(1)?,
880 target_name: row.get::<_, String>(2)?,
881 namespace: row.get::<_, String>(3)?,
882 file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
883 line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
884 ref_kind: row.get::<_, String>(6)?,
885 package_hint: row.get::<_, Option<String>>(7)?,
886 })
887 })
888 .map_err(|e| CodememError::Storage(e.to_string()))?
889 .collect::<Result<Vec<_>, _>>()
890 .map_err(|e| CodememError::Storage(e.to_string()))?;
891 Ok(rows)
892 }
893
    /// Forwards to `Storage::delete_unresolved_ref` with the same `id` and returns
    /// its result unchanged.
    fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
        Storage::delete_unresolved_ref(self, id)
    }
897
898 fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
899 let conn = self.conn()?;
900 let count: i64 = conn
901 .query_row(
902 "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
903 rusqlite::params![namespace],
904 |row| row.get(0),
905 )
906 .map_err(|e| CodememError::Storage(e.to_string()))?;
907 Ok(count as usize)
908 }
909
910 fn list_registered_packages_for_namespace(
911 &self,
912 namespace: &str,
913 ) -> Result<Vec<(String, String, String)>, CodememError> {
914 let entries = Storage::get_packages_for_namespace(self, namespace)?;
915 Ok(entries
916 .into_iter()
917 .map(|e| (e.package_name, e.namespace, e.manifest))
918 .collect())
919 }
920
921 fn store_api_endpoint(
922 &self,
923 method: &str,
924 path: &str,
925 handler_symbol: &str,
926 namespace: &str,
927 ) -> Result<(), CodememError> {
928 use crate::cross_repo::ApiEndpointEntry;
929 let entry = ApiEndpointEntry {
930 id: format!("ep:{}:{}:{}", namespace, method, path),
931 namespace: namespace.to_string(),
932 method: Some(method.to_string()),
933 path: path.to_string(),
934 handler: Some(handler_symbol.to_string()),
935 schema: "{}".to_string(),
936 };
937 Storage::upsert_api_endpoint(self, &entry)
938 }
939
940 fn store_api_client_call(
941 &self,
942 library: &str,
943 method: Option<&str>,
944 caller_symbol: &str,
945 namespace: &str,
946 ) -> Result<(), CodememError> {
947 let id = format!("client:{caller_symbol}:{library}");
948 Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
949 }
950
951 fn list_api_endpoints(
952 &self,
953 namespace: &str,
954 ) -> Result<Vec<(String, String, String, String)>, CodememError> {
955 let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
956 Ok(entries
957 .into_iter()
958 .map(|e| {
959 (
960 e.method.unwrap_or_default(),
961 e.path,
962 e.handler.unwrap_or_default(),
963 e.namespace,
964 )
965 })
966 .collect())
967 }
968
969 fn store_event_channel(
970 &self,
971 channel: &str,
972 direction: &str,
973 protocol: &str,
974 handler: &str,
975 namespace: &str,
976 description: &str,
977 ) -> Result<(), CodememError> {
978 use crate::cross_repo::EventChannelEntry;
979 let entry = EventChannelEntry {
980 id: format!("ec:{namespace}:{direction}:{channel}"),
981 namespace: namespace.to_string(),
982 channel: channel.to_string(),
983 direction: direction.to_string(),
984 protocol: protocol.to_string(),
985 message_schema: "{}".to_string(),
986 description: description.to_string(),
987 handler: handler.to_string(),
988 spec_file: String::new(),
989 };
990 Storage::upsert_event_channel(self, &entry)
991 }
992
993 fn list_event_channels(
994 &self,
995 namespace: &str,
996 ) -> Result<Vec<(String, String, String, String, String)>, CodememError> {
997 let entries = Storage::get_event_channels_for_namespace(self, namespace)?;
998 Ok(entries
999 .into_iter()
1000 .map(|e| (e.channel, e.direction, e.protocol, e.handler, e.description))
1001 .collect())
1002 }
1003
1004 fn list_all_event_channels(
1005 &self,
1006 ) -> Result<Vec<(String, String, String, String, String, String)>, CodememError> {
1007 let entries = Storage::get_all_event_channels(self)?;
1008 Ok(entries
1009 .into_iter()
1010 .map(|e| {
1011 (
1012 e.channel,
1013 e.direction,
1014 e.protocol,
1015 e.handler,
1016 e.namespace,
1017 e.description,
1018 )
1019 })
1020 .collect())
1021 }
1022}
1023
// Unit tests for this backend live in a separate file and are compiled only for
// test builds.
#[cfg(test)]
#[path = "tests/backend_tests.rs"]
mod tests;