Skip to main content

codemem_storage/
backend.rs

1//! `StorageBackend` trait implementation for Storage.
2
3use crate::{MemoryRow, Storage};
4use codemem_core::{
5    CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Session,
6    StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
11/// Generate SQL placeholders for multi-row INSERT: "(?1,?2,...),(?3,?4,...)"
12fn multi_row_placeholders(cols: usize, rows: usize) -> String {
13    let mut s = String::new();
14    for r in 0..rows {
15        if r > 0 {
16            s.push(',');
17        }
18        s.push('(');
19        for c in 0..cols {
20            if c > 0 {
21                s.push(',');
22            }
23            s.push('?');
24            s.push_str(&(r * cols + c + 1).to_string());
25        }
26        s.push(')');
27    }
28    s
29}
30
31/// Macro to delegate pure-forwarding trait methods to `Storage` inherent methods.
32macro_rules! delegate_storage {
33    // &self, no args
34    ($method:ident(&self) -> $ret:ty) => {
35        fn $method(&self) -> $ret {
36            Storage::$method(self)
37        }
38    };
39    // &self, one arg
40    ($method:ident(&self, $a1:ident: $t1:ty) -> $ret:ty) => {
41        fn $method(&self, $a1: $t1) -> $ret {
42            Storage::$method(self, $a1)
43        }
44    };
45    // &self, two args
46    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty) -> $ret:ty) => {
47        fn $method(&self, $a1: $t1, $a2: $t2) -> $ret {
48            Storage::$method(self, $a1, $a2)
49        }
50    };
51    // &self, three args
52    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty, $a3:ident: $t3:ty) -> $ret:ty) => {
53        fn $method(&self, $a1: $t1, $a2: $t2, $a3: $t3) -> $ret {
54            Storage::$method(self, $a1, $a2, $a3)
55        }
56    };
57    // &self, five args
58    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty, $a3:ident: $t3:ty, $a4:ident: $t4:ty, $a5:ident: $t5:ty) -> $ret:ty) => {
59        fn $method(&self, $a1: $t1, $a2: $t2, $a3: $t3, $a4: $t4, $a5: $t5) -> $ret {
60            Storage::$method(self, $a1, $a2, $a3, $a4, $a5)
61        }
62    };
63}
64
65impl StorageBackend for Storage {
66    // ── Memory CRUD (delegated) ───────────────────────────────────────
67
68    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
69    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
70    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
71    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
72    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74    /// M1: Override with transactional cascade delete.
75    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76        // Delegates to Storage::delete_memory_cascade which wraps all
77        // deletes (memory + graph nodes/edges + embedding) in a single transaction.
78        Storage::delete_memory_cascade(self, id)
79    }
80
81    /// Override with transactional batch cascade delete.
82    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83        Storage::delete_memories_batch_cascade(self, ids)
84    }
85
86    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
87    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
88    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
89    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
90    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
91
92    fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
93        if ids.is_empty() {
94            return Ok(Vec::new());
95        }
96        let conn = self.conn()?;
97
98        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
99        let sql = format!(
100            "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
101            placeholders.join(",")
102        );
103
104        let mut stmt = conn
105            .prepare(&sql)
106            .map_err(|e| CodememError::Storage(e.to_string()))?;
107
108        let params: Vec<&dyn rusqlite::types::ToSql> = ids
109            .iter()
110            .map(|id| id as &dyn rusqlite::types::ToSql)
111            .collect();
112
113        let rows = stmt
114            .query_map(params.as_slice(), |row| {
115                Ok(MemoryRow {
116                    id: row.get(0)?,
117                    content: row.get(1)?,
118                    memory_type: row.get(2)?,
119                    importance: row.get(3)?,
120                    confidence: row.get(4)?,
121                    access_count: row.get(5)?,
122                    content_hash: row.get(6)?,
123                    tags: row.get(7)?,
124                    metadata: row.get(8)?,
125                    namespace: row.get(9)?,
126                    session_id: row.get(10)?,
127                    created_at: row.get(11)?,
128                    updated_at: row.get(12)?,
129                    last_accessed_at: row.get(13)?,
130                })
131            })
132            .map_err(|e| CodememError::Storage(e.to_string()))?;
133
134        let mut memories = Vec::new();
135        for row in rows {
136            let row = row.map_err(|e| CodememError::Storage(e.to_string()))?;
137            memories.push(row.into_memory_node()?);
138        }
139        Ok(memories)
140    }
141
142    // ── Embedding Persistence (delegated where possible) ──────────────
143
144    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
145    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
146
147    fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
148        let conn = self.conn()?;
149        let deleted = conn
150            .execute(
151                "DELETE FROM memory_embeddings WHERE memory_id = ?1",
152                [memory_id],
153            )
154            .map_err(|e| CodememError::Storage(e.to_string()))?;
155        Ok(deleted > 0)
156    }
157
158    fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
159        let conn = self.conn()?;
160        let mut stmt = conn
161            .prepare("SELECT memory_id, embedding FROM memory_embeddings")
162            .map_err(|e| CodememError::Storage(e.to_string()))?;
163        let rows = stmt
164            .query_map([], |row| {
165                let id: String = row.get(0)?;
166                let blob: Vec<u8> = row.get(1)?;
167                Ok((id, blob))
168            })
169            .map_err(|e| CodememError::Storage(e.to_string()))?;
170        let mut result = Vec::new();
171        for row in rows {
172            let (id, blob) = row.map_err(|e| CodememError::Storage(e.to_string()))?;
173            let floats: Vec<f32> = blob
174                .chunks_exact(4)
175                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
176                .collect();
177            result.push((id, floats));
178        }
179        Ok(result)
180    }
181
182    // ── Graph Node/Edge Persistence (delegated) ───────────────────────
183
184    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
185    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
186    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
187    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
188    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
189    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
190    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
191    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
192    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
193
194    // ── Sessions (delegated where possible) ───────────────────────────
195
196    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
197    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
198
199    fn list_sessions(
200        &self,
201        namespace: Option<&str>,
202        limit: usize,
203    ) -> Result<Vec<Session>, CodememError> {
204        self.list_sessions_with_limit(namespace, limit)
205    }
206
207    // ── Consolidation (delegated) ─────────────────────────────────────
208
209    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
210    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
211
212    // ── Pattern Detection (delegated) ─────────────────────────────────
213
214    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
215    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
216    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
217    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
218
219    // ── Bulk Operations ───────────────────────────────────────────────
220
221    fn decay_stale_memories(
222        &self,
223        threshold_ts: i64,
224        decay_factor: f64,
225    ) -> Result<usize, CodememError> {
226        let conn = self.conn()?;
227        let rows = conn
228            .execute(
229                "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
230                params![decay_factor, threshold_ts],
231            )
232            .map_err(|e| CodememError::Storage(e.to_string()))?;
233        Ok(rows)
234    }
235
236    fn list_memories_for_creative(
237        &self,
238    ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
239        let conn = self.conn()?;
240        let mut stmt = conn
241            .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
242            .map_err(|e| CodememError::Storage(e.to_string()))?;
243
244        let rows = stmt
245            .query_map([], |row| {
246                Ok((
247                    row.get::<_, String>(0)?,
248                    row.get::<_, String>(1)?,
249                    row.get::<_, String>(2)?,
250                ))
251            })
252            .map_err(|e| CodememError::Storage(e.to_string()))?
253            .collect::<Result<Vec<_>, _>>()
254            .map_err(|e| CodememError::Storage(e.to_string()))?;
255
256        Ok(rows
257            .into_iter()
258            .map(|(id, mtype, tags_json)| {
259                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
260                (id, mtype, tags)
261            })
262            .collect())
263    }
264
265    fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
266        let conn = self.conn()?;
267        let mut stmt = conn
268            .prepare(
269                "SELECT a.id, b.id, 1.0 as similarity
270                 FROM memories a
271                 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
272                 WHERE a.id < b.id",
273            )
274            .map_err(|e| CodememError::Storage(e.to_string()))?;
275
276        let rows = stmt
277            .query_map([], |row| {
278                Ok((
279                    row.get::<_, String>(0)?,
280                    row.get::<_, String>(1)?,
281                    row.get::<_, f64>(2)?,
282                ))
283            })
284            .map_err(|e| CodememError::Storage(e.to_string()))?
285            .collect::<Result<Vec<_>, _>>()
286            .map_err(|e| CodememError::Storage(e.to_string()))?;
287
288        Ok(rows)
289    }
290
291    fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
292        let conn = self.conn()?;
293        let mut stmt = conn
294            .prepare(
295                "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
296            )
297            .map_err(|e| CodememError::Storage(e.to_string()))?;
298
299        let ids = stmt
300            .query_map(params![importance_threshold], |row| row.get(0))
301            .map_err(|e| CodememError::Storage(e.to_string()))?
302            .collect::<Result<Vec<String>, _>>()
303            .map_err(|e| CodememError::Storage(e.to_string()))?;
304
305        Ok(ids)
306    }
307
308    // ── Batch Operations ──────────────────────────────────────────────
309
310    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
311        if memories.is_empty() {
312            return Ok(());
313        }
314        let conn = self.conn()?;
315        let tx = conn
316            .unchecked_transaction()
317            .map_err(|e| CodememError::Storage(e.to_string()))?;
318
319        const COLS: usize = 14;
320        const BATCH: usize = 999 / COLS; // 71
321
322        for chunk in memories.chunks(BATCH) {
323            let placeholders = multi_row_placeholders(COLS, chunk.len());
324            let sql = format!(
325                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
326            );
327
328            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
329                Vec::with_capacity(chunk.len() * COLS);
330            for memory in chunk {
331                let tags_json = serde_json::to_string(&memory.tags)?;
332                let metadata_json = serde_json::to_string(&memory.metadata)?;
333                param_values.push(Box::new(memory.id.clone()));
334                param_values.push(Box::new(memory.content.clone()));
335                param_values.push(Box::new(memory.memory_type.to_string()));
336                param_values.push(Box::new(memory.importance));
337                param_values.push(Box::new(memory.confidence));
338                param_values.push(Box::new(memory.access_count as i64));
339                param_values.push(Box::new(memory.content_hash.clone()));
340                param_values.push(Box::new(tags_json));
341                param_values.push(Box::new(metadata_json));
342                param_values.push(Box::new(memory.namespace.clone()));
343                param_values.push(Box::new(memory.session_id.clone()));
344                param_values.push(Box::new(memory.created_at.timestamp()));
345                param_values.push(Box::new(memory.updated_at.timestamp()));
346                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
347            }
348
349            let refs: Vec<&dyn rusqlite::types::ToSql> =
350                param_values.iter().map(|p| p.as_ref()).collect();
351            tx.execute(&sql, refs.as_slice())
352                .map_err(|e| CodememError::Storage(e.to_string()))?;
353        }
354
355        tx.commit()
356            .map_err(|e| CodememError::Storage(e.to_string()))?;
357        Ok(())
358    }
359
360    fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
361        if items.is_empty() {
362            return Ok(());
363        }
364        let conn = self.conn()?;
365        let tx = conn
366            .unchecked_transaction()
367            .map_err(|e| CodememError::Storage(e.to_string()))?;
368
369        const COLS: usize = 2;
370        const BATCH: usize = 999 / COLS; // 499
371
372        for chunk in items.chunks(BATCH) {
373            let placeholders = multi_row_placeholders(COLS, chunk.len());
374            let sql = format!(
375                "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
376            );
377
378            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
379                Vec::with_capacity(chunk.len() * COLS);
380            for (id, embedding) in chunk {
381                let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
382                param_values.push(Box::new(id.to_string()));
383                param_values.push(Box::new(blob));
384            }
385
386            let refs: Vec<&dyn rusqlite::types::ToSql> =
387                param_values.iter().map(|p| p.as_ref()).collect();
388            tx.execute(&sql, refs.as_slice())
389                .map_err(|e| CodememError::Storage(e.to_string()))?;
390        }
391
392        tx.commit()
393            .map_err(|e| CodememError::Storage(e.to_string()))?;
394        Ok(())
395    }
396
397    fn load_file_hashes(&self) -> Result<HashMap<String, String>, CodememError> {
398        let conn = self.conn()?;
399        let mut stmt = conn
400            .prepare("SELECT file_path, content_hash FROM file_hashes")
401            .map_err(|e| CodememError::Storage(e.to_string()))?;
402
403        let rows = stmt
404            .query_map([], |row| {
405                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
406            })
407            .map_err(|e| CodememError::Storage(e.to_string()))?
408            .collect::<Result<Vec<_>, _>>()
409            .map_err(|e| CodememError::Storage(e.to_string()))?;
410
411        Ok(rows.into_iter().collect())
412    }
413
414    fn save_file_hashes(&self, hashes: &HashMap<String, String>) -> Result<(), CodememError> {
415        let conn = self.conn()?;
416        let tx = conn
417            .unchecked_transaction()
418            .map_err(|e| CodememError::Storage(e.to_string()))?;
419
420        tx.execute("DELETE FROM file_hashes", [])
421            .map_err(|e| CodememError::Storage(e.to_string()))?;
422
423        for (path, hash) in hashes {
424            tx.execute(
425                "INSERT INTO file_hashes (file_path, content_hash) VALUES (?1, ?2)",
426                params![path, hash],
427            )
428            .map_err(|e| CodememError::Storage(e.to_string()))?;
429        }
430
431        tx.commit()
432            .map_err(|e| CodememError::Storage(e.to_string()))?;
433        Ok(())
434    }
435
436    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
437        if nodes.is_empty() {
438            return Ok(());
439        }
440        let conn = self.conn()?;
441        let tx = conn
442            .unchecked_transaction()
443            .map_err(|e| CodememError::Storage(e.to_string()))?;
444
445        const COLS: usize = 7;
446        const BATCH: usize = 999 / COLS; // 142
447
448        for chunk in nodes.chunks(BATCH) {
449            let placeholders = multi_row_placeholders(COLS, chunk.len());
450            let sql = format!(
451                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
452            );
453
454            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
455                Vec::with_capacity(chunk.len() * COLS);
456            for node in chunk {
457                let payload_json =
458                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
459                param_values.push(Box::new(node.id.clone()));
460                param_values.push(Box::new(node.kind.to_string()));
461                param_values.push(Box::new(node.label.clone()));
462                param_values.push(Box::new(payload_json));
463                param_values.push(Box::new(node.centrality));
464                param_values.push(Box::new(node.memory_id.clone()));
465                param_values.push(Box::new(node.namespace.clone()));
466            }
467
468            let refs: Vec<&dyn rusqlite::types::ToSql> =
469                param_values.iter().map(|p| p.as_ref()).collect();
470            tx.execute(&sql, refs.as_slice())
471                .map_err(|e| CodememError::Storage(e.to_string()))?;
472        }
473
474        tx.commit()
475            .map_err(|e| CodememError::Storage(e.to_string()))?;
476        Ok(())
477    }
478
479    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
480        if edges.is_empty() {
481            return Ok(());
482        }
483        let conn = self.conn()?;
484        let tx = conn
485            .unchecked_transaction()
486            .map_err(|e| CodememError::Storage(e.to_string()))?;
487
488        const COLS: usize = 9;
489        const BATCH: usize = 999 / COLS; // 111
490
491        for chunk in edges.chunks(BATCH) {
492            let placeholders = multi_row_placeholders(COLS, chunk.len());
493            let sql = format!(
494                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
495            );
496
497            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
498                Vec::with_capacity(chunk.len() * COLS);
499            for edge in chunk {
500                let props_json =
501                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
502                param_values.push(Box::new(edge.id.clone()));
503                param_values.push(Box::new(edge.src.clone()));
504                param_values.push(Box::new(edge.dst.clone()));
505                param_values.push(Box::new(edge.relationship.to_string()));
506                param_values.push(Box::new(edge.weight));
507                param_values.push(Box::new(props_json));
508                param_values.push(Box::new(edge.created_at.timestamp()));
509                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
510                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
511            }
512
513            let refs: Vec<&dyn rusqlite::types::ToSql> =
514                param_values.iter().map(|p| p.as_ref()).collect();
515            tx.execute(&sql, refs.as_slice())
516                .map_err(|e| CodememError::Storage(e.to_string()))?;
517        }
518
519        tx.commit()
520            .map_err(|e| CodememError::Storage(e.to_string()))?;
521        Ok(())
522    }
523
524    fn get_stale_memories_for_decay(
525        &self,
526        threshold_ts: i64,
527    ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
528        let conn = self.conn()?;
529        let mut stmt = conn
530            .prepare(
531                "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
532            )
533            .map_err(|e| CodememError::Storage(e.to_string()))?;
534
535        let rows = stmt
536            .query_map(params![threshold_ts], |row| {
537                Ok((
538                    row.get::<_, String>(0)?,
539                    row.get::<_, f64>(1)?,
540                    row.get::<_, u32>(2)?,
541                    row.get::<_, i64>(3)?,
542                ))
543            })
544            .map_err(|e| CodememError::Storage(e.to_string()))?
545            .collect::<Result<Vec<_>, _>>()
546            .map_err(|e| CodememError::Storage(e.to_string()))?;
547
548        Ok(rows)
549    }
550
551    fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
552        if updates.is_empty() {
553            return Ok(0);
554        }
555        let conn = self.conn()?;
556        let tx = conn
557            .unchecked_transaction()
558            .map_err(|e| CodememError::Storage(e.to_string()))?;
559
560        let mut count = 0usize;
561        for (id, importance) in updates {
562            let rows = tx
563                .execute(
564                    "UPDATE memories SET importance = ?1 WHERE id = ?2",
565                    params![importance, id],
566                )
567                .map_err(|e| CodememError::Storage(e.to_string()))?;
568            count += rows;
569        }
570
571        tx.commit()
572            .map_err(|e| CodememError::Storage(e.to_string()))?;
573        Ok(count)
574    }
575
576    fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
577        let conn = self.conn()?;
578        let count: i64 = if let Some(ns) = namespace {
579            conn.query_row(
580                "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
581                params![ns],
582                |row| row.get(0),
583            )
584            .map_err(|e| CodememError::Storage(e.to_string()))?
585        } else {
586            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
587                .map_err(|e| CodememError::Storage(e.to_string()))?
588        };
589        Ok(count as usize)
590    }
591
592    // ── Query Helpers ─────────────────────────────────────────────────
593
594    fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
595        let conn = self.conn()?;
596        let mut stmt = conn
597            .prepare(
598                "SELECT m.id, m.content FROM memories m
599                 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
600                 WHERE me.memory_id IS NULL",
601            )
602            .map_err(|e| CodememError::Storage(e.to_string()))?;
603
604        let rows = stmt
605            .query_map([], |row| {
606                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
607            })
608            .map_err(|e| CodememError::Storage(e.to_string()))?
609            .collect::<Result<Vec<_>, _>>()
610            .map_err(|e| CodememError::Storage(e.to_string()))?;
611
612        Ok(rows)
613    }
614
615    fn search_graph_nodes(
616        &self,
617        query: &str,
618        namespace: Option<&str>,
619        limit: usize,
620    ) -> Result<Vec<GraphNode>, CodememError> {
621        let conn = self.conn()?;
622        let escaped = query
623            .to_lowercase()
624            .replace('\\', "\\\\")
625            .replace('%', "\\%")
626            .replace('_', "\\_");
627        let pattern = format!("%{escaped}%");
628
629        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
630            if let Some(ns) = namespace {
631                (
632                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
633                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
634                 ORDER BY centrality DESC LIMIT ?3"
635                        .to_string(),
636                    vec![
637                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
638                        Box::new(ns.to_string()),
639                        Box::new(limit as i64),
640                    ],
641                )
642            } else {
643                (
644                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
645                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
646                 ORDER BY centrality DESC LIMIT ?2"
647                        .to_string(),
648                    vec![
649                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
650                        Box::new(limit as i64),
651                    ],
652                )
653            };
654
655        let refs: Vec<&dyn rusqlite::types::ToSql> =
656            params_vec.iter().map(|p| p.as_ref()).collect();
657        let mut stmt = conn
658            .prepare(&sql)
659            .map_err(|e| CodememError::Storage(e.to_string()))?;
660
661        let rows = stmt
662            .query_map(refs.as_slice(), |row| {
663                let kind_str: String = row.get(1)?;
664                let payload_str: String = row.get(3)?;
665                Ok(GraphNode {
666                    id: row.get(0)?,
667                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
668                    label: row.get(2)?,
669                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
670                    centrality: row.get(4)?,
671                    memory_id: row.get(5)?,
672                    namespace: row.get(6)?,
673                })
674            })
675            .map_err(|e| CodememError::Storage(e.to_string()))?
676            .collect::<Result<Vec<_>, _>>()
677            .map_err(|e| CodememError::Storage(e.to_string()))?;
678
679        Ok(rows)
680    }
681
682    fn list_memories_filtered(
683        &self,
684        namespace: Option<&str>,
685        memory_type: Option<&str>,
686    ) -> Result<Vec<MemoryNode>, CodememError> {
687        let conn = self.conn()?;
688        let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
689                        content_hash, tags, metadata, namespace, session_id, created_at, updated_at, \
690                        last_accessed_at FROM memories WHERE 1=1"
691            .to_string();
692        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
693
694        if let Some(ns) = namespace {
695            param_values.push(Box::new(ns.to_string()));
696            sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
697        }
698        if let Some(mt) = memory_type {
699            param_values.push(Box::new(mt.to_string()));
700            sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
701        }
702        sql.push_str(" ORDER BY created_at DESC");
703
704        let refs: Vec<&dyn rusqlite::types::ToSql> =
705            param_values.iter().map(|p| p.as_ref()).collect();
706        let mut stmt = conn
707            .prepare(&sql)
708            .map_err(|e| CodememError::Storage(e.to_string()))?;
709
710        let rows = stmt
711            .query_map(refs.as_slice(), |row| {
712                Ok(MemoryRow {
713                    id: row.get(0)?,
714                    content: row.get(1)?,
715                    memory_type: row.get(2)?,
716                    importance: row.get(3)?,
717                    confidence: row.get(4)?,
718                    access_count: row.get(5)?,
719                    content_hash: row.get(6)?,
720                    tags: row.get(7)?,
721                    metadata: row.get(8)?,
722                    namespace: row.get(9)?,
723                    session_id: row.get(10)?,
724                    created_at: row.get(11)?,
725                    updated_at: row.get(12)?,
726                    last_accessed_at: row.get(13)?,
727                })
728            })
729            .map_err(|e| CodememError::Storage(e.to_string()))?;
730
731        let mut result = Vec::new();
732        for row in rows {
733            let mr = row.map_err(|e| CodememError::Storage(e.to_string()))?;
734            result.push(mr.into_memory_node()?);
735        }
736
737        Ok(result)
738    }
739
740    // ── Session Activity (delegated) ──────────────────────────────────
741
742    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
743    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
744    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
745    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
746    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
747    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
748    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
749
750    // ── Stats (delegated) ─────────────────────────────────────────────
751
752    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
753
754    // ── Transaction Control ──────────────────────────────────────────
755
756    fn begin_transaction(&self) -> Result<(), CodememError> {
757        let conn = self.conn()?;
758        conn.execute_batch("BEGIN IMMEDIATE")
759            .map_err(|e| CodememError::Storage(e.to_string()))?;
760        self.in_transaction
761            .store(true, std::sync::atomic::Ordering::Release);
762        Ok(())
763    }
764
765    fn commit_transaction(&self) -> Result<(), CodememError> {
766        let conn = self.conn()?;
767        conn.execute_batch("COMMIT")
768            .map_err(|e| CodememError::Storage(e.to_string()))?;
769        // Clear flag after COMMIT succeeds — if COMMIT fails, the flag
770        // stays set so callers know a transaction is still active.
771        self.in_transaction
772            .store(false, std::sync::atomic::Ordering::Release);
773        Ok(())
774    }
775
776    fn rollback_transaction(&self) -> Result<(), CodememError> {
777        self.in_transaction
778            .store(false, std::sync::atomic::Ordering::Release);
779        let conn = self.conn()?;
780        conn.execute_batch("ROLLBACK")
781            .map_err(|e| CodememError::Storage(e.to_string()))?;
782        Ok(())
783    }
784}
785
786#[cfg(test)]
787#[path = "tests/backend_tests.rs"]
788mod tests;