Skip to main content

codemem_storage/
backend.rs

1//! `StorageBackend` trait implementation for Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5    CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6    Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
/// Build the `VALUES` placeholder list for a multi-row INSERT.
///
/// Emits `rows` parenthesised groups of `cols` numbered placeholders each,
/// numbered consecutively from 1 across the whole statement, e.g.
/// `(?1,?2),(?3,?4)` for two columns and two rows. Zero rows yields an empty
/// string.
fn multi_row_placeholders(cols: usize, rows: usize) -> String {
    (0..rows)
        .map(|row| {
            // Row `row` owns placeholder numbers `row * cols + 1 ..= (row + 1) * cols`.
            let group = (0..cols)
                .map(|col| format!("?{}", row * cols + col + 1))
                .collect::<Vec<_>>()
                .join(",");
            format!("({group})")
        })
        .collect::<Vec<_>>()
        .join(",")
}
30
/// Macro to delegate pure-forwarding trait methods to `Storage` inherent methods.
///
/// `delegate_storage!(name(&self, a: A, b: B) -> R);` expands to
///
/// ```ignore
/// fn name(&self, a: A, b: B) -> R {
///     Storage::name(self, a, b)
/// }
/// ```
///
/// A single variadic arm accepts any number of `ident: type` arguments after
/// `&self` (including none), so no new arm is needed when a method with a
/// different arity is added.
macro_rules! delegate_storage {
    ($method:ident(&self $(, $arg:ident: $ty:ty)*) -> $ret:ty) => {
        fn $method(&self $(, $arg: $ty)*) -> $ret {
            Storage::$method(self $(, $arg)*)
        }
    };
}
64
65impl StorageBackend for Storage {
66    // ── Memory CRUD (delegated) ───────────────────────────────────────
67
    // Single-memory CRUD: each invocation expands to a trait method that
    // passes its arguments unchanged to the inherent `Storage` method of the
    // same name and returns that method's result.
    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
    /// M1: Override with transactional cascade delete.
    ///
    /// Forwards `id` to the inherent `Storage::delete_memory_cascade` and
    /// returns its result unchanged.
    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
        // Delegates to Storage::delete_memory_cascade which wraps all
        // deletes (memory + graph nodes/edges + embedding) in a single transaction.
        Storage::delete_memory_cascade(self, id)
    }
80
    /// Override with transactional batch cascade delete.
    ///
    /// Forwards `ids` to the inherent `Storage::delete_memories_batch_cascade`
    /// and returns its `usize` result unchanged (presumably the number of
    /// memories removed; confirm against the inherent method).
    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
        Storage::delete_memories_batch_cascade(self, ids)
    }
85
    // Listing and counting helpers: each line generates a trait method that
    // calls the identically named inherent `Storage` method with the same
    // arguments.
    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
91
92    fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
93        if ids.is_empty() {
94            return Ok(Vec::new());
95        }
96        let conn = self.conn()?;
97
98        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
99        let sql = format!(
100            "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
101            placeholders.join(",")
102        );
103
104        let mut stmt = conn.prepare(&sql).storage_err()?;
105
106        let params: Vec<&dyn rusqlite::types::ToSql> = ids
107            .iter()
108            .map(|id| id as &dyn rusqlite::types::ToSql)
109            .collect();
110
111        let rows = stmt
112            .query_map(params.as_slice(), MemoryRow::from_row)
113            .storage_err()?;
114
115        let mut memories = Vec::new();
116        for row in rows {
117            let row = row.storage_err()?;
118            memories.push(row.into_memory_node()?);
119        }
120        Ok(memories)
121    }
122
123    // ── Embedding Persistence (delegated where possible) ──────────────
124
    // Single-embedding write/read: generated forwarders to the inherent
    // `Storage::store_embedding` and `Storage::get_embedding`.
    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
127
128    fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
129        let conn = self.conn()?;
130        let deleted = conn
131            .execute(
132                "DELETE FROM memory_embeddings WHERE memory_id = ?1",
133                [memory_id],
134            )
135            .storage_err()?;
136        Ok(deleted > 0)
137    }
138
139    fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
140        let conn = self.conn()?;
141        let mut stmt = conn
142            .prepare("SELECT memory_id, embedding FROM memory_embeddings")
143            .storage_err()?;
144        let rows = stmt
145            .query_map([], |row| {
146                let id: String = row.get(0)?;
147                let blob: Vec<u8> = row.get(1)?;
148                Ok((id, blob))
149            })
150            .storage_err()?;
151        let mut result = Vec::new();
152        for row in rows {
153            let (id, blob) = row.storage_err()?;
154            let floats: Vec<f32> = blob
155                .chunks_exact(4)
156                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
157                .collect();
158            result.push((id, floats));
159        }
160        Ok(result)
161    }
162
163    // ── Graph Node/Edge Persistence (delegated) ───────────────────────
164
    // Graph node and edge persistence: every line below generates a trait
    // method that forwards to the inherent `Storage` method with the same
    // name and signature.
    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
174
175    // ── Sessions (delegated where possible) ───────────────────────────
176
    // Session lifecycle: generated forwarders to the inherent
    // `Storage::start_session` and `Storage::end_session`.
    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
179
    /// List sessions, optionally restricted to `namespace`, passing `limit`
    /// through to the underlying query.
    ///
    /// Not generated by `delegate_storage!` because the inherent method has a
    /// different name: this forwards to `list_sessions_with_limit`.
    fn list_sessions(
        &self,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Session>, CodememError> {
        self.list_sessions_with_limit(namespace, limit)
    }
187
188    // ── Consolidation (delegated) ─────────────────────────────────────
189
    // Consolidation log: generated forwarders to the inherent
    // `Storage::insert_consolidation_log` and `Storage::last_consolidation_runs`.
    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
192
193    // ── Pattern Detection (delegated) ─────────────────────────────────
194
    // Pattern-detection queries: each line generates a trait method that
    // forwards to the inherent `Storage` method of the same name.
    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
199
200    // ── Bulk Operations ───────────────────────────────────────────────
201
202    fn decay_stale_memories(
203        &self,
204        threshold_ts: i64,
205        decay_factor: f64,
206    ) -> Result<usize, CodememError> {
207        let conn = self.conn()?;
208        let rows = conn
209            .execute(
210                "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
211                params![decay_factor, threshold_ts],
212            )
213            .storage_err()?;
214        Ok(rows)
215    }
216
217    fn list_memories_for_creative(
218        &self,
219    ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
220        let conn = self.conn()?;
221        let mut stmt = conn
222            .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
223            .storage_err()?;
224
225        let rows = stmt
226            .query_map([], |row| {
227                Ok((
228                    row.get::<_, String>(0)?,
229                    row.get::<_, String>(1)?,
230                    row.get::<_, String>(2)?,
231                ))
232            })
233            .storage_err()?
234            .collect::<Result<Vec<_>, _>>()
235            .storage_err()?;
236
237        Ok(rows
238            .into_iter()
239            .map(|(id, mtype, tags_json)| {
240                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
241                (id, mtype, tags)
242            })
243            .collect())
244    }
245
246    fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
247        let conn = self.conn()?;
248        let mut stmt = conn
249            .prepare(
250                "SELECT a.id, b.id, 1.0 as similarity
251                 FROM memories a
252                 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
253                 WHERE a.id < b.id",
254            )
255            .storage_err()?;
256
257        let rows = stmt
258            .query_map([], |row| {
259                Ok((
260                    row.get::<_, String>(0)?,
261                    row.get::<_, String>(1)?,
262                    row.get::<_, f64>(2)?,
263                ))
264            })
265            .storage_err()?
266            .collect::<Result<Vec<_>, _>>()
267            .storage_err()?;
268
269        Ok(rows)
270    }
271
272    fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
273        let conn = self.conn()?;
274        let mut stmt = conn
275            .prepare(
276                "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
277            )
278            .storage_err()?;
279
280        let ids = stmt
281            .query_map(params![importance_threshold], |row| row.get(0))
282            .storage_err()?
283            .collect::<Result<Vec<String>, _>>()
284            .storage_err()?;
285
286        Ok(ids)
287    }
288
289    // ── Batch Operations ──────────────────────────────────────────────
290
    /// Insert many memories using multi-row `INSERT OR IGNORE` statements
    /// inside one transaction.
    ///
    /// `memories` is split into chunks of at most `BATCH` rows so that a
    /// statement never binds more than 999 parameters (14 columns per row).
    /// An empty slice returns `Ok(())` without opening a connection.
    ///
    /// # Errors
    /// Returns early if the connection or transaction cannot be obtained, if
    /// `tags` or `metadata` fail to serialize to JSON, or if a statement or
    /// the final commit fails. On an early return `commit` is never reached.
    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
        if memories.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // Number of columns bound per row; must match the column list below.
        const COLS: usize = 14;
        // Rows per statement so that rows * COLS stays within 999 parameters.
        const BATCH: usize = 999 / COLS; // 71

        for chunk in memories.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
            );

            // Owned parameter values, pushed row by row in exactly the order
            // of the column list above (placeholders are positional).
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for memory in chunk {
                // Tags and metadata are stored as JSON text.
                let tags_json = serde_json::to_string(&memory.tags)?;
                let metadata_json = serde_json::to_string(&memory.metadata)?;
                param_values.push(Box::new(memory.id.clone()));
                param_values.push(Box::new(memory.content.clone()));
                param_values.push(Box::new(memory.memory_type.to_string()));
                param_values.push(Box::new(memory.importance));
                param_values.push(Box::new(memory.confidence));
                param_values.push(Box::new(memory.access_count as i64));
                param_values.push(Box::new(memory.content_hash.clone()));
                param_values.push(Box::new(tags_json));
                param_values.push(Box::new(metadata_json));
                param_values.push(Box::new(memory.namespace.clone()));
                param_values.push(Box::new(memory.session_id.clone()));
                // The three time columns store the value of `.timestamp()`.
                param_values.push(Box::new(memory.created_at.timestamp()));
                param_values.push(Box::new(memory.updated_at.timestamp()));
                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
            }

            // `execute` needs a slice of trait-object references, so borrow
            // each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
336
337    fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
338        if items.is_empty() {
339            return Ok(());
340        }
341        let conn = self.conn()?;
342        let tx = conn.unchecked_transaction().storage_err()?;
343
344        const COLS: usize = 2;
345        const BATCH: usize = 999 / COLS; // 499
346
347        for chunk in items.chunks(BATCH) {
348            let placeholders = multi_row_placeholders(COLS, chunk.len());
349            let sql = format!(
350                "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
351            );
352
353            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
354                Vec::with_capacity(chunk.len() * COLS);
355            for (id, embedding) in chunk {
356                let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
357                param_values.push(Box::new(id.to_string()));
358                param_values.push(Box::new(blob));
359            }
360
361            let refs: Vec<&dyn rusqlite::types::ToSql> =
362                param_values.iter().map(|p| p.as_ref()).collect();
363            tx.execute(&sql, refs.as_slice()).storage_err()?;
364        }
365
366        tx.commit().storage_err()?;
367        Ok(())
368    }
369
370    fn load_file_hashes(&self) -> Result<HashMap<String, String>, CodememError> {
371        let conn = self.conn()?;
372        let mut stmt = conn
373            .prepare("SELECT file_path, content_hash FROM file_hashes")
374            .storage_err()?;
375
376        let rows = stmt
377            .query_map([], |row| {
378                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
379            })
380            .storage_err()?
381            .collect::<Result<Vec<_>, _>>()
382            .storage_err()?;
383
384        Ok(rows.into_iter().collect())
385    }
386
387    fn save_file_hashes(&self, hashes: &HashMap<String, String>) -> Result<(), CodememError> {
388        let conn = self.conn()?;
389        let tx = conn.unchecked_transaction().storage_err()?;
390
391        tx.execute("DELETE FROM file_hashes", []).storage_err()?;
392
393        for (path, hash) in hashes {
394            tx.execute(
395                "INSERT INTO file_hashes (file_path, content_hash) VALUES (?1, ?2)",
396                params![path, hash],
397            )
398            .storage_err()?;
399        }
400
401        tx.commit().storage_err()?;
402        Ok(())
403    }
404
    /// Insert or replace many graph nodes using multi-row
    /// `INSERT OR REPLACE` statements inside one transaction.
    ///
    /// `nodes` is split into chunks of at most `BATCH` rows so that a
    /// statement never binds more than 999 parameters (7 columns per row).
    /// An empty slice returns `Ok(())` without opening a connection.
    ///
    /// A payload that fails to serialize is stored as the literal `{}`
    /// instead of aborting the batch.
    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
        if nodes.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // Number of columns bound per row; must match the column list below.
        const COLS: usize = 7;
        // Rows per statement so that rows * COLS stays within 999 parameters.
        const BATCH: usize = 999 / COLS; // 142

        for chunk in nodes.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
            );

            // Owned parameter values, pushed in exactly the order of the
            // column list above (placeholders are positional).
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for node in chunk {
                // Best-effort: fall back to an empty JSON object on failure.
                let payload_json =
                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
                param_values.push(Box::new(node.id.clone()));
                param_values.push(Box::new(node.kind.to_string()));
                param_values.push(Box::new(node.label.clone()));
                param_values.push(Box::new(payload_json));
                param_values.push(Box::new(node.centrality));
                param_values.push(Box::new(node.memory_id.clone()));
                param_values.push(Box::new(node.namespace.clone()));
            }

            // `execute` needs a slice of trait-object references, so borrow
            // each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
443
    /// Insert or replace many graph edges using multi-row
    /// `INSERT OR REPLACE` statements inside one transaction.
    ///
    /// `edges` is split into chunks of at most `BATCH` rows so that a
    /// statement never binds more than 999 parameters (9 columns per row).
    /// An empty slice returns `Ok(())` without opening a connection.
    ///
    /// Properties that fail to serialize are stored as the literal `{}`
    /// instead of aborting the batch.
    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
        if edges.is_empty() {
            return Ok(());
        }
        let conn = self.conn()?;
        let tx = conn.unchecked_transaction().storage_err()?;

        // Number of columns bound per row; must match the column list below.
        const COLS: usize = 9;
        // Rows per statement so that rows * COLS stays within 999 parameters.
        const BATCH: usize = 999 / COLS; // 111

        for chunk in edges.chunks(BATCH) {
            let placeholders = multi_row_placeholders(COLS, chunk.len());
            let sql = format!(
                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
            );

            // Owned parameter values, pushed in exactly the order of the
            // column list above (placeholders are positional).
            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
                Vec::with_capacity(chunk.len() * COLS);
            for edge in chunk {
                // Best-effort: fall back to an empty JSON object on failure.
                let props_json =
                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
                param_values.push(Box::new(edge.id.clone()));
                param_values.push(Box::new(edge.src.clone()));
                param_values.push(Box::new(edge.dst.clone()));
                param_values.push(Box::new(edge.relationship.to_string()));
                param_values.push(Box::new(edge.weight));
                param_values.push(Box::new(props_json));
                param_values.push(Box::new(edge.created_at.timestamp()));
                // Optional validity bounds: `None` is bound as-is
                // (presumably stored as SQL NULL; confirm against the schema).
                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
            }

            // `execute` needs a slice of trait-object references, so borrow
            // each boxed value.
            let refs: Vec<&dyn rusqlite::types::ToSql> =
                param_values.iter().map(|p| p.as_ref()).collect();
            tx.execute(&sql, refs.as_slice()).storage_err()?;
        }

        tx.commit().storage_err()?;
        Ok(())
    }
484
485    fn get_stale_memories_for_decay(
486        &self,
487        threshold_ts: i64,
488    ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
489        let conn = self.conn()?;
490        let mut stmt = conn
491            .prepare(
492                "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
493            )
494            .storage_err()?;
495
496        let rows = stmt
497            .query_map(params![threshold_ts], |row| {
498                Ok((
499                    row.get::<_, String>(0)?,
500                    row.get::<_, f64>(1)?,
501                    row.get::<_, u32>(2)?,
502                    row.get::<_, i64>(3)?,
503                ))
504            })
505            .storage_err()?
506            .collect::<Result<Vec<_>, _>>()
507            .storage_err()?;
508
509        Ok(rows)
510    }
511
512    fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
513        if updates.is_empty() {
514            return Ok(0);
515        }
516        let conn = self.conn()?;
517        let tx = conn.unchecked_transaction().storage_err()?;
518
519        let mut count = 0usize;
520        for (id, importance) in updates {
521            let rows = tx
522                .execute(
523                    "UPDATE memories SET importance = ?1 WHERE id = ?2",
524                    params![importance, id],
525                )
526                .storage_err()?;
527            count += rows;
528        }
529
530        tx.commit().storage_err()?;
531        Ok(count)
532    }
533
534    fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
535        let conn = self.conn()?;
536        let count: i64 = if let Some(ns) = namespace {
537            conn.query_row(
538                "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
539                params![ns],
540                |row| row.get(0),
541            )
542            .storage_err()?
543        } else {
544            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
545                .storage_err()?
546        };
547        Ok(count as usize)
548    }
549
550    // ── Query Helpers ─────────────────────────────────────────────────
551
552    fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
553        let conn = self.conn()?;
554        let mut stmt = conn
555            .prepare(
556                "SELECT m.id, m.content FROM memories m
557                 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
558                 WHERE me.memory_id IS NULL",
559            )
560            .storage_err()?;
561
562        let rows = stmt
563            .query_map([], |row| {
564                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
565            })
566            .storage_err()?
567            .collect::<Result<Vec<_>, _>>()
568            .storage_err()?;
569
570        Ok(rows)
571    }
572
573    fn search_graph_nodes(
574        &self,
575        query: &str,
576        namespace: Option<&str>,
577        limit: usize,
578    ) -> Result<Vec<GraphNode>, CodememError> {
579        let conn = self.conn()?;
580        let escaped = query
581            .to_lowercase()
582            .replace('\\', "\\\\")
583            .replace('%', "\\%")
584            .replace('_', "\\_");
585        let pattern = format!("%{escaped}%");
586
587        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
588            if let Some(ns) = namespace {
589                (
590                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
591                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
592                 ORDER BY centrality DESC LIMIT ?3"
593                        .to_string(),
594                    vec![
595                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
596                        Box::new(ns.to_string()),
597                        Box::new(limit as i64),
598                    ],
599                )
600            } else {
601                (
602                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
603                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
604                 ORDER BY centrality DESC LIMIT ?2"
605                        .to_string(),
606                    vec![
607                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
608                        Box::new(limit as i64),
609                    ],
610                )
611            };
612
613        let refs: Vec<&dyn rusqlite::types::ToSql> =
614            params_vec.iter().map(|p| p.as_ref()).collect();
615        let mut stmt = conn.prepare(&sql).storage_err()?;
616
617        let rows = stmt
618            .query_map(refs.as_slice(), |row| {
619                let kind_str: String = row.get(1)?;
620                let payload_str: String = row.get(3)?;
621                Ok(GraphNode {
622                    id: row.get(0)?,
623                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
624                    label: row.get(2)?,
625                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
626                    centrality: row.get(4)?,
627                    memory_id: row.get(5)?,
628                    namespace: row.get(6)?,
629                })
630            })
631            .storage_err()?
632            .collect::<Result<Vec<_>, _>>()
633            .storage_err()?;
634
635        Ok(rows)
636    }
637
    /// List memories carrying `tag`, optionally restricted to `namespace`.
    ///
    /// Forwards all three arguments unchanged to the inherent
    /// `Storage::list_memories_by_tag` and returns its result.
    fn list_memories_by_tag(
        &self,
        tag: &str,
        namespace: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MemoryNode>, CodememError> {
        Storage::list_memories_by_tag(self, tag, namespace, limit)
    }
646
647    fn list_memories_filtered(
648        &self,
649        namespace: Option<&str>,
650        memory_type: Option<&str>,
651    ) -> Result<Vec<MemoryNode>, CodememError> {
652        let conn = self.conn()?;
653        let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
654                        content_hash, tags, metadata, namespace, session_id, created_at, updated_at, \
655                        last_accessed_at FROM memories WHERE 1=1"
656            .to_string();
657        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
658
659        if let Some(ns) = namespace {
660            param_values.push(Box::new(ns.to_string()));
661            sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
662        }
663        if let Some(mt) = memory_type {
664            param_values.push(Box::new(mt.to_string()));
665            sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
666        }
667        sql.push_str(" ORDER BY created_at DESC");
668
669        let refs: Vec<&dyn rusqlite::types::ToSql> =
670            param_values.iter().map(|p| p.as_ref()).collect();
671        let mut stmt = conn.prepare(&sql).storage_err()?;
672
673        let rows = stmt
674            .query_map(refs.as_slice(), MemoryRow::from_row)
675            .storage_err()?;
676
677        let mut result = Vec::new();
678        for row in rows {
679            let mr = row.storage_err()?;
680            result.push(mr.into_memory_node()?);
681        }
682
683        Ok(result)
684    }
685
686    // ── Session Activity (delegated) ──────────────────────────────────
687
    // Session activity tracking: each line generates a trait method that
    // forwards to the inherent `Storage` method of the same name.
    // `record_session_activity` is the five-argument case of the macro.
    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
695
696    // ── Stats (delegated) ─────────────────────────────────────────────
697
    // Generated forwarder to the inherent `Storage::stats`.
    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
699
700    // ── Transaction Control ──────────────────────────────────────────
701
    /// Open an explicit transaction with `BEGIN IMMEDIATE` and mark this
    /// storage as being inside a transaction.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be obtained or the `BEGIN`
    /// fails; in either case the `in_transaction` flag is left untouched.
    fn begin_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
        // Set the flag only after BEGIN has succeeded.
        self.in_transaction
            .store(true, std::sync::atomic::Ordering::Release);
        Ok(())
    }
709
    /// Issue `COMMIT` and, on success, clear the `in_transaction` flag.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be obtained or the `COMMIT`
    /// fails; the flag is then left as it was.
    fn commit_transaction(&self) -> Result<(), CodememError> {
        let conn = self.conn()?;
        conn.execute_batch("COMMIT").storage_err()?;
        // Clear flag after COMMIT succeeds — if COMMIT fails, the flag
        // stays set so callers know a transaction is still active.
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        Ok(())
    }
719
    /// Clear the `in_transaction` flag, then issue `ROLLBACK`.
    ///
    /// Unlike `commit_transaction`, the flag is cleared *before* the SQL
    /// runs, so it ends up `false` even when obtaining the connection or the
    /// `ROLLBACK` itself fails.
    /// NOTE(review): this ordering looks deliberate (it may matter to how
    /// `conn()` behaves while the flag is set) — confirm before reordering.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be obtained or the
    /// `ROLLBACK` fails.
    fn rollback_transaction(&self) -> Result<(), CodememError> {
        self.in_transaction
            .store(false, std::sync::atomic::Ordering::Release);
        let conn = self.conn()?;
        conn.execute_batch("ROLLBACK").storage_err()?;
        Ok(())
    }
727
728    // ── Repository Management ────────────────────────────────────────
729
    /// List registered repositories by forwarding to the inherent
    /// `Storage::list_repos`.
    fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
        Storage::list_repos(self)
    }
733
    /// Register `repo` by forwarding to the inherent `Storage::add_repo`.
    fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
        Storage::add_repo(self, repo)
    }
737
    /// Look up the repository with the given `id` by forwarding to the
    /// inherent `Storage::get_repo`; the result is an `Option`.
    fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
        Storage::get_repo(self, id)
    }
741
    /// Remove the repository with the given `id` by forwarding to the
    /// inherent `Storage::remove_repo` (the `bool` presumably reports whether
    /// a row was removed; confirm against the inherent method).
    fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
        Storage::remove_repo(self, id)
    }
745
    /// Update a repository's status by forwarding `id`, `status`, and the
    /// optional `indexed_at` unchanged to the inherent
    /// `Storage::update_repo_status`.
    fn update_repo_status(
        &self,
        id: &str,
        status: &str,
        indexed_at: Option<&str>,
    ) -> Result<(), CodememError> {
        Storage::update_repo_status(self, id, status, indexed_at)
    }
754}
755
756#[cfg(test)]
757#[path = "tests/backend_tests.rs"]
758mod tests;