Skip to main content

codemem_storage/
backend.rs

1//! `StorageBackend` trait implementation for Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5    CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6    Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Build the `VALUES` placeholder list for a multi-row INSERT.
///
/// Emits `rows` parenthesised groups of `cols` numbered parameters each,
/// numbered consecutively from `?1`. For example `cols = 2, rows = 2`
/// yields `"(?1,?2),(?3,?4)"`; zero rows yields an empty string.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // First parameter number used by this row.
            let first = row * cols + 1;
            let group: Vec<String> = (first..first + cols).map(|n| format!("?{n}")).collect();
            format!("({})", group.join(","))
        })
        .collect::<Vec<_>>()
        .join(",")
}
30
/// Macro to delegate pure-forwarding trait methods to `Storage` inherent methods.
///
/// `delegate_storage!(name(&self, a: A, b: B) -> R);` expands to
/// `fn name(&self, a: A, b: B) -> R { Storage::name(self, a, b) }`.
///
/// A single repetition-based arm accepts any number of `ident: type`
/// arguments after `&self` (including none), so new arities need no
/// additional macro arms.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
66    // ── Memory CRUD (delegated) ───────────────────────────────────────
67
    // Each invocation expands to a trait method that passes its arguments
    // unchanged to the inherent `Storage` method of the same name.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74    /// M1: Override with transactional cascade delete.
75    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76        // Delegates to Storage::delete_memory_cascade which wraps all
77        // deletes (memory + graph nodes/edges + embedding) in a single transaction.
78        Storage::delete_memory_cascade(self, id)
79    }
80
81    /// Override with transactional batch cascade delete.
82    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83        Storage::delete_memories_batch_cascade(self, ids)
84    }
85
    // Expiry, listing and counting helpers: all forwarded unchanged to the
    // inherent `Storage` methods of the same name.
    delegate_storage!(delete_expired_memories(&self) -> Result<usize, CodememError>);
    delegate_storage!(expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError>);
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
93
94    fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
95        if ids.is_empty() {
96            return Ok(Vec::new());
97        }
98        let conn = self.conn()?;
99
100        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
101        let sql = format!(
102            "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
103            placeholders.join(",")
104        );
105
106        let mut stmt = conn.prepare(&sql).storage_err()?;
107
108        let params: Vec<&dyn rusqlite::types::ToSql> = ids
109            .iter()
110            .map(|id| id as &dyn rusqlite::types::ToSql)
111            .collect();
112
113        let rows = stmt
114            .query_map(params.as_slice(), MemoryRow::from_row)
115            .storage_err()?;
116
117        let mut memories = Vec::new();
118        for row in rows {
119            let row = row.storage_err()?;
120            memories.push(row.into_memory_node()?);
121        }
122        Ok(memories)
123    }
124
125    // ── Embedding Persistence (delegated where possible) ──────────────
126
    // Single-embedding store/load are forwarded unchanged to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
129
130    fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
131        let conn = self.conn()?;
132        let deleted = conn
133            .execute(
134                "DELETE FROM memory_embeddings WHERE memory_id = ?1",
135                [memory_id],
136            )
137            .storage_err()?;
138        Ok(deleted > 0)
139    }
140
141    fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
142        let conn = self.conn()?;
143        let mut stmt = conn
144            .prepare("SELECT memory_id, embedding FROM memory_embeddings")
145            .storage_err()?;
146        let rows = stmt
147            .query_map([], |row| {
148                let id: String = row.get(0)?;
149                let blob: Vec<u8> = row.get(1)?;
150                Ok((id, blob))
151            })
152            .storage_err()?;
153        let mut result = Vec::new();
154        for row in rows {
155            let (id, blob) = row.storage_err()?;
156            let floats: Vec<f32> = blob
157                .chunks_exact(4)
158                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
159                .collect();
160            result.push((id, floats));
161        }
162        Ok(result)
163    }
164
165    // ── Graph Node/Edge Persistence (delegated) ───────────────────────
166
    // Graph node and edge CRUD: every method below forwards its arguments
    // unchanged to the inherent `Storage` method of the same name.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
177    // ── Sessions (delegated where possible) ───────────────────────────
178
    // Session start/end are forwarded unchanged to the inherent `Storage`
    // methods of the same name.
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
181
182    fn list_sessions(
183        &self,
184        namespace: Option<&str>,
185        limit: usize,
186    ) -> Result<Vec<Session>, CodememError> {
187        self.list_sessions_with_limit(namespace, limit)
188    }
189
190    // ── Consolidation (delegated) ─────────────────────────────────────
191
    // Consolidation log write/read are forwarded unchanged to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
194
195    // ── Pattern Detection (delegated) ─────────────────────────────────
196
    // Pattern-detection queries are forwarded unchanged to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
201
202    // ── Bulk Operations ───────────────────────────────────────────────
203
204    fn decay_stale_memories(
205        &self,
206        threshold_ts: i64,
207        decay_factor: f64,
208    ) -> Result<usize, CodememError> {
209        let conn = self.conn()?;
210        let rows = conn
211            .execute(
212                "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
213                params![decay_factor, threshold_ts],
214            )
215            .storage_err()?;
216        Ok(rows)
217    }
218
219    fn list_memories_for_creative(
220        &self,
221    ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
222        let conn = self.conn()?;
223        let mut stmt = conn
224            .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
225            .storage_err()?;
226
227        let rows = stmt
228            .query_map([], |row| {
229                Ok((
230                    row.get::<_, String>(0)?,
231                    row.get::<_, String>(1)?,
232                    row.get::<_, String>(2)?,
233                ))
234            })
235            .storage_err()?
236            .collect::<Result<Vec<_>, _>>()
237            .storage_err()?;
238
239        Ok(rows
240            .into_iter()
241            .map(|(id, mtype, tags_json)| {
242                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
243                (id, mtype, tags)
244            })
245            .collect())
246    }
247
248    fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
249        let conn = self.conn()?;
250        let mut stmt = conn
251            .prepare(
252                "SELECT a.id, b.id, 1.0 as similarity
253                 FROM memories a
254                 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
255                 WHERE a.id < b.id",
256            )
257            .storage_err()?;
258
259        let rows = stmt
260            .query_map([], |row| {
261                Ok((
262                    row.get::<_, String>(0)?,
263                    row.get::<_, String>(1)?,
264                    row.get::<_, f64>(2)?,
265                ))
266            })
267            .storage_err()?
268            .collect::<Result<Vec<_>, _>>()
269            .storage_err()?;
270
271        Ok(rows)
272    }
273
274    fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
275        let conn = self.conn()?;
276        let mut stmt = conn
277            .prepare(
278                "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
279            )
280            .storage_err()?;
281
282        let ids = stmt
283            .query_map(params![importance_threshold], |row| row.get(0))
284            .storage_err()?
285            .collect::<Result<Vec<String>, _>>()
286            .storage_err()?;
287
288        Ok(ids)
289    }
290
291    // ── Batch Operations ──────────────────────────────────────────────
292
293    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
294        if memories.is_empty() {
295            return Ok(());
296        }
297        let conn = self.conn()?;
298        let tx = conn.unchecked_transaction().storage_err()?;
299
300        const COLS: usize = 17;
301        const BATCH: usize = 999 / COLS; // 58
302
303        for chunk in memories.chunks(BATCH) {
304            let placeholders = multi_row_placeholders(COLS, chunk.len());
305            let sql = format!(
306                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
307            );
308
309            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
310                Vec::with_capacity(chunk.len() * COLS);
311            for memory in chunk {
312                let tags_json = serde_json::to_string(&memory.tags)?;
313                let metadata_json = serde_json::to_string(&memory.metadata)?;
314                param_values.push(Box::new(memory.id.clone()));
315                param_values.push(Box::new(memory.content.clone()));
316                param_values.push(Box::new(memory.memory_type.to_string()));
317                param_values.push(Box::new(memory.importance));
318                param_values.push(Box::new(memory.confidence));
319                param_values.push(Box::new(memory.access_count as i64));
320                param_values.push(Box::new(memory.content_hash.clone()));
321                param_values.push(Box::new(tags_json));
322                param_values.push(Box::new(metadata_json));
323                param_values.push(Box::new(memory.namespace.clone()));
324                param_values.push(Box::new(memory.session_id.clone()));
325                param_values.push(Box::new(memory.repo.clone()));
326                param_values.push(Box::new(memory.git_ref.clone()));
327                param_values.push(Box::new(memory.expires_at.map(|dt| dt.timestamp())));
328                param_values.push(Box::new(memory.created_at.timestamp()));
329                param_values.push(Box::new(memory.updated_at.timestamp()));
330                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
331            }
332
333            let refs: Vec<&dyn rusqlite::types::ToSql> =
334                param_values.iter().map(|p| p.as_ref()).collect();
335            tx.execute(&sql, refs.as_slice()).storage_err()?;
336        }
337
338        tx.commit().storage_err()?;
339        Ok(())
340    }
341
342    fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
343        if items.is_empty() {
344            return Ok(());
345        }
346        let conn = self.conn()?;
347        let tx = conn.unchecked_transaction().storage_err()?;
348
349        const COLS: usize = 2;
350        const BATCH: usize = 999 / COLS; // 499
351
352        for chunk in items.chunks(BATCH) {
353            let placeholders = multi_row_placeholders(COLS, chunk.len());
354            let sql = format!(
355                "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
356            );
357
358            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
359                Vec::with_capacity(chunk.len() * COLS);
360            for (id, embedding) in chunk {
361                let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
362                param_values.push(Box::new(id.to_string()));
363                param_values.push(Box::new(blob));
364            }
365
366            let refs: Vec<&dyn rusqlite::types::ToSql> =
367                param_values.iter().map(|p| p.as_ref()).collect();
368            tx.execute(&sql, refs.as_slice()).storage_err()?;
369        }
370
371        tx.commit().storage_err()?;
372        Ok(())
373    }
374
375    fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
376        let conn = self.conn()?;
377        let mut stmt = conn
378            .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
379            .storage_err()?;
380
381        let rows = stmt
382            .query_map(params![namespace], |row| {
383                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
384            })
385            .storage_err()?
386            .collect::<Result<Vec<_>, _>>()
387            .storage_err()?;
388
389        Ok(rows.into_iter().collect())
390    }
391
392    fn save_file_hashes(
393        &self,
394        namespace: &str,
395        hashes: &HashMap<String, String>,
396    ) -> Result<(), CodememError> {
397        let conn = self.conn()?;
398        let tx = conn.unchecked_transaction().storage_err()?;
399
400        tx.execute(
401            "DELETE FROM file_hashes WHERE namespace = ?1",
402            params![namespace],
403        )
404        .storage_err()?;
405
406        for (path, hash) in hashes {
407            tx.execute(
408                "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
409                params![namespace, path, hash],
410            )
411            .storage_err()?;
412        }
413
414        tx.commit().storage_err()?;
415        Ok(())
416    }
417
418    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
419        if nodes.is_empty() {
420            return Ok(());
421        }
422        let conn = self.conn()?;
423        let tx = conn.unchecked_transaction().storage_err()?;
424
425        const COLS: usize = 9;
426        const BATCH: usize = 999 / COLS; // 111
427
428        for chunk in nodes.chunks(BATCH) {
429            let placeholders = multi_row_placeholders(COLS, chunk.len());
430            let sql = format!(
431                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to) VALUES {placeholders}"
432            );
433
434            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
435                Vec::with_capacity(chunk.len() * COLS);
436            for node in chunk {
437                let payload_json =
438                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
439                param_values.push(Box::new(node.id.clone()));
440                param_values.push(Box::new(node.kind.to_string()));
441                param_values.push(Box::new(node.label.clone()));
442                param_values.push(Box::new(payload_json));
443                param_values.push(Box::new(node.centrality));
444                param_values.push(Box::new(node.memory_id.clone()));
445                param_values.push(Box::new(node.namespace.clone()));
446                param_values.push(Box::new(node.valid_from.map(|dt| dt.timestamp())));
447                param_values.push(Box::new(node.valid_to.map(|dt| dt.timestamp())));
448            }
449
450            let refs: Vec<&dyn rusqlite::types::ToSql> =
451                param_values.iter().map(|p| p.as_ref()).collect();
452            tx.execute(&sql, refs.as_slice()).storage_err()?;
453        }
454
455        tx.commit().storage_err()?;
456        Ok(())
457    }
458
459    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
460        if edges.is_empty() {
461            return Ok(());
462        }
463        let conn = self.conn()?;
464        let tx = conn.unchecked_transaction().storage_err()?;
465
466        const COLS: usize = 9;
467        const BATCH: usize = 999 / COLS; // 111
468
469        for chunk in edges.chunks(BATCH) {
470            let placeholders = multi_row_placeholders(COLS, chunk.len());
471            let sql = format!(
472                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
473            );
474
475            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
476                Vec::with_capacity(chunk.len() * COLS);
477            for edge in chunk {
478                let props_json =
479                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
480                param_values.push(Box::new(edge.id.clone()));
481                param_values.push(Box::new(edge.src.clone()));
482                param_values.push(Box::new(edge.dst.clone()));
483                param_values.push(Box::new(edge.relationship.to_string()));
484                param_values.push(Box::new(edge.weight));
485                param_values.push(Box::new(props_json));
486                param_values.push(Box::new(edge.created_at.timestamp()));
487                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
488                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
489            }
490
491            let refs: Vec<&dyn rusqlite::types::ToSql> =
492                param_values.iter().map(|p| p.as_ref()).collect();
493            tx.execute(&sql, refs.as_slice()).storage_err()?;
494        }
495
496        tx.commit().storage_err()?;
497        Ok(())
498    }
499
500    fn get_stale_memories_for_decay(
501        &self,
502        threshold_ts: i64,
503    ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
504        let conn = self.conn()?;
505        let mut stmt = conn
506            .prepare(
507                "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
508            )
509            .storage_err()?;
510
511        let rows = stmt
512            .query_map(params![threshold_ts], |row| {
513                Ok((
514                    row.get::<_, String>(0)?,
515                    row.get::<_, f64>(1)?,
516                    row.get::<_, u32>(2)?,
517                    row.get::<_, i64>(3)?,
518                ))
519            })
520            .storage_err()?
521            .collect::<Result<Vec<_>, _>>()
522            .storage_err()?;
523
524        Ok(rows)
525    }
526
527    fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
528        if updates.is_empty() {
529            return Ok(0);
530        }
531        let conn = self.conn()?;
532        let tx = conn.unchecked_transaction().storage_err()?;
533
534        let mut count = 0usize;
535        for (id, importance) in updates {
536            let rows = tx
537                .execute(
538                    "UPDATE memories SET importance = ?1 WHERE id = ?2",
539                    params![importance, id],
540                )
541                .storage_err()?;
542            count += rows;
543        }
544
545        tx.commit().storage_err()?;
546        Ok(count)
547    }
548
549    fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
550        let conn = self.conn()?;
551        let count: i64 = if let Some(ns) = namespace {
552            conn.query_row(
553                "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
554                params![ns],
555                |row| row.get(0),
556            )
557            .storage_err()?
558        } else {
559            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
560                .storage_err()?
561        };
562        Ok(count as usize)
563    }
564
565    // ── Query Helpers ─────────────────────────────────────────────────
566
567    fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
568        let conn = self.conn()?;
569        let mut stmt = conn
570            .prepare(
571                "SELECT m.id, m.content FROM memories m
572                 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
573                 WHERE me.memory_id IS NULL",
574            )
575            .storage_err()?;
576
577        let rows = stmt
578            .query_map([], |row| {
579                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
580            })
581            .storage_err()?
582            .collect::<Result<Vec<_>, _>>()
583            .storage_err()?;
584
585        Ok(rows)
586    }
587
588    fn search_graph_nodes(
589        &self,
590        query: &str,
591        namespace: Option<&str>,
592        limit: usize,
593    ) -> Result<Vec<GraphNode>, CodememError> {
594        let conn = self.conn()?;
595        let escaped = query
596            .to_lowercase()
597            .replace('\\', "\\\\")
598            .replace('%', "\\%")
599            .replace('_', "\\_");
600        let pattern = format!("%{escaped}%");
601
602        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(ns) =
603            namespace
604        {
605            (
606                    "SELECT id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to \
607                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
608                 ORDER BY centrality DESC LIMIT ?3"
609                        .to_string(),
610                    vec![
611                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
612                        Box::new(ns.to_string()),
613                        Box::new(limit as i64),
614                    ],
615                )
616        } else {
617            (
618                    "SELECT id, kind, label, payload, centrality, memory_id, namespace, valid_from, valid_to \
619                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
620                 ORDER BY centrality DESC LIMIT ?2"
621                        .to_string(),
622                    vec![
623                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
624                        Box::new(limit as i64),
625                    ],
626                )
627        };
628
629        let refs: Vec<&dyn rusqlite::types::ToSql> =
630            params_vec.iter().map(|p| p.as_ref()).collect();
631        let mut stmt = conn.prepare(&sql).storage_err()?;
632
633        let rows = stmt
634            .query_map(refs.as_slice(), |row| {
635                let kind_str: String = row.get(1)?;
636                let payload_str: String = row.get(3)?;
637                let valid_from_ts: Option<i64> = row.get(7)?;
638                let valid_to_ts: Option<i64> = row.get(8)?;
639                Ok(GraphNode {
640                    id: row.get(0)?,
641                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
642                    label: row.get(2)?,
643                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
644                    centrality: row.get(4)?,
645                    memory_id: row.get(5)?,
646                    namespace: row.get(6)?,
647                    valid_from: valid_from_ts
648                        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)),
649                    valid_to: valid_to_ts.and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)),
650                })
651            })
652            .storage_err()?
653            .collect::<Result<Vec<_>, _>>()
654            .storage_err()?;
655
656        Ok(rows)
657    }
658
659    fn list_memories_by_tag(
660        &self,
661        tag: &str,
662        namespace: Option<&str>,
663        limit: usize,
664    ) -> Result<Vec<MemoryNode>, CodememError> {
665        Storage::list_memories_by_tag(self, tag, namespace, limit)
666    }
667
668    fn list_memories_filtered(
669        &self,
670        namespace: Option<&str>,
671        memory_type: Option<&str>,
672    ) -> Result<Vec<MemoryNode>, CodememError> {
673        let conn = self.conn()?;
674        let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
675                        content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, \
676                        last_accessed_at FROM memories WHERE (expires_at IS NULL OR expires_at > ?1)"
677            .to_string();
678        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
679        // ?1 is the expiry timestamp
680        param_values.push(Box::new(chrono::Utc::now().timestamp()));
681
682        if let Some(ns) = namespace {
683            param_values.push(Box::new(ns.to_string()));
684            sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
685        }
686        if let Some(mt) = memory_type {
687            param_values.push(Box::new(mt.to_string()));
688            sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
689        }
690        sql.push_str(" ORDER BY created_at DESC LIMIT 10000");
691
692        let refs: Vec<&dyn rusqlite::types::ToSql> =
693            param_values.iter().map(|p| p.as_ref()).collect();
694        let mut stmt = conn.prepare(&sql).storage_err()?;
695
696        let rows = stmt
697            .query_map(refs.as_slice(), MemoryRow::from_row)
698            .storage_err()?;
699
700        let mut result = Vec::new();
701        for row in rows {
702            let mr = row.storage_err()?;
703            result.push(mr.into_memory_node()?);
704        }
705
706        Ok(result)
707    }
708
709    // ── Session Activity (delegated) ──────────────────────────────────
710
    // Session-activity recording and queries are forwarded unchanged to the
    // inherent `Storage` methods of the same name.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
718
719    // ── Stats (delegated) ─────────────────────────────────────────────
720
    // Forwarded unchanged to the inherent `Storage::stats`.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
722
723    // ── Transaction Control ──────────────────────────────────────────
724
725    fn begin_transaction(&self) -> Result<(), CodememError> {
726        let conn = self.conn()?;
727        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
728        self.in_transaction
729            .store(true, std::sync::atomic::Ordering::Release);
730        Ok(())
731    }
732
733    fn commit_transaction(&self) -> Result<(), CodememError> {
734        let conn = self.conn()?;
735        conn.execute_batch("COMMIT").storage_err()?;
736        // Clear flag after COMMIT succeeds — if COMMIT fails, the flag
737        // stays set so callers know a transaction is still active.
738        self.in_transaction
739            .store(false, std::sync::atomic::Ordering::Release);
740        Ok(())
741    }
742
743    fn rollback_transaction(&self) -> Result<(), CodememError> {
744        let conn = self.conn()?;
745        conn.execute_batch("ROLLBACK").storage_err()?;
746        // Clear flag after ROLLBACK succeeds — mirrors commit_transaction's
747        // pattern. If ROLLBACK fails, the flag stays set so callers know a
748        // transaction is still active.
749        self.in_transaction
750            .store(false, std::sync::atomic::Ordering::Release);
751        Ok(())
752    }
753
754    // ── Repository Management ────────────────────────────────────────
755
756    fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
757        Storage::list_repos(self)
758    }
759
760    fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
761        Storage::add_repo(self, repo)
762    }
763
764    fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
765        Storage::get_repo(self, id)
766    }
767
768    fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
769        Storage::remove_repo(self, id)
770    }
771
772    fn update_repo_status(
773        &self,
774        id: &str,
775        status: &str,
776        indexed_at: Option<&str>,
777    ) -> Result<(), CodememError> {
778        Storage::update_repo_status(self, id, status, indexed_at)
779    }
780
781    // ── Cross-Repo Persistence ────────────────────────────────────────
782
783    fn graph_edges_for_namespace_with_cross(
784        &self,
785        namespace: &str,
786        include_cross_namespace: bool,
787    ) -> Result<Vec<Edge>, CodememError> {
788        Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
789    }
790
791    fn upsert_package_registry(
792        &self,
793        package_name: &str,
794        namespace: &str,
795        version: &str,
796        manifest: &str,
797    ) -> Result<(), CodememError> {
798        Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
799    }
800
801    fn store_unresolved_ref(
802        &self,
803        source_qualified_name: &str,
804        target_name: &str,
805        source_namespace: &str,
806        file_path: &str,
807        line: usize,
808        ref_kind: &str,
809        package_hint: Option<&str>,
810    ) -> Result<(), CodememError> {
811        use crate::cross_repo::UnresolvedRefEntry;
812        let entry = UnresolvedRefEntry {
813            id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
814            namespace: source_namespace.to_string(),
815            source_node: source_qualified_name.to_string(),
816            target_name: target_name.to_string(),
817            package_hint: package_hint.map(|s| s.to_string()),
818            ref_kind: ref_kind.to_string(),
819            file_path: Some(file_path.to_string()),
820            line: Some(line as i64),
821            created_at: chrono::Utc::now().timestamp(),
822        };
823        Storage::insert_unresolved_ref(self, &entry)
824    }
825
826    fn store_unresolved_refs_batch(
827        &self,
828        refs: &[codemem_core::UnresolvedRefData],
829    ) -> Result<usize, CodememError> {
830        use crate::cross_repo::UnresolvedRefEntry;
831        let now = chrono::Utc::now().timestamp();
832        let entries: Vec<UnresolvedRefEntry> = refs
833            .iter()
834            .map(|r| UnresolvedRefEntry {
835                id: format!(
836                    "uref:{}:{}:{}",
837                    r.namespace, r.source_qualified_name, r.target_name
838                ),
839                namespace: r.namespace.clone(),
840                source_node: r.source_qualified_name.clone(),
841                target_name: r.target_name.clone(),
842                package_hint: r.package_hint.clone(),
843                ref_kind: r.ref_kind.clone(),
844                file_path: Some(r.file_path.clone()),
845                line: Some(r.line as i64),
846                created_at: now,
847            })
848            .collect();
849        let count = entries.len();
850        Storage::insert_unresolved_refs_batch(self, &entries)?;
851        Ok(count)
852    }
853
854    fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
855        let conn = self.conn()?;
856        let mut stmt = conn
857            .prepare("SELECT package_name, namespace, manifest FROM package_registry")
858            .map_err(|e| CodememError::Storage(e.to_string()))?;
859        let rows = stmt
860            .query_map([], |row| {
861                Ok((
862                    row.get::<_, String>(0)?,
863                    row.get::<_, String>(1)?,
864                    row.get::<_, String>(2)?,
865                ))
866            })
867            .map_err(|e| CodememError::Storage(e.to_string()))?
868            .collect::<Result<Vec<_>, _>>()
869            .map_err(|e| CodememError::Storage(e.to_string()))?;
870        Ok(rows)
871    }
872
873    fn list_pending_unresolved_refs(
874        &self,
875    ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
876        let conn = self.conn()?;
877        let mut stmt = conn
878            .prepare(
879                "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
880                 FROM unresolved_refs",
881            )
882            .map_err(|e| CodememError::Storage(e.to_string()))?;
883        let rows = stmt
884            .query_map([], |row| {
885                Ok(codemem_core::PendingUnresolvedRef {
886                    id: row.get::<_, String>(0)?,
887                    source_node: row.get::<_, String>(1)?,
888                    target_name: row.get::<_, String>(2)?,
889                    namespace: row.get::<_, String>(3)?,
890                    file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
891                    line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
892                    ref_kind: row.get::<_, String>(6)?,
893                    package_hint: row.get::<_, Option<String>>(7)?,
894                })
895            })
896            .map_err(|e| CodememError::Storage(e.to_string()))?
897            .collect::<Result<Vec<_>, _>>()
898            .map_err(|e| CodememError::Storage(e.to_string()))?;
899        Ok(rows)
900    }
901
902    fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
903        Storage::delete_unresolved_ref(self, id)
904    }
905
906    fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
907        let conn = self.conn()?;
908        let count: i64 = conn
909            .query_row(
910                "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
911                rusqlite::params![namespace],
912                |row| row.get(0),
913            )
914            .map_err(|e| CodememError::Storage(e.to_string()))?;
915        Ok(count as usize)
916    }
917
918    fn list_registered_packages_for_namespace(
919        &self,
920        namespace: &str,
921    ) -> Result<Vec<(String, String, String)>, CodememError> {
922        let entries = Storage::get_packages_for_namespace(self, namespace)?;
923        Ok(entries
924            .into_iter()
925            .map(|e| (e.package_name, e.namespace, e.manifest))
926            .collect())
927    }
928
929    fn store_api_endpoint(
930        &self,
931        method: &str,
932        path: &str,
933        handler_symbol: &str,
934        namespace: &str,
935    ) -> Result<(), CodememError> {
936        use crate::cross_repo::ApiEndpointEntry;
937        let entry = ApiEndpointEntry {
938            id: format!("ep:{}:{}:{}", namespace, method, path),
939            namespace: namespace.to_string(),
940            method: Some(method.to_string()),
941            path: path.to_string(),
942            handler: Some(handler_symbol.to_string()),
943            schema: "{}".to_string(),
944        };
945        Storage::upsert_api_endpoint(self, &entry)
946    }
947
948    fn store_api_client_call(
949        &self,
950        library: &str,
951        method: Option<&str>,
952        caller_symbol: &str,
953        namespace: &str,
954    ) -> Result<(), CodememError> {
955        let id = format!("client:{caller_symbol}:{library}");
956        Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
957    }
958
959    fn list_api_endpoints(
960        &self,
961        namespace: &str,
962    ) -> Result<Vec<(String, String, String, String)>, CodememError> {
963        let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
964        Ok(entries
965            .into_iter()
966            .map(|e| {
967                (
968                    e.method.unwrap_or_default(),
969                    e.path,
970                    e.handler.unwrap_or_default(),
971                    e.namespace,
972                )
973            })
974            .collect())
975    }
976
977    fn store_event_channel(
978        &self,
979        channel: &str,
980        direction: &str,
981        protocol: &str,
982        handler: &str,
983        namespace: &str,
984        description: &str,
985    ) -> Result<(), CodememError> {
986        use crate::cross_repo::EventChannelEntry;
987        let entry = EventChannelEntry {
988            id: format!("ec:{namespace}:{direction}:{channel}"),
989            namespace: namespace.to_string(),
990            channel: channel.to_string(),
991            direction: direction.to_string(),
992            protocol: protocol.to_string(),
993            message_schema: "{}".to_string(),
994            description: description.to_string(),
995            handler: handler.to_string(),
996            spec_file: String::new(),
997        };
998        Storage::upsert_event_channel(self, &entry)
999    }
1000
1001    fn list_event_channels(
1002        &self,
1003        namespace: &str,
1004    ) -> Result<Vec<(String, String, String, String, String)>, CodememError> {
1005        let entries = Storage::get_event_channels_for_namespace(self, namespace)?;
1006        Ok(entries
1007            .into_iter()
1008            .map(|e| (e.channel, e.direction, e.protocol, e.handler, e.description))
1009            .collect())
1010    }
1011
1012    fn list_all_event_channels(
1013        &self,
1014    ) -> Result<Vec<(String, String, String, String, String, String)>, CodememError> {
1015        let entries = Storage::get_all_event_channels(self)?;
1016        Ok(entries
1017            .into_iter()
1018            .map(|e| {
1019                (
1020                    e.channel,
1021                    e.direction,
1022                    e.protocol,
1023                    e.handler,
1024                    e.namespace,
1025                    e.description,
1026                )
1027            })
1028            .collect())
1029    }
1030}
1031
1032#[cfg(test)]
1033#[path = "tests/backend_tests.rs"]
1034mod tests;