Skip to main content

codemem_storage/
backend.rs

1//! `StorageBackend` trait implementation for Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5    CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6    Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Build the `VALUES` placeholder list for a multi-row INSERT.
///
/// Placeholders are numbered consecutively across rows, so `cols = 2` and
/// `rows = 2` yields `"(?1,?2),(?3,?4)"`. Zero rows yields an empty string.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // First placeholder number used by this row (placeholders are 1-based).
            let first = row * cols + 1;
            let cells: Vec<String> = (first..first + cols).map(|n| format!("?{n}")).collect();
            format!("({})", cells.join(","))
        })
        .collect::<Vec<String>>()
        .join(",")
}
30
/// Macro to delegate pure-forwarding trait methods to `Storage` inherent methods.
///
/// `delegate_storage!(name(&self, a: A, b: B) -> R);` expands to
///
/// ```ignore
/// fn name(&self, a: A, b: B) -> R {
///     Storage::name(self, a, b)
/// }
/// ```
///
/// A single variadic rule accepts any number of arguments after `&self`
/// (including none), so no arity needs a dedicated arm.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
66    // ── Memory CRUD (delegated) ───────────────────────────────────────
67
    // Each invocation below expands to a trait method that passes its
    // arguments unchanged to the inherent `Storage` method of the same name.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
    /// M1: Override with transactional cascade delete.
    ///
    /// Pure forwarder: the work is done by the inherent
    /// `Storage::delete_memory_cascade`, whose result is returned unchanged.
    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
        // Delegates to Storage::delete_memory_cascade which wraps all
        // deletes (memory + graph nodes/edges + embedding) in a single transaction.
        Storage::delete_memory_cascade(self, id)
    }
80
    /// Override with transactional batch cascade delete.
    ///
    /// Pure forwarder to the inherent `Storage::delete_memories_batch_cascade`;
    /// its `usize` result is returned unchanged.
    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
        Storage::delete_memories_batch_cascade(self, ids)
    }
85
    // Expiry, listing and counting helpers: each expands to a forwarder to the
    // inherent `Storage` method of the same name.
    delegate_storage!(delete_expired_memories(&self) -> Result<usize, CodememError>);
    delegate_storage!(expire_memories_for_file(&self, file_path: &str) -> Result<usize, CodememError>);
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
93
94    fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
95        if ids.is_empty() {
96            return Ok(Vec::new());
97        }
98        let conn = self.conn()?;
99
100        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
101        let sql = format!(
102            "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
103            placeholders.join(",")
104        );
105
106        let mut stmt = conn.prepare(&sql).storage_err()?;
107
108        let params: Vec<&dyn rusqlite::types::ToSql> = ids
109            .iter()
110            .map(|id| id as &dyn rusqlite::types::ToSql)
111            .collect();
112
113        let rows = stmt
114            .query_map(params.as_slice(), MemoryRow::from_row)
115            .storage_err()?;
116
117        let mut memories = Vec::new();
118        for row in rows {
119            let row = row.storage_err()?;
120            memories.push(row.into_memory_node()?);
121        }
122        Ok(memories)
123    }
124
125    // ── Embedding Persistence (delegated where possible) ──────────────
126
    // Single-embedding store/get expand to forwarders to the inherent
    // `Storage::store_embedding` and `Storage::get_embedding`.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
129
130    fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
131        let conn = self.conn()?;
132        let deleted = conn
133            .execute(
134                "DELETE FROM memory_embeddings WHERE memory_id = ?1",
135                [memory_id],
136            )
137            .storage_err()?;
138        Ok(deleted > 0)
139    }
140
141    fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
142        let conn = self.conn()?;
143        let mut stmt = conn
144            .prepare("SELECT memory_id, embedding FROM memory_embeddings")
145            .storage_err()?;
146        let rows = stmt
147            .query_map([], |row| {
148                let id: String = row.get(0)?;
149                let blob: Vec<u8> = row.get(1)?;
150                Ok((id, blob))
151            })
152            .storage_err()?;
153        let mut result = Vec::new();
154        for row in rows {
155            let (id, blob) = row.storage_err()?;
156            let floats: Vec<f32> = blob
157                .chunks_exact(4)
158                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
159                .collect();
160            result.push((id, floats));
161        }
162        Ok(result)
163    }
164
165    // ── Graph Node/Edge Persistence (delegated) ───────────────────────
166
    // Single-item graph node/edge operations: each expands to a forwarder to
    // the inherent `Storage` method of the same name.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
177    // ── Sessions (delegated where possible) ───────────────────────────
178
    // Session start/end expand to forwarders to the inherent
    // `Storage::start_session` and `Storage::end_session`.
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
181
    /// List sessions, optionally restricted to `namespace`, passing `limit`
    /// through to `list_sessions_with_limit`.
    ///
    /// Not expressed with `delegate_storage!` because the method it calls has
    /// a different name than the trait method.
    fn list_sessions(
        &self,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Session>, CodememError> {
        self.list_sessions_with_limit(namespace, limit)
    }
189
190    // ── Consolidation (delegated) ─────────────────────────────────────
191
    // Consolidation log write/read expand to forwarders to the inherent
    // `Storage` methods of the same name.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
194
195    // ── Pattern Detection (delegated) ─────────────────────────────────
196
    // Pattern-detection queries: each expands to a forwarder to the inherent
    // `Storage` method of the same name.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
201
202    // ── Bulk Operations ───────────────────────────────────────────────
203
204    fn decay_stale_memories(
205        &self,
206        threshold_ts: i64,
207        decay_factor: f64,
208    ) -> Result<usize, CodememError> {
209        let conn = self.conn()?;
210        let rows = conn
211            .execute(
212                "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
213                params![decay_factor, threshold_ts],
214            )
215            .storage_err()?;
216        Ok(rows)
217    }
218
219    fn list_memories_for_creative(
220        &self,
221    ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
222        let conn = self.conn()?;
223        let mut stmt = conn
224            .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
225            .storage_err()?;
226
227        let rows = stmt
228            .query_map([], |row| {
229                Ok((
230                    row.get::<_, String>(0)?,
231                    row.get::<_, String>(1)?,
232                    row.get::<_, String>(2)?,
233                ))
234            })
235            .storage_err()?
236            .collect::<Result<Vec<_>, _>>()
237            .storage_err()?;
238
239        Ok(rows
240            .into_iter()
241            .map(|(id, mtype, tags_json)| {
242                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
243                (id, mtype, tags)
244            })
245            .collect())
246    }
247
248    fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
249        let conn = self.conn()?;
250        let mut stmt = conn
251            .prepare(
252                "SELECT a.id, b.id, 1.0 as similarity
253                 FROM memories a
254                 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
255                 WHERE a.id < b.id",
256            )
257            .storage_err()?;
258
259        let rows = stmt
260            .query_map([], |row| {
261                Ok((
262                    row.get::<_, String>(0)?,
263                    row.get::<_, String>(1)?,
264                    row.get::<_, f64>(2)?,
265                ))
266            })
267            .storage_err()?
268            .collect::<Result<Vec<_>, _>>()
269            .storage_err()?;
270
271        Ok(rows)
272    }
273
274    fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
275        let conn = self.conn()?;
276        let mut stmt = conn
277            .prepare(
278                "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
279            )
280            .storage_err()?;
281
282        let ids = stmt
283            .query_map(params![importance_threshold], |row| row.get(0))
284            .storage_err()?
285            .collect::<Result<Vec<String>, _>>()
286            .storage_err()?;
287
288        Ok(ids)
289    }
290
291    // ── Batch Operations ──────────────────────────────────────────────
292
293    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
294        if memories.is_empty() {
295            return Ok(());
296        }
297        let conn = self.conn()?;
298        let tx = conn.unchecked_transaction().storage_err()?;
299
300        const COLS: usize = 17;
301        const BATCH: usize = 999 / COLS; // 58
302
303        for chunk in memories.chunks(BATCH) {
304            let placeholders = multi_row_placeholders(COLS, chunk.len());
305            let sql = format!(
306                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
307            );
308
309            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
310                Vec::with_capacity(chunk.len() * COLS);
311            for memory in chunk {
312                let tags_json = serde_json::to_string(&memory.tags)?;
313                let metadata_json = serde_json::to_string(&memory.metadata)?;
314                param_values.push(Box::new(memory.id.clone()));
315                param_values.push(Box::new(memory.content.clone()));
316                param_values.push(Box::new(memory.memory_type.to_string()));
317                param_values.push(Box::new(memory.importance));
318                param_values.push(Box::new(memory.confidence));
319                param_values.push(Box::new(memory.access_count as i64));
320                param_values.push(Box::new(memory.content_hash.clone()));
321                param_values.push(Box::new(tags_json));
322                param_values.push(Box::new(metadata_json));
323                param_values.push(Box::new(memory.namespace.clone()));
324                param_values.push(Box::new(memory.session_id.clone()));
325                param_values.push(Box::new(memory.repo.clone()));
326                param_values.push(Box::new(memory.git_ref.clone()));
327                param_values.push(Box::new(memory.expires_at.map(|dt| dt.timestamp())));
328                param_values.push(Box::new(memory.created_at.timestamp()));
329                param_values.push(Box::new(memory.updated_at.timestamp()));
330                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
331            }
332
333            let refs: Vec<&dyn rusqlite::types::ToSql> =
334                param_values.iter().map(|p| p.as_ref()).collect();
335            tx.execute(&sql, refs.as_slice()).storage_err()?;
336        }
337
338        tx.commit().storage_err()?;
339        Ok(())
340    }
341
342    fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
343        if items.is_empty() {
344            return Ok(());
345        }
346        let conn = self.conn()?;
347        let tx = conn.unchecked_transaction().storage_err()?;
348
349        const COLS: usize = 2;
350        const BATCH: usize = 999 / COLS; // 499
351
352        for chunk in items.chunks(BATCH) {
353            let placeholders = multi_row_placeholders(COLS, chunk.len());
354            let sql = format!(
355                "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
356            );
357
358            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
359                Vec::with_capacity(chunk.len() * COLS);
360            for (id, embedding) in chunk {
361                let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
362                param_values.push(Box::new(id.to_string()));
363                param_values.push(Box::new(blob));
364            }
365
366            let refs: Vec<&dyn rusqlite::types::ToSql> =
367                param_values.iter().map(|p| p.as_ref()).collect();
368            tx.execute(&sql, refs.as_slice()).storage_err()?;
369        }
370
371        tx.commit().storage_err()?;
372        Ok(())
373    }
374
375    fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
376        let conn = self.conn()?;
377        let mut stmt = conn
378            .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
379            .storage_err()?;
380
381        let rows = stmt
382            .query_map(params![namespace], |row| {
383                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
384            })
385            .storage_err()?
386            .collect::<Result<Vec<_>, _>>()
387            .storage_err()?;
388
389        Ok(rows.into_iter().collect())
390    }
391
392    fn save_file_hashes(
393        &self,
394        namespace: &str,
395        hashes: &HashMap<String, String>,
396    ) -> Result<(), CodememError> {
397        let conn = self.conn()?;
398        let tx = conn.unchecked_transaction().storage_err()?;
399
400        tx.execute(
401            "DELETE FROM file_hashes WHERE namespace = ?1",
402            params![namespace],
403        )
404        .storage_err()?;
405
406        for (path, hash) in hashes {
407            tx.execute(
408                "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
409                params![namespace, path, hash],
410            )
411            .storage_err()?;
412        }
413
414        tx.commit().storage_err()?;
415        Ok(())
416    }
417
418    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
419        if nodes.is_empty() {
420            return Ok(());
421        }
422        let conn = self.conn()?;
423        let tx = conn.unchecked_transaction().storage_err()?;
424
425        const COLS: usize = 7;
426        const BATCH: usize = 999 / COLS; // 142
427
428        for chunk in nodes.chunks(BATCH) {
429            let placeholders = multi_row_placeholders(COLS, chunk.len());
430            let sql = format!(
431                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
432            );
433
434            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
435                Vec::with_capacity(chunk.len() * COLS);
436            for node in chunk {
437                let payload_json =
438                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
439                param_values.push(Box::new(node.id.clone()));
440                param_values.push(Box::new(node.kind.to_string()));
441                param_values.push(Box::new(node.label.clone()));
442                param_values.push(Box::new(payload_json));
443                param_values.push(Box::new(node.centrality));
444                param_values.push(Box::new(node.memory_id.clone()));
445                param_values.push(Box::new(node.namespace.clone()));
446            }
447
448            let refs: Vec<&dyn rusqlite::types::ToSql> =
449                param_values.iter().map(|p| p.as_ref()).collect();
450            tx.execute(&sql, refs.as_slice()).storage_err()?;
451        }
452
453        tx.commit().storage_err()?;
454        Ok(())
455    }
456
457    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
458        if edges.is_empty() {
459            return Ok(());
460        }
461        let conn = self.conn()?;
462        let tx = conn.unchecked_transaction().storage_err()?;
463
464        const COLS: usize = 9;
465        const BATCH: usize = 999 / COLS; // 111
466
467        for chunk in edges.chunks(BATCH) {
468            let placeholders = multi_row_placeholders(COLS, chunk.len());
469            let sql = format!(
470                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
471            );
472
473            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
474                Vec::with_capacity(chunk.len() * COLS);
475            for edge in chunk {
476                let props_json =
477                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
478                param_values.push(Box::new(edge.id.clone()));
479                param_values.push(Box::new(edge.src.clone()));
480                param_values.push(Box::new(edge.dst.clone()));
481                param_values.push(Box::new(edge.relationship.to_string()));
482                param_values.push(Box::new(edge.weight));
483                param_values.push(Box::new(props_json));
484                param_values.push(Box::new(edge.created_at.timestamp()));
485                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
486                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
487            }
488
489            let refs: Vec<&dyn rusqlite::types::ToSql> =
490                param_values.iter().map(|p| p.as_ref()).collect();
491            tx.execute(&sql, refs.as_slice()).storage_err()?;
492        }
493
494        tx.commit().storage_err()?;
495        Ok(())
496    }
497
498    fn get_stale_memories_for_decay(
499        &self,
500        threshold_ts: i64,
501    ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
502        let conn = self.conn()?;
503        let mut stmt = conn
504            .prepare(
505                "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
506            )
507            .storage_err()?;
508
509        let rows = stmt
510            .query_map(params![threshold_ts], |row| {
511                Ok((
512                    row.get::<_, String>(0)?,
513                    row.get::<_, f64>(1)?,
514                    row.get::<_, u32>(2)?,
515                    row.get::<_, i64>(3)?,
516                ))
517            })
518            .storage_err()?
519            .collect::<Result<Vec<_>, _>>()
520            .storage_err()?;
521
522        Ok(rows)
523    }
524
525    fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
526        if updates.is_empty() {
527            return Ok(0);
528        }
529        let conn = self.conn()?;
530        let tx = conn.unchecked_transaction().storage_err()?;
531
532        let mut count = 0usize;
533        for (id, importance) in updates {
534            let rows = tx
535                .execute(
536                    "UPDATE memories SET importance = ?1 WHERE id = ?2",
537                    params![importance, id],
538                )
539                .storage_err()?;
540            count += rows;
541        }
542
543        tx.commit().storage_err()?;
544        Ok(count)
545    }
546
547    fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
548        let conn = self.conn()?;
549        let count: i64 = if let Some(ns) = namespace {
550            conn.query_row(
551                "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
552                params![ns],
553                |row| row.get(0),
554            )
555            .storage_err()?
556        } else {
557            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
558                .storage_err()?
559        };
560        Ok(count as usize)
561    }
562
563    // ── Query Helpers ─────────────────────────────────────────────────
564
565    fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
566        let conn = self.conn()?;
567        let mut stmt = conn
568            .prepare(
569                "SELECT m.id, m.content FROM memories m
570                 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
571                 WHERE me.memory_id IS NULL",
572            )
573            .storage_err()?;
574
575        let rows = stmt
576            .query_map([], |row| {
577                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
578            })
579            .storage_err()?
580            .collect::<Result<Vec<_>, _>>()
581            .storage_err()?;
582
583        Ok(rows)
584    }
585
586    fn search_graph_nodes(
587        &self,
588        query: &str,
589        namespace: Option<&str>,
590        limit: usize,
591    ) -> Result<Vec<GraphNode>, CodememError> {
592        let conn = self.conn()?;
593        let escaped = query
594            .to_lowercase()
595            .replace('\\', "\\\\")
596            .replace('%', "\\%")
597            .replace('_', "\\_");
598        let pattern = format!("%{escaped}%");
599
600        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
601            if let Some(ns) = namespace {
602                (
603                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
604                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
605                 ORDER BY centrality DESC LIMIT ?3"
606                        .to_string(),
607                    vec![
608                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
609                        Box::new(ns.to_string()),
610                        Box::new(limit as i64),
611                    ],
612                )
613            } else {
614                (
615                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
616                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
617                 ORDER BY centrality DESC LIMIT ?2"
618                        .to_string(),
619                    vec![
620                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
621                        Box::new(limit as i64),
622                    ],
623                )
624            };
625
626        let refs: Vec<&dyn rusqlite::types::ToSql> =
627            params_vec.iter().map(|p| p.as_ref()).collect();
628        let mut stmt = conn.prepare(&sql).storage_err()?;
629
630        let rows = stmt
631            .query_map(refs.as_slice(), |row| {
632                let kind_str: String = row.get(1)?;
633                let payload_str: String = row.get(3)?;
634                Ok(GraphNode {
635                    id: row.get(0)?,
636                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
637                    label: row.get(2)?,
638                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
639                    centrality: row.get(4)?,
640                    memory_id: row.get(5)?,
641                    namespace: row.get(6)?,
642                })
643            })
644            .storage_err()?
645            .collect::<Result<Vec<_>, _>>()
646            .storage_err()?;
647
648        Ok(rows)
649    }
650
    /// List memories carrying `tag`.
    ///
    /// Pure forwarder: `tag`, `namespace` and `limit` are passed unchanged to
    /// the inherent `Storage::list_memories_by_tag`.
    fn list_memories_by_tag(
        &self,
        tag: &str,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MemoryNode>, CodememError> {
        Storage::list_memories_by_tag(self, tag, namespace, limit)
    }
659
660    fn list_memories_filtered(
661        &self,
662        namespace: Option<&str>,
663        memory_type: Option<&str>,
664    ) -> Result<Vec<MemoryNode>, CodememError> {
665        let conn = self.conn()?;
666        let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
667                        content_hash, tags, metadata, namespace, session_id, repo, git_ref, expires_at, created_at, updated_at, \
668                        last_accessed_at FROM memories WHERE (expires_at IS NULL OR expires_at > ?1)"
669            .to_string();
670        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
671        // ?1 is the expiry timestamp
672        param_values.push(Box::new(chrono::Utc::now().timestamp()));
673
674        if let Some(ns) = namespace {
675            param_values.push(Box::new(ns.to_string()));
676            sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
677        }
678        if let Some(mt) = memory_type {
679            param_values.push(Box::new(mt.to_string()));
680            sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
681        }
682        sql.push_str(" ORDER BY created_at DESC LIMIT 10000");
683
684        let refs: Vec<&dyn rusqlite::types::ToSql> =
685            param_values.iter().map(|p| p.as_ref()).collect();
686        let mut stmt = conn.prepare(&sql).storage_err()?;
687
688        let rows = stmt
689            .query_map(refs.as_slice(), MemoryRow::from_row)
690            .storage_err()?;
691
692        let mut result = Vec::new();
693        for row in rows {
694            let mr = row.storage_err()?;
695            result.push(mr.into_memory_node()?);
696        }
697
698        Ok(result)
699    }
700
701    // ── Session Activity (delegated) ──────────────────────────────────
702
    // Session-activity recording and queries: each expands to a forwarder to
    // the inherent `Storage` method of the same name.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
710
711    // ── Stats (delegated) ─────────────────────────────────────────────
712
    // Expands to a forwarder to the inherent `Storage::stats`.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
714
715    // ── Transaction Control ──────────────────────────────────────────
716
717    fn begin_transaction(&self) -> Result<(), CodememError> {
718        let conn = self.conn()?;
719        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
720        self.in_transaction
721            .store(true, std::sync::atomic::Ordering::Release);
722        Ok(())
723    }
724
725    fn commit_transaction(&self) -> Result<(), CodememError> {
726        let conn = self.conn()?;
727        conn.execute_batch("COMMIT").storage_err()?;
728        // Clear flag after COMMIT succeeds — if COMMIT fails, the flag
729        // stays set so callers know a transaction is still active.
730        self.in_transaction
731            .store(false, std::sync::atomic::Ordering::Release);
732        Ok(())
733    }
734
735    fn rollback_transaction(&self) -> Result<(), CodememError> {
736        let conn = self.conn()?;
737        conn.execute_batch("ROLLBACK").storage_err()?;
738        // Clear flag after ROLLBACK succeeds — mirrors commit_transaction's
739        // pattern. If ROLLBACK fails, the flag stays set so callers know a
740        // transaction is still active.
741        self.in_transaction
742            .store(false, std::sync::atomic::Ordering::Release);
743        Ok(())
744    }
745
746    // ── Repository Management ────────────────────────────────────────
747
748    fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
749        Storage::list_repos(self)
750    }
751
752    fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
753        Storage::add_repo(self, repo)
754    }
755
756    fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
757        Storage::get_repo(self, id)
758    }
759
760    fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
761        Storage::remove_repo(self, id)
762    }
763
764    fn update_repo_status(
765        &self,
766        id: &str,
767        status: &str,
768        indexed_at: Option<&str>,
769    ) -> Result<(), CodememError> {
770        Storage::update_repo_status(self, id, status, indexed_at)
771    }
772
773    // ── Cross-Repo Persistence ────────────────────────────────────────
774
775    fn graph_edges_for_namespace_with_cross(
776        &self,
777        namespace: &str,
778        include_cross_namespace: bool,
779    ) -> Result<Vec<Edge>, CodememError> {
780        Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
781    }
782
783    fn upsert_package_registry(
784        &self,
785        package_name: &str,
786        namespace: &str,
787        version: &str,
788        manifest: &str,
789    ) -> Result<(), CodememError> {
790        Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
791    }
792
793    fn store_unresolved_ref(
794        &self,
795        source_qualified_name: &str,
796        target_name: &str,
797        source_namespace: &str,
798        file_path: &str,
799        line: usize,
800        ref_kind: &str,
801        package_hint: Option<&str>,
802    ) -> Result<(), CodememError> {
803        use crate::cross_repo::UnresolvedRefEntry;
804        let entry = UnresolvedRefEntry {
805            id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
806            namespace: source_namespace.to_string(),
807            source_node: source_qualified_name.to_string(),
808            target_name: target_name.to_string(),
809            package_hint: package_hint.map(|s| s.to_string()),
810            ref_kind: ref_kind.to_string(),
811            file_path: Some(file_path.to_string()),
812            line: Some(line as i64),
813            created_at: chrono::Utc::now().timestamp(),
814        };
815        Storage::insert_unresolved_ref(self, &entry)
816    }
817
818    fn store_unresolved_refs_batch(
819        &self,
820        refs: &[codemem_core::UnresolvedRefData],
821    ) -> Result<usize, CodememError> {
822        use crate::cross_repo::UnresolvedRefEntry;
823        let now = chrono::Utc::now().timestamp();
824        let entries: Vec<UnresolvedRefEntry> = refs
825            .iter()
826            .map(|r| UnresolvedRefEntry {
827                id: format!(
828                    "uref:{}:{}:{}",
829                    r.namespace, r.source_qualified_name, r.target_name
830                ),
831                namespace: r.namespace.clone(),
832                source_node: r.source_qualified_name.clone(),
833                target_name: r.target_name.clone(),
834                package_hint: r.package_hint.clone(),
835                ref_kind: r.ref_kind.clone(),
836                file_path: Some(r.file_path.clone()),
837                line: Some(r.line as i64),
838                created_at: now,
839            })
840            .collect();
841        let count = entries.len();
842        Storage::insert_unresolved_refs_batch(self, &entries)?;
843        Ok(count)
844    }
845
846    fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
847        let conn = self.conn()?;
848        let mut stmt = conn
849            .prepare("SELECT package_name, namespace, manifest FROM package_registry")
850            .map_err(|e| CodememError::Storage(e.to_string()))?;
851        let rows = stmt
852            .query_map([], |row| {
853                Ok((
854                    row.get::<_, String>(0)?,
855                    row.get::<_, String>(1)?,
856                    row.get::<_, String>(2)?,
857                ))
858            })
859            .map_err(|e| CodememError::Storage(e.to_string()))?
860            .collect::<Result<Vec<_>, _>>()
861            .map_err(|e| CodememError::Storage(e.to_string()))?;
862        Ok(rows)
863    }
864
865    fn list_pending_unresolved_refs(
866        &self,
867    ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
868        let conn = self.conn()?;
869        let mut stmt = conn
870            .prepare(
871                "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
872                 FROM unresolved_refs",
873            )
874            .map_err(|e| CodememError::Storage(e.to_string()))?;
875        let rows = stmt
876            .query_map([], |row| {
877                Ok(codemem_core::PendingUnresolvedRef {
878                    id: row.get::<_, String>(0)?,
879                    source_node: row.get::<_, String>(1)?,
880                    target_name: row.get::<_, String>(2)?,
881                    namespace: row.get::<_, String>(3)?,
882                    file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
883                    line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
884                    ref_kind: row.get::<_, String>(6)?,
885                    package_hint: row.get::<_, Option<String>>(7)?,
886                })
887            })
888            .map_err(|e| CodememError::Storage(e.to_string()))?
889            .collect::<Result<Vec<_>, _>>()
890            .map_err(|e| CodememError::Storage(e.to_string()))?;
891        Ok(rows)
892    }
893
894    fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
895        Storage::delete_unresolved_ref(self, id)
896    }
897
898    fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
899        let conn = self.conn()?;
900        let count: i64 = conn
901            .query_row(
902                "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
903                rusqlite::params![namespace],
904                |row| row.get(0),
905            )
906            .map_err(|e| CodememError::Storage(e.to_string()))?;
907        Ok(count as usize)
908    }
909
910    fn list_registered_packages_for_namespace(
911        &self,
912        namespace: &str,
913    ) -> Result<Vec<(String, String, String)>, CodememError> {
914        let entries = Storage::get_packages_for_namespace(self, namespace)?;
915        Ok(entries
916            .into_iter()
917            .map(|e| (e.package_name, e.namespace, e.manifest))
918            .collect())
919    }
920
921    fn store_api_endpoint(
922        &self,
923        method: &str,
924        path: &str,
925        handler_symbol: &str,
926        namespace: &str,
927    ) -> Result<(), CodememError> {
928        use crate::cross_repo::ApiEndpointEntry;
929        let entry = ApiEndpointEntry {
930            id: format!("ep:{}:{}:{}", namespace, method, path),
931            namespace: namespace.to_string(),
932            method: Some(method.to_string()),
933            path: path.to_string(),
934            handler: Some(handler_symbol.to_string()),
935            schema: "{}".to_string(),
936        };
937        Storage::upsert_api_endpoint(self, &entry)
938    }
939
940    fn store_api_client_call(
941        &self,
942        library: &str,
943        method: Option<&str>,
944        caller_symbol: &str,
945        namespace: &str,
946    ) -> Result<(), CodememError> {
947        let id = format!("client:{caller_symbol}:{library}");
948        Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
949    }
950
951    fn list_api_endpoints(
952        &self,
953        namespace: &str,
954    ) -> Result<Vec<(String, String, String, String)>, CodememError> {
955        let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
956        Ok(entries
957            .into_iter()
958            .map(|e| {
959                (
960                    e.method.unwrap_or_default(),
961                    e.path,
962                    e.handler.unwrap_or_default(),
963                    e.namespace,
964                )
965            })
966            .collect())
967    }
968
969    fn store_event_channel(
970        &self,
971        channel: &str,
972        direction: &str,
973        protocol: &str,
974        handler: &str,
975        namespace: &str,
976        description: &str,
977    ) -> Result<(), CodememError> {
978        use crate::cross_repo::EventChannelEntry;
979        let entry = EventChannelEntry {
980            id: format!("ec:{namespace}:{direction}:{channel}"),
981            namespace: namespace.to_string(),
982            channel: channel.to_string(),
983            direction: direction.to_string(),
984            protocol: protocol.to_string(),
985            message_schema: "{}".to_string(),
986            description: description.to_string(),
987            handler: handler.to_string(),
988            spec_file: String::new(),
989        };
990        Storage::upsert_event_channel(self, &entry)
991    }
992
993    fn list_event_channels(
994        &self,
995        namespace: &str,
996    ) -> Result<Vec<(String, String, String, String, String)>, CodememError> {
997        let entries = Storage::get_event_channels_for_namespace(self, namespace)?;
998        Ok(entries
999            .into_iter()
1000            .map(|e| (e.channel, e.direction, e.protocol, e.handler, e.description))
1001            .collect())
1002    }
1003
1004    fn list_all_event_channels(
1005        &self,
1006    ) -> Result<Vec<(String, String, String, String, String, String)>, CodememError> {
1007        let entries = Storage::get_all_event_channels(self)?;
1008        Ok(entries
1009            .into_iter()
1010            .map(|e| {
1011                (
1012                    e.channel,
1013                    e.direction,
1014                    e.protocol,
1015                    e.handler,
1016                    e.namespace,
1017                    e.description,
1018                )
1019            })
1020            .collect())
1021    }
1022}
1023
1024#[cfg(test)]
1025#[path = "tests/backend_tests.rs"]
1026mod tests;