1use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5 CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6 Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Builds the `VALUES` placeholder list for a multi-row statement.
///
/// Produces `rows` parenthesised groups separated by commas, each holding
/// `cols` numbered placeholders. Numbering is 1-based and continues across
/// groups, e.g. `(?1,?2),(?3,?4)` for `cols = 2, rows = 2`. Zero rows yields
/// an empty string.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // First placeholder of this row is `base + 1`.
            let base = row * cols;
            let slots: Vec<String> = (1..=cols)
                .map(|offset| format!("?{}", base + offset))
                .collect();
            format!("({})", slots.join(","))
        })
        .collect::<Vec<_>>()
        .join(",")
}
30
/// Generates a method that forwards to the inherent `Storage` method of the
/// same name.
///
/// Usage: `delegate_storage!(name(&self, arg: Type, ...) -> Ret);`
///
/// A single variadic arm accepts any number of `arg: Type` pairs after
/// `&self` (including none), so adding a delegation never requires adding a
/// new arm for its arity. The generated body is
/// `Storage::name(self, arg, ...)`; the path form is used so the call
/// resolves to the inherent method rather than recursing into the trait
/// method being defined.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
    // Single-memory operations. Each invocation expands to a trait method that
    // passes its arguments unchanged to the inherent `Storage` method of the
    // same name.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74 fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76 Storage::delete_memory_cascade(self, id)
79 }
80
81 fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83 Storage::delete_memories_batch_cascade(self, ids)
84 }
85
    // Expiry, id listing, tag lookup, namespace listing and counting. All of
    // these forward unchanged to the inherent `Storage` methods of the same name.
    delegate_storage!(delete_expired_memories(&self) -> Result<usize, CodememError>);
    delegate_storage!(expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError>);
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
93
94 fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
95 if ids.is_empty() {
96 return Ok(Vec::new());
97 }
98 let conn = self.conn()?;
99
100 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
101 let sql = format!(
102 "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
103 placeholders.join(",")
104 );
105
106 let mut stmt = conn.prepare(&sql).storage_err()?;
107
108 let params: Vec<&dyn rusqlite::types::ToSql> = ids
109 .iter()
110 .map(|id| id as &dyn rusqlite::types::ToSql)
111 .collect();
112
113 let rows = stmt
114 .query_map(params.as_slice(), MemoryRow::from_row)
115 .storage_err()?;
116
117 let mut memories = Vec::new();
118 for row in rows {
119 let row = row.storage_err()?;
120 memories.push(row.into_memory_node()?);
121 }
122 Ok(memories)
123 }
124
    // Single-embedding write and read; both forward unchanged to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
129
130 fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
131 let conn = self.conn()?;
132 let deleted = conn
133 .execute(
134 "DELETE FROM memory_embeddings WHERE memory_id = ?1",
135 [memory_id],
136 )
137 .storage_err()?;
138 Ok(deleted > 0)
139 }
140
141 fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
142 let conn = self.conn()?;
143 let mut stmt = conn
144 .prepare("SELECT memory_id, embedding FROM memory_embeddings")
145 .storage_err()?;
146 let rows = stmt
147 .query_map([], |row| {
148 let id: String = row.get(0)?;
149 let blob: Vec<u8> = row.get(1)?;
150 Ok((id, blob))
151 })
152 .storage_err()?;
153 let mut result = Vec::new();
154 for row in rows {
155 let (id, blob) = row.storage_err()?;
156 let floats: Vec<f32> = blob
157 .chunks_exact(4)
158 .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
159 .collect();
160 result.push((id, floats));
161 }
162 Ok(result)
163 }
164
    // Graph node/edge operations and session start/end. Each invocation
    // forwards unchanged to the inherent `Storage` method of the same name.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
181
182 fn list_sessions(
183 &self,
184 namespace: Option<&str>,
185 limit: usize,
186 ) -> Result<Vec<Session>, CodememError> {
187 self.list_sessions_with_limit(namespace, limit)
188 }
189
    // Consolidation log accessors; forwarded unchanged to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);

    // Usage-pattern queries; forwarded unchanged to the inherent `Storage`
    // methods of the same name.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
201
202 fn decay_stale_memories(
205 &self,
206 threshold_ts: i64,
207 decay_factor: f64,
208 ) -> Result<usize, CodememError> {
209 let conn = self.conn()?;
210 let rows = conn
211 .execute(
212 "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
213 params![decay_factor, threshold_ts],
214 )
215 .storage_err()?;
216 Ok(rows)
217 }
218
219 fn list_memories_for_creative(
220 &self,
221 ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
222 let conn = self.conn()?;
223 let mut stmt = conn
224 .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
225 .storage_err()?;
226
227 let rows = stmt
228 .query_map([], |row| {
229 Ok((
230 row.get::<_, String>(0)?,
231 row.get::<_, String>(1)?,
232 row.get::<_, String>(2)?,
233 ))
234 })
235 .storage_err()?
236 .collect::<Result<Vec<_>, _>>()
237 .storage_err()?;
238
239 Ok(rows
240 .into_iter()
241 .map(|(id, mtype, tags_json)| {
242 let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
243 (id, mtype, tags)
244 })
245 .collect())
246 }
247
248 fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
249 let conn = self.conn()?;
250 let mut stmt = conn
251 .prepare(
252 "SELECT a.id, b.id, 1.0 as similarity
253 FROM memories a
254 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
255 WHERE a.id < b.id",
256 )
257 .storage_err()?;
258
259 let rows = stmt
260 .query_map([], |row| {
261 Ok((
262 row.get::<_, String>(0)?,
263 row.get::<_, String>(1)?,
264 row.get::<_, f64>(2)?,
265 ))
266 })
267 .storage_err()?
268 .collect::<Result<Vec<_>, _>>()
269 .storage_err()?;
270
271 Ok(rows)
272 }
273
274 fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
275 let conn = self.conn()?;
276 let mut stmt = conn
277 .prepare(
278 "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
279 )
280 .storage_err()?;
281
282 let ids = stmt
283 .query_map(params![importance_threshold], |row| row.get(0))
284 .storage_err()?
285 .collect::<Result<Vec<String>, _>>()
286 .storage_err()?;
287
288 Ok(ids)
289 }
290
    /// Inserts `memories` with multi-row `INSERT OR IGNORE` statements inside
    /// one transaction.
    ///
    /// An empty slice is a no-op. Tags and metadata are serialized to JSON
    /// (a serialization failure is returned as an error); timestamps are
    /// stored via `timestamp()`. The transaction is committed only after
    /// every chunk has executed; an early error return leaves it uncommitted.
    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
        if memories.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One placeholder per column listed in the INSERT below.
        const COLS: usize = 17;
        // Rows per statement so that rows * COLS never exceeds 999 bound
        // parameters (presumably SQLite's historical default limit — confirm).
        const BATCH: usize = 999 / COLS;
        for chunk in memories.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
            );

            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for memory in chunk {
                let tags_json = serde_json::to_string(&memory.tags)?;
                let metadata_json = serde_json::to_string(&memory.metadata)?;
                // Push order must match the column order in the INSERT above.
                param_values.push(Box::new(memory.id.clone()));
                param_values.push(Box::new(memory.content.clone()));
                param_values.push(Box::new(memory.memory_type.to_string()));
                param_values.push(Box::new(memory.importance));
                param_values.push(Box::new(memory.confidence));
                param_values.push(Box::new(memory.access_count as i64));
                param_values.push(Box::new(memory.content_hash.clone()));
                param_values.push(Box::new(tags_json));
                param_values.push(Box::new(metadata_json));
                param_values.push(Box::new(memory.namespace.clone()));
                param_values.push(Box::new(memory.session_id.clone()));
                param_values.push(Box::new(memory.repo.clone()));
                param_values.push(Box::new(memory.git_ref.clone()));
                param_values.push(Box::new(memory.expires_at.map(|dt| dt.timestamp())));
                param_values.push(Box::new(memory.created_at.timestamp()));
                param_values.push(Box::new(memory.updated_at.timestamp()));
                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
            }

            // Borrow the boxed values as the trait-object slice `execute` takes.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
341
342 fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
343 if items.is_empty() {
344 return Ok(());
345 }
346 let conn = self.conn()?;
347 let tx = conn.unchecked_transaction().storage_err()?;
348
349 const COLS: usize = 2;
350 const BATCH: usize = 999 / COLS; for chunk in items.chunks(BATCH) {
353 let placeholders = multi_row_placeholders(COLS, chunk.len());
354 let sql = format!(
355 "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
356 );
357
358 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
359 Vec::with_capacity(chunk.len() * COLS);
360 for (id, embedding) in chunk {
361 let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
362 param_values.push(Box::new(id.to_string()));
363 param_values.push(Box::new(blob));
364 }
365
366 let refs: Vec<&dyn rusqlite::types::ToSql> =
367 param_values.iter().map(|p| p.as_ref()).collect();
368 tx.execute(&sql, refs.as_slice()).storage_err()?;
369 }
370
371 tx.commit().storage_err()?;
372 Ok(())
373 }
374
375 fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
376 let conn = self.conn()?;
377 let mut stmt = conn
378 .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
379 .storage_err()?;
380
381 let rows = stmt
382 .query_map(params![namespace], |row| {
383 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
384 })
385 .storage_err()?
386 .collect::<Result<Vec<_>, _>>()
387 .storage_err()?;
388
389 Ok(rows.into_iter().collect())
390 }
391
392 fn save_file_hashes(
393 &self,
394 namespace: &str,
395 hashes: &HashMap<String, String>,
396 ) -> Result<(), CodememError> {
397 let conn = self.conn()?;
398 let tx = conn.unchecked_transaction().storage_err()?;
399
400 tx.execute(
401 "DELETE FROM file_hashes WHERE namespace = ?1",
402 params![namespace],
403 )
404 .storage_err()?;
405
406 for (path, hash) in hashes {
407 tx.execute(
408 "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
409 params![namespace, path, hash],
410 )
411 .storage_err()?;
412 }
413
414 tx.commit().storage_err()?;
415 Ok(())
416 }
417
    /// Writes `nodes` with multi-row `INSERT OR REPLACE` statements inside
    /// one transaction.
    ///
    /// An empty slice is a no-op. A payload that fails to serialize is stored
    /// as the literal `{}` rather than aborting the batch. The transaction is
    /// committed only after every chunk has executed.
    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
        if nodes.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One placeholder per column listed in the INSERT below.
        const COLS: usize = 9;
        // Rows per statement so that rows * COLS never exceeds 999 bound
        // parameters.
        const BATCH: usize = 999 / COLS;
        for chunk in nodes.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to) VALUES {placeholders}"
            );

            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for node in chunk {
                // Serialization failure falls back to an empty JSON object.
                let payload_json =
                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
                // Push order must match the column order in the INSERT above.
                param_values.push(Box::new(node.id.clone()));
                param_values.push(Box::new(node.kind.to_string()));
                param_values.push(Box::new(node.label.clone()));
                param_values.push(Box::new(payload_json));
                param_values.push(Box::new(node.centrality));
                param_values.push(Box::new(node.memory_id.clone()));
                param_values.push(Box::new(node.namespace.clone()));
                param_values.push(Box::new(node.valid_from.map(|dt| dt.timestamp())));
                param_values.push(Box::new(node.valid_to.map(|dt| dt.timestamp())));
            }

            // Borrow the boxed values as the trait-object slice `execute` takes.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
458
    /// Writes `edges` with multi-row `INSERT OR REPLACE` statements inside
    /// one transaction.
    ///
    /// An empty slice is a no-op. Edge properties that fail to serialize are
    /// stored as the literal `{}` rather than aborting the batch. The
    /// transaction is committed only after every chunk has executed.
    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
        if edges.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // One placeholder per column listed in the INSERT below.
        const COLS: usize = 9;
        // Rows per statement so that rows * COLS never exceeds 999 bound
        // parameters.
        const BATCH: usize = 999 / COLS;
        for chunk in edges.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
            );

            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for edge in chunk {
                // Serialization failure falls back to an empty JSON object.
                let props_json =
                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
                // Push order must match the column order in the INSERT above.
                param_values.push(Box::new(edge.id.clone()));
                param_values.push(Box::new(edge.src.clone()));
                param_values.push(Box::new(edge.dst.clone()));
                param_values.push(Box::new(edge.relationship.to_string()));
                param_values.push(Box::new(edge.weight));
                param_values.push(Box::new(props_json));
                param_values.push(Box::new(edge.created_at.timestamp()));
                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
            }

            // Borrow the boxed values as the trait-object slice `execute` takes.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
499
500 fn get_stale_memories_for_decay(
501 &self,
502 threshold_ts: i64,
503 ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
504 let conn = self.conn()?;
505 let mut stmt = conn
506 .prepare(
507 "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
508 )
509 .storage_err()?;
510
511 let rows = stmt
512 .query_map(params![threshold_ts], |row| {
513 Ok((
514 row.get::<_, String>(0)?,
515 row.get::<_, f64>(1)?,
516 row.get::<_, u32>(2)?,
517 row.get::<_, i64>(3)?,
518 ))
519 })
520 .storage_err()?
521 .collect::<Result<Vec<_>, _>>()
522 .storage_err()?;
523
524 Ok(rows)
525 }
526
527 fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
528 if updates.is_empty() {
529 return Ok(0);
530 }
531 let conn = self.conn()?;
532 let tx = conn.unchecked_transaction().storage_err()?;
533
534 let mut count = 0usize;
535 for (id, importance) in updates {
536 let rows = tx
537 .execute(
538 "UPDATE memories SET importance = ?1 WHERE id = ?2",
539 params![importance, id],
540 )
541 .storage_err()?;
542 count += rows;
543 }
544
545 tx.commit().storage_err()?;
546 Ok(count)
547 }
548
549 fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
550 let conn = self.conn()?;
551 let count: i64 = if let Some(ns) = namespace {
552 conn.query_row(
553 "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
554 params![ns],
555 |row| row.get(0),
556 )
557 .storage_err()?
558 } else {
559 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
560 .storage_err()?
561 };
562 Ok(count as usize)
563 }
564
565 fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
568 let conn = self.conn()?;
569 let mut stmt = conn
570 .prepare(
571 "SELECT m.id, m.content FROM memories m
572 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
573 WHERE me.memory_id IS NULL",
574 )
575 .storage_err()?;
576
577 let rows = stmt
578 .query_map([], |row| {
579 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
580 })
581 .storage_err()?
582 .collect::<Result<Vec<_>, _>>()
583 .storage_err()?;
584
585 Ok(rows)
586 }
587
    /// Searches graph nodes whose lower-cased label contains `query`.
    ///
    /// Matching uses `LIKE` with the query wrapped in `%…%`. Results are
    /// optionally restricted to `namespace`, ordered by descending
    /// `centrality`, and capped at `limit` rows.
    ///
    /// An unparseable `kind` falls back to `NodeKind::Memory`, and an
    /// unparseable `payload` falls back to the payload type's default.
    fn search_graph_nodes(
        &self,
        query: &str,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<GraphNode>, CodememError> {
        let conn = self.conn()?;
        // Escape the LIKE metacharacters (`\`, `%`, `_`) so the query is
        // matched literally; the backslash is escaped first so the escapes
        // added for `%` and `_` are not themselves doubled.
        // NOTE(review): `to_lowercase` is Unicode-aware — confirm SQL `LOWER`
        // folds the same characters for non-ASCII labels.
        let escaped = query
            .to_lowercase()
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let pattern = format!("%{escaped}%");

        // Two statement shapes: with a namespace filter (three parameters) or
        // without one (two parameters).
        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(ns) =
            namespace
        {
            (
                "SELECT id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to \
                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
                 ORDER BY centrality DESC LIMIT ?3"
                    .to_string(),
                vec![
                    Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
                    Box::new(ns.to_string()),
                    Box::new(limit as i64),
                ],
            )
        } else {
            (
                "SELECT id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to \
                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
                 ORDER BY centrality DESC LIMIT ?2"
                    .to_string(),
                vec![
                    Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
                    Box::new(limit as i64),
                ],
            )
        };

        let refs: Vec<&dyn rusqlite::types::ToSql> =
            params_vec.iter().map(|p| p.as_ref()).collect();
        let mut stmt = conn.prepare(&sql).storage_err()?;

        let rows = stmt
            .query_map(refs.as_slice(), |row| {
                let kind_str: String = row.get(1)?;
                let payload_str: String = row.get(3)?;
                let valid_from_ts: Option<i64> = row.get(7)?;
                let valid_to_ts: Option<i64> = row.get(8)?;
                Ok(GraphNode {
                    id: row.get(0)?,
                    // Unknown kind strings degrade to `NodeKind::Memory`.
                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
                    label: row.get(2)?,
                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
                    centrality: row.get(4)?,
                    memory_id: row.get(5)?,
                    namespace: row.get(6)?,
                    // Stored timestamps are whole seconds; an out-of-range
                    // value becomes `None`.
                    valid_from: valid_from_ts
                        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)),
                    valid_to: valid_to_ts.and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)),
                })
            })
            .storage_err()?
            .collect::<Result<Vec<_>, _>>()
            .storage_err()?;

        Ok(rows)
    }
658
659 fn list_memories_by_tag(
660 &self,
661 tag: &str,
662 namespace: Option<&str>,
663 limit: usize,
664 ) -> Result<Vec<MemoryNode>, CodememError> {
665 Storage::list_memories_by_tag(self, tag, namespace, limit)
666 }
667
668 fn list_memories_filtered(
669 &self,
670 namespace: Option<&str>,
671 memory_type: Option<&str>,
672 ) -> Result<Vec<MemoryNode>, CodememError> {
673 let conn = self.conn()?;
674 let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
675 content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, \
676 last_accessed_at FROM memories WHERE (expires_at IS NULL OR expires_at > ?1)"
677 .to_string();
678 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
679 param_values.push(Box::new(chrono::Utc::now().timestamp()));
681
682 if let Some(ns) = namespace {
683 param_values.push(Box::new(ns.to_string()));
684 sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
685 }
686 if let Some(mt) = memory_type {
687 param_values.push(Box::new(mt.to_string()));
688 sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
689 }
690 sql.push_str(" ORDER BY created_at DESC LIMIT 10000");
691
692 let refs: Vec<&dyn rusqlite::types::ToSql> =
693 param_values.iter().map(|p| p.as_ref()).collect();
694 let mut stmt = conn.prepare(&sql).storage_err()?;
695
696 let rows = stmt
697 .query_map(refs.as_slice(), MemoryRow::from_row)
698 .storage_err()?;
699
700 let mut result = Vec::new();
701 for row in rows {
702 let mr = row.storage_err()?;
703 result.push(mr.into_memory_node()?);
704 }
705
706 Ok(result)
707 }
708
    // Session-activity recording and queries; forwarded unchanged to the
    // inherent `Storage` methods of the same name.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);

    // Storage statistics; forwarded unchanged to the inherent `Storage::stats`.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
722
723 fn begin_transaction(&self) -> Result<(), CodememError> {
726 let conn = self.conn()?;
727 conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
728 self.in_transaction
729 .store(true, std::sync::atomic::Ordering::Release);
730 Ok(())
731 }
732
733 fn commit_transaction(&self) -> Result<(), CodememError> {
734 let conn = self.conn()?;
735 conn.execute_batch("COMMIT").storage_err()?;
736 self.in_transaction
739 .store(false, std::sync::atomic::Ordering::Release);
740 Ok(())
741 }
742
743 fn rollback_transaction(&self) -> Result<(), CodememError> {
744 let conn = self.conn()?;
745 conn.execute_batch("ROLLBACK").storage_err()?;
746 self.in_transaction
750 .store(false, std::sync::atomic::Ordering::Release);
751 Ok(())
752 }
753
754 fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
757 Storage::list_repos(self)
758 }
759
760 fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
761 Storage::add_repo(self, repo)
762 }
763
764 fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
765 Storage::get_repo(self, id)
766 }
767
768 fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
769 Storage::remove_repo(self, id)
770 }
771
772 fn update_repo_status(
773 &self,
774 id: &str,
775 status: &str,
776 indexed_at: Option<&str>,
777 ) -> Result<(), CodememError> {
778 Storage::update_repo_status(self, id, status, indexed_at)
779 }
780
781 fn graph_edges_for_namespace_with_cross(
784 &self,
785 namespace: &str,
786 include_cross_namespace: bool,
787 ) -> Result<Vec<Edge>, CodememError> {
788 Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
789 }
790
    /// Forwards all four arguments unchanged to the inherent
    /// `Storage::upsert_package_registry`.
    fn upsert_package_registry(
        &self,
        package_name: &str,
        namespace: &str,
        version: &str,
        manifest: &str,
    ) -> Result<(), CodememError> {
        Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
    }
800
801 fn store_unresolved_ref(
802 &self,
803 source_qualified_name: &str,
804 target_name: &str,
805 source_namespace: &str,
806 file_path: &str,
807 line: usize,
808 ref_kind: &str,
809 package_hint: Option<&str>,
810 ) -> Result<(), CodememError> {
811 use crate::cross_repo::UnresolvedRefEntry;
812 let entry = UnresolvedRefEntry {
813 id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
814 namespace: source_namespace.to_string(),
815 source_node: source_qualified_name.to_string(),
816 target_name: target_name.to_string(),
817 package_hint: package_hint.map(|s| s.to_string()),
818 ref_kind: ref_kind.to_string(),
819 file_path: Some(file_path.to_string()),
820 line: Some(line as i64),
821 created_at: chrono::Utc::now().timestamp(),
822 };
823 Storage::insert_unresolved_ref(self, &entry)
824 }
825
826 fn store_unresolved_refs_batch(
827 &self,
828 refs: &[codemem_core::UnresolvedRefData],
829 ) -> Result<usize, CodememError> {
830 use crate::cross_repo::UnresolvedRefEntry;
831 let now = chrono::Utc::now().timestamp();
832 let entries: Vec<UnresolvedRefEntry> = refs
833 .iter()
834 .map(|r| UnresolvedRefEntry {
835 id: format!(
836 "uref:{}:{}:{}",
837 r.namespace, r.source_qualified_name, r.target_name
838 ),
839 namespace: r.namespace.clone(),
840 source_node: r.source_qualified_name.clone(),
841 target_name: r.target_name.clone(),
842 package_hint: r.package_hint.clone(),
843 ref_kind: r.ref_kind.clone(),
844 file_path: Some(r.file_path.clone()),
845 line: Some(r.line as i64),
846 created_at: now,
847 })
848 .collect();
849 let count = entries.len();
850 Storage::insert_unresolved_refs_batch(self, &entries)?;
851 Ok(count)
852 }
853
854 fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
855 let conn = self.conn()?;
856 let mut stmt = conn
857 .prepare("SELECT package_name, namespace, manifest FROM package_registry")
858 .map_err(|e| CodememError::Storage(e.to_string()))?;
859 let rows = stmt
860 .query_map([], |row| {
861 Ok((
862 row.get::<_, String>(0)?,
863 row.get::<_, String>(1)?,
864 row.get::<_, String>(2)?,
865 ))
866 })
867 .map_err(|e| CodememError::Storage(e.to_string()))?
868 .collect::<Result<Vec<_>, _>>()
869 .map_err(|e| CodememError::Storage(e.to_string()))?;
870 Ok(rows)
871 }
872
873 fn list_pending_unresolved_refs(
874 &self,
875 ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
876 let conn = self.conn()?;
877 let mut stmt = conn
878 .prepare(
879 "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
880 FROM unresolved_refs",
881 )
882 .map_err(|e| CodememError::Storage(e.to_string()))?;
883 let rows = stmt
884 .query_map([], |row| {
885 Ok(codemem_core::PendingUnresolvedRef {
886 id: row.get::<_, String>(0)?,
887 source_node: row.get::<_, String>(1)?,
888 target_name: row.get::<_, String>(2)?,
889 namespace: row.get::<_, String>(3)?,
890 file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
891 line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
892 ref_kind: row.get::<_, String>(6)?,
893 package_hint: row.get::<_, Option<String>>(7)?,
894 })
895 })
896 .map_err(|e| CodememError::Storage(e.to_string()))?
897 .collect::<Result<Vec<_>, _>>()
898 .map_err(|e| CodememError::Storage(e.to_string()))?;
899 Ok(rows)
900 }
901
902 fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
903 Storage::delete_unresolved_ref(self, id)
904 }
905
906 fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
907 let conn = self.conn()?;
908 let count: i64 = conn
909 .query_row(
910 "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
911 rusqlite::params![namespace],
912 |row| row.get(0),
913 )
914 .map_err(|e| CodememError::Storage(e.to_string()))?;
915 Ok(count as usize)
916 }
917
918 fn list_registered_packages_for_namespace(
919 &self,
920 namespace: &str,
921 ) -> Result<Vec<(String, String, String)>, CodememError> {
922 let entries = Storage::get_packages_for_namespace(self, namespace)?;
923 Ok(entries
924 .into_iter()
925 .map(|e| (e.package_name, e.namespace, e.manifest))
926 .collect())
927 }
928
929 fn store_api_endpoint(
930 &self,
931 method: &str,
932 path: &str,
933 handler_symbol: &str,
934 namespace: &str,
935 ) -> Result<(), CodememError> {
936 use crate::cross_repo::ApiEndpointEntry;
937 let entry = ApiEndpointEntry {
938 id: format!("ep:{}:{}:{}", namespace, method, path),
939 namespace: namespace.to_string(),
940 method: Some(method.to_string()),
941 path: path.to_string(),
942 handler: Some(handler_symbol.to_string()),
943 schema: "{}".to_string(),
944 };
945 Storage::upsert_api_endpoint(self, &entry)
946 }
947
948 fn store_api_client_call(
949 &self,
950 library: &str,
951 method: Option<&str>,
952 caller_symbol: &str,
953 namespace: &str,
954 ) -> Result<(), CodememError> {
955 let id = format!("client:{caller_symbol}:{library}");
956 Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
957 }
958
959 fn list_api_endpoints(
960 &self,
961 namespace: &str,
962 ) -> Result<Vec<(String, String, String, String)>, CodememError> {
963 let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
964 Ok(entries
965 .into_iter()
966 .map(|e| {
967 (
968 e.method.unwrap_or_default(),
969 e.path,
970 e.handler.unwrap_or_default(),
971 e.namespace,
972 )
973 })
974 .collect())
975 }
976
977 fn store_event_channel(
978 &self,
979 channel: &str,
980 direction: &str,
981 protocol: &str,
982 handler: &str,
983 namespace: &str,
984 description: &str,
985 ) -> Result<(), CodememError> {
986 use crate::cross_repo::EventChannelEntry;
987 let entry = EventChannelEntry {
988 id: format!("ec:{namespace}:{direction}:{channel}"),
989 namespace: namespace.to_string(),
990 channel: channel.to_string(),
991 direction: direction.to_string(),
992 protocol: protocol.to_string(),
993 message_schema: "{}".to_string(),
994 description: description.to_string(),
995 handler: handler.to_string(),
996 spec_file: String::new(),
997 };
998 Storage::upsert_event_channel(self, &entry)
999 }
1000
1001 fn list_event_channels(
1002 &self,
1003 namespace: &str,
1004 ) -> Result<Vec<(String, String, String, String, String)>, CodememError> {
1005 let entries = Storage::get_event_channels_for_namespace(self, namespace)?;
1006 Ok(entries
1007 .into_iter()
1008 .map(|e| (e.channel, e.direction, e.protocol, e.handler, e.description))
1009 .collect())
1010 }
1011
1012 fn list_all_event_channels(
1013 &self,
1014 ) -> Result<Vec<(String, String, String, String, String, String)>, CodememError> {
1015 let entries = Storage::get_all_event_channels(self)?;
1016 Ok(entries
1017 .into_iter()
1018 .map(|e| {
1019 (
1020 e.channel,
1021 e.direction,
1022 e.protocol,
1023 e.handler,
1024 e.namespace,
1025 e.description,
1026 )
1027 })
1028 .collect())
1029 }
1030}
1031
// Test module, compiled only under `cfg(test)`; its source is loaded from
// `tests/backend_tests.rs` via the `#[path]` attribute.
#[cfg(test)]
#[path = "tests/backend_tests.rs"]
mod tests;