Skip to main content

codemem_storage/
backend.rs

1//! `StorageBackend` trait implementation for Storage.
2
3use crate::{MapStorageErr, MemoryRow, Storage};
4use codemem_core::{
5    CodememError, ConsolidationLogEntry, Edge, GraphNode, MemoryNode, NodeKind, Repository,
6    Session, StorageBackend, StorageStats,
7};
8use rusqlite::params;
9use std::collections::HashMap;
10
11/// Generate SQL placeholders for multi-row INSERT: "(?1,?2,...),(?3,?4,...)"
12fn multi_row_placeholders(cols: usize, rows: usize) -> String {
13    let mut s = String::new();
14    for r in 0..rows {
15        if r > 0 {
16            s.push(',');
17        }
18        s.push('(');
19        for c in 0..cols {
20            if c > 0 {
21                s.push(',');
22            }
23            s.push('?');
24            s.push_str(&(r * cols + c + 1).to_string());
25        }
26        s.push(')');
27    }
28    s
29}
30
31/// Macro to delegate pure-forwarding trait methods to `Storage` inherent methods.
32macro_rules! delegate_storage {
33    // &self, no args
34    ($method:ident(&self) -> $ret:ty) => {
35        fn $method(&self) -> $ret {
36            Storage::$method(self)
37        }
38    };
39    // &self, one arg
40    ($method:ident(&self, $a1:ident: $t1:ty) -> $ret:ty) => {
41        fn $method(&self, $a1: $t1) -> $ret {
42            Storage::$method(self, $a1)
43        }
44    };
45    // &self, two args
46    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty) -> $ret:ty) => {
47        fn $method(&self, $a1: $t1, $a2: $t2) -> $ret {
48            Storage::$method(self, $a1, $a2)
49        }
50    };
51    // &self, three args
52    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty, $a3:ident: $t3:ty) -> $ret:ty) => {
53        fn $method(&self, $a1: $t1, $a2: $t2, $a3: $t3) -> $ret {
54            Storage::$method(self, $a1, $a2, $a3)
55        }
56    };
57    // &self, five args
58    ($method:ident(&self, $a1:ident: $t1:ty, $a2:ident: $t2:ty, $a3:ident: $t3:ty, $a4:ident: $t4:ty, $a5:ident: $t5:ty) -> $ret:ty) => {
59        fn $method(&self, $a1: $t1, $a2: $t2, $a3: $t3, $a4: $t4, $a5: $t5) -> $ret {
60            Storage::$method(self, $a1, $a2, $a3, $a4, $a5)
61        }
62    };
63}
64
65impl StorageBackend for Storage {
66    // ── Memory CRUD (delegated) ───────────────────────────────────────
67
68    delegate_storage!(insert_memory(&self, memory: &MemoryNode) -> Result<(), CodememError>);
69    delegate_storage!(get_memory(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
70    delegate_storage!(get_memory_no_touch(&self, id: &str) -> Result<Option<MemoryNode>, CodememError>);
71    delegate_storage!(update_memory(&self, id: &str, content: &str, importance: Option<f64>) -> Result<(), CodememError>);
72    delegate_storage!(delete_memory(&self, id: &str) -> Result<bool, CodememError>);
73
74    /// M1: Override with transactional cascade delete.
75    fn delete_memory_cascade(&self, id: &str) -> Result<bool, CodememError> {
76        // Delegates to Storage::delete_memory_cascade which wraps all
77        // deletes (memory + graph nodes/edges + embedding) in a single transaction.
78        Storage::delete_memory_cascade(self, id)
79    }
80
81    /// Override with transactional batch cascade delete.
82    fn delete_memories_batch_cascade(&self, ids: &[&str]) -> Result<usize, CodememError> {
83        Storage::delete_memories_batch_cascade(self, ids)
84    }
85
86    delegate_storage!(list_memory_ids(&self) -> Result<Vec<String>, CodememError>);
87    delegate_storage!(list_memory_ids_for_namespace(&self, namespace: &str) -> Result<Vec<String>, CodememError>);
88    delegate_storage!(find_memory_ids_by_tag(&self, tag: &str, namespace: Option<&str>, exclude_id: &str) -> Result<Vec<String>, CodememError>);
89    delegate_storage!(list_namespaces(&self) -> Result<Vec<String>, CodememError>);
90    delegate_storage!(memory_count(&self) -> Result<usize, CodememError>);
91
92    fn get_memories_batch(&self, ids: &[&str]) -> Result<Vec<MemoryNode>, CodememError> {
93        if ids.is_empty() {
94            return Ok(Vec::new());
95        }
96        let conn = self.conn()?;
97
98        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();
99        let sql = format!(
100            "SELECT id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at FROM memories WHERE id IN ({})",
101            placeholders.join(",")
102        );
103
104        let mut stmt = conn.prepare(&sql).storage_err()?;
105
106        let params: Vec<&dyn rusqlite::types::ToSql> = ids
107            .iter()
108            .map(|id| id as &dyn rusqlite::types::ToSql)
109            .collect();
110
111        let rows = stmt
112            .query_map(params.as_slice(), MemoryRow::from_row)
113            .storage_err()?;
114
115        let mut memories = Vec::new();
116        for row in rows {
117            let row = row.storage_err()?;
118            memories.push(row.into_memory_node()?);
119        }
120        Ok(memories)
121    }
122
123    // ── Embedding Persistence (delegated where possible) ──────────────
124
125    delegate_storage!(store_embedding(&self, memory_id: &str, embedding: &[f32]) -> Result<(), CodememError>);
126    delegate_storage!(get_embedding(&self, memory_id: &str) -> Result<Option<Vec<f32>>, CodememError>);
127
128    fn delete_embedding(&self, memory_id: &str) -> Result<bool, CodememError> {
129        let conn = self.conn()?;
130        let deleted = conn
131            .execute(
132                "DELETE FROM memory_embeddings WHERE memory_id = ?1",
133                [memory_id],
134            )
135            .storage_err()?;
136        Ok(deleted > 0)
137    }
138
139    fn list_all_embeddings(&self) -> Result<Vec<(String, Vec<f32>)>, CodememError> {
140        let conn = self.conn()?;
141        let mut stmt = conn
142            .prepare("SELECT memory_id, embedding FROM memory_embeddings")
143            .storage_err()?;
144        let rows = stmt
145            .query_map([], |row| {
146                let id: String = row.get(0)?;
147                let blob: Vec<u8> = row.get(1)?;
148                Ok((id, blob))
149            })
150            .storage_err()?;
151        let mut result = Vec::new();
152        for row in rows {
153            let (id, blob) = row.storage_err()?;
154            let floats: Vec<f32> = blob
155                .chunks_exact(4)
156                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
157                .collect();
158            result.push((id, floats));
159        }
160        Ok(result)
161    }
162
163    // ── Graph Node/Edge Persistence (delegated) ───────────────────────
164
165    delegate_storage!(insert_graph_node(&self, node: &GraphNode) -> Result<(), CodememError>);
166    delegate_storage!(get_graph_node(&self, id: &str) -> Result<Option<GraphNode>, CodememError>);
167    delegate_storage!(delete_graph_node(&self, id: &str) -> Result<bool, CodememError>);
168    delegate_storage!(all_graph_nodes(&self) -> Result<Vec<GraphNode>, CodememError>);
169    delegate_storage!(insert_graph_edge(&self, edge: &Edge) -> Result<(), CodememError>);
170    delegate_storage!(get_edges_for_node(&self, node_id: &str) -> Result<Vec<Edge>, CodememError>);
171    delegate_storage!(all_graph_edges(&self) -> Result<Vec<Edge>, CodememError>);
172    delegate_storage!(delete_graph_edge(&self, edge_id: &str) -> Result<bool, CodememError>);
173    delegate_storage!(delete_graph_edges_for_node(&self, node_id: &str) -> Result<usize, CodememError>);
174    delegate_storage!(delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError>);
175    // ── Sessions (delegated where possible) ───────────────────────────
176
177    delegate_storage!(start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError>);
178    delegate_storage!(end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError>);
179
180    fn list_sessions(
181        &self,
182        namespace: Option<&str>,
183        limit: usize,
184    ) -> Result<Vec<Session>, CodememError> {
185        self.list_sessions_with_limit(namespace, limit)
186    }
187
188    // ── Consolidation (delegated) ─────────────────────────────────────
189
190    delegate_storage!(insert_consolidation_log(&self, cycle_type: &str, affected_count: usize) -> Result<(), CodememError>);
191    delegate_storage!(last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError>);
192
193    // ── Pattern Detection (delegated) ─────────────────────────────────
194
195    delegate_storage!(get_repeated_searches(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
196    delegate_storage!(get_file_hotspots(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
197    delegate_storage!(get_tool_usage_stats(&self, namespace: Option<&str>) -> Result<Vec<(String, usize)>, CodememError>);
198    delegate_storage!(get_decision_chains(&self, min_count: usize, namespace: Option<&str>) -> Result<Vec<(String, usize, Vec<String>)>, CodememError>);
199
200    // ── Bulk Operations ───────────────────────────────────────────────
201
202    fn decay_stale_memories(
203        &self,
204        threshold_ts: i64,
205        decay_factor: f64,
206    ) -> Result<usize, CodememError> {
207        let conn = self.conn()?;
208        let rows = conn
209            .execute(
210                "UPDATE memories SET importance = importance * ?1 WHERE last_accessed_at < ?2",
211                params![decay_factor, threshold_ts],
212            )
213            .storage_err()?;
214        Ok(rows)
215    }
216
217    fn list_memories_for_creative(
218        &self,
219    ) -> Result<Vec<(String, String, Vec<String>)>, CodememError> {
220        let conn = self.conn()?;
221        let mut stmt = conn
222            .prepare("SELECT id, memory_type, tags FROM memories ORDER BY created_at DESC")
223            .storage_err()?;
224
225        let rows = stmt
226            .query_map([], |row| {
227                Ok((
228                    row.get::<_, String>(0)?,
229                    row.get::<_, String>(1)?,
230                    row.get::<_, String>(2)?,
231                ))
232            })
233            .storage_err()?
234            .collect::<Result<Vec<_>, _>>()
235            .storage_err()?;
236
237        Ok(rows
238            .into_iter()
239            .map(|(id, mtype, tags_json)| {
240                let tags: Vec<String> = serde_json::from_str(&tags_json).unwrap_or_default();
241                (id, mtype, tags)
242            })
243            .collect())
244    }
245
246    fn find_hash_duplicates(&self) -> Result<Vec<(String, String, f64)>, CodememError> {
247        let conn = self.conn()?;
248        let mut stmt = conn
249            .prepare(
250                "SELECT a.id, b.id, 1.0 as similarity
251                 FROM memories a
252                 INNER JOIN memories b ON substr(a.content_hash, 1, 16) = substr(b.content_hash, 1, 16)
253                 WHERE a.id < b.id",
254            )
255            .storage_err()?;
256
257        let rows = stmt
258            .query_map([], |row| {
259                Ok((
260                    row.get::<_, String>(0)?,
261                    row.get::<_, String>(1)?,
262                    row.get::<_, f64>(2)?,
263                ))
264            })
265            .storage_err()?
266            .collect::<Result<Vec<_>, _>>()
267            .storage_err()?;
268
269        Ok(rows)
270    }
271
272    fn find_forgettable(&self, importance_threshold: f64) -> Result<Vec<String>, CodememError> {
273        let conn = self.conn()?;
274        let mut stmt = conn
275            .prepare(
276                "SELECT id FROM memories WHERE importance < ?1 AND access_count = 0 ORDER BY importance ASC, last_accessed_at ASC",
277            )
278            .storage_err()?;
279
280        let ids = stmt
281            .query_map(params![importance_threshold], |row| row.get(0))
282            .storage_err()?
283            .collect::<Result<Vec<String>, _>>()
284            .storage_err()?;
285
286        Ok(ids)
287    }
288
289    // ── Batch Operations ──────────────────────────────────────────────
290
291    fn insert_memories_batch(&self, memories: &[MemoryNode]) -> Result<(), CodememError> {
292        if memories.is_empty() {
293            return Ok(());
294        }
295        let conn = self.conn()?;
296        let tx = conn.unchecked_transaction().storage_err()?;
297
298        const COLS: usize = 14;
299        const BATCH: usize = 999 / COLS; // 71
300
301        for chunk in memories.chunks(BATCH) {
302            let placeholders = multi_row_placeholders(COLS, chunk.len());
303            let sql = format!(
304                "INSERT OR IGNORE INTO memories (id, content, memory_type, importance, confidence, access_count, content_hash, tags, metadata, namespace, session_id, created_at, updated_at, last_accessed_at) VALUES {placeholders}"
305            );
306
307            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
308                Vec::with_capacity(chunk.len() * COLS);
309            for memory in chunk {
310                let tags_json = serde_json::to_string(&memory.tags)?;
311                let metadata_json = serde_json::to_string(&memory.metadata)?;
312                param_values.push(Box::new(memory.id.clone()));
313                param_values.push(Box::new(memory.content.clone()));
314                param_values.push(Box::new(memory.memory_type.to_string()));
315                param_values.push(Box::new(memory.importance));
316                param_values.push(Box::new(memory.confidence));
317                param_values.push(Box::new(memory.access_count as i64));
318                param_values.push(Box::new(memory.content_hash.clone()));
319                param_values.push(Box::new(tags_json));
320                param_values.push(Box::new(metadata_json));
321                param_values.push(Box::new(memory.namespace.clone()));
322                param_values.push(Box::new(memory.session_id.clone()));
323                param_values.push(Box::new(memory.created_at.timestamp()));
324                param_values.push(Box::new(memory.updated_at.timestamp()));
325                param_values.push(Box::new(memory.last_accessed_at.timestamp()));
326            }
327
328            let refs: Vec<&dyn rusqlite::types::ToSql> =
329                param_values.iter().map(|p| p.as_ref()).collect();
330            tx.execute(&sql, refs.as_slice()).storage_err()?;
331        }
332
333        tx.commit().storage_err()?;
334        Ok(())
335    }
336
337    fn store_embeddings_batch(&self, items: &[(&str, &[f32])]) -> Result<(), CodememError> {
338        if items.is_empty() {
339            return Ok(());
340        }
341        let conn = self.conn()?;
342        let tx = conn.unchecked_transaction().storage_err()?;
343
344        const COLS: usize = 2;
345        const BATCH: usize = 999 / COLS; // 499
346
347        for chunk in items.chunks(BATCH) {
348            let placeholders = multi_row_placeholders(COLS, chunk.len());
349            let sql = format!(
350                "INSERT OR REPLACE INTO memory_embeddings (memory_id, embedding) VALUES {placeholders}"
351            );
352
353            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
354                Vec::with_capacity(chunk.len() * COLS);
355            for (id, embedding) in chunk {
356                let blob: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
357                param_values.push(Box::new(id.to_string()));
358                param_values.push(Box::new(blob));
359            }
360
361            let refs: Vec<&dyn rusqlite::types::ToSql> =
362                param_values.iter().map(|p| p.as_ref()).collect();
363            tx.execute(&sql, refs.as_slice()).storage_err()?;
364        }
365
366        tx.commit().storage_err()?;
367        Ok(())
368    }
369
370    fn load_file_hashes(&self, namespace: &str) -> Result<HashMap<String, String>, CodememError> {
371        let conn = self.conn()?;
372        let mut stmt = conn
373            .prepare("SELECT file_path, content_hash FROM file_hashes WHERE namespace = ?1")
374            .storage_err()?;
375
376        let rows = stmt
377            .query_map(params![namespace], |row| {
378                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
379            })
380            .storage_err()?
381            .collect::<Result<Vec<_>, _>>()
382            .storage_err()?;
383
384        Ok(rows.into_iter().collect())
385    }
386
387    fn save_file_hashes(
388        &self,
389        namespace: &str,
390        hashes: &HashMap<String, String>,
391    ) -> Result<(), CodememError> {
392        let conn = self.conn()?;
393        let tx = conn.unchecked_transaction().storage_err()?;
394
395        tx.execute(
396            "DELETE FROM file_hashes WHERE namespace = ?1",
397            params![namespace],
398        )
399        .storage_err()?;
400
401        for (path, hash) in hashes {
402            tx.execute(
403                "INSERT INTO file_hashes (namespace, file_path, content_hash) VALUES (?1, ?2, ?3)",
404                params![namespace, path, hash],
405            )
406            .storage_err()?;
407        }
408
409        tx.commit().storage_err()?;
410        Ok(())
411    }
412
413    fn insert_graph_nodes_batch(&self, nodes: &[GraphNode]) -> Result<(), CodememError> {
414        if nodes.is_empty() {
415            return Ok(());
416        }
417        let conn = self.conn()?;
418        let tx = conn.unchecked_transaction().storage_err()?;
419
420        const COLS: usize = 7;
421        const BATCH: usize = 999 / COLS; // 142
422
423        for chunk in nodes.chunks(BATCH) {
424            let placeholders = multi_row_placeholders(COLS, chunk.len());
425            let sql = format!(
426                "INSERT OR REPLACE INTO graph_nodes (id, kind, label, payload, centrality, memory_id, namespace) VALUES {placeholders}"
427            );
428
429            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
430                Vec::with_capacity(chunk.len() * COLS);
431            for node in chunk {
432                let payload_json =
433                    serde_json::to_string(&node.payload).unwrap_or_else(|_| "{}".to_string());
434                param_values.push(Box::new(node.id.clone()));
435                param_values.push(Box::new(node.kind.to_string()));
436                param_values.push(Box::new(node.label.clone()));
437                param_values.push(Box::new(payload_json));
438                param_values.push(Box::new(node.centrality));
439                param_values.push(Box::new(node.memory_id.clone()));
440                param_values.push(Box::new(node.namespace.clone()));
441            }
442
443            let refs: Vec<&dyn rusqlite::types::ToSql> =
444                param_values.iter().map(|p| p.as_ref()).collect();
445            tx.execute(&sql, refs.as_slice()).storage_err()?;
446        }
447
448        tx.commit().storage_err()?;
449        Ok(())
450    }
451
452    fn insert_graph_edges_batch(&self, edges: &[Edge]) -> Result<(), CodememError> {
453        if edges.is_empty() {
454            return Ok(());
455        }
456        let conn = self.conn()?;
457        let tx = conn.unchecked_transaction().storage_err()?;
458
459        const COLS: usize = 9;
460        const BATCH: usize = 999 / COLS; // 111
461
462        for chunk in edges.chunks(BATCH) {
463            let placeholders = multi_row_placeholders(COLS, chunk.len());
464            let sql = format!(
465                "INSERT OR REPLACE INTO graph_edges (id, src, dst, relationship, weight, properties, created_at, valid_from, valid_to) VALUES {placeholders}"
466            );
467
468            let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
469                Vec::with_capacity(chunk.len() * COLS);
470            for edge in chunk {
471                let props_json =
472                    serde_json::to_string(&edge.properties).unwrap_or_else(|_| "{}".to_string());
473                param_values.push(Box::new(edge.id.clone()));
474                param_values.push(Box::new(edge.src.clone()));
475                param_values.push(Box::new(edge.dst.clone()));
476                param_values.push(Box::new(edge.relationship.to_string()));
477                param_values.push(Box::new(edge.weight));
478                param_values.push(Box::new(props_json));
479                param_values.push(Box::new(edge.created_at.timestamp()));
480                param_values.push(Box::new(edge.valid_from.map(|dt| dt.timestamp())));
481                param_values.push(Box::new(edge.valid_to.map(|dt| dt.timestamp())));
482            }
483
484            let refs: Vec<&dyn rusqlite::types::ToSql> =
485                param_values.iter().map(|p| p.as_ref()).collect();
486            tx.execute(&sql, refs.as_slice()).storage_err()?;
487        }
488
489        tx.commit().storage_err()?;
490        Ok(())
491    }
492
493    fn get_stale_memories_for_decay(
494        &self,
495        threshold_ts: i64,
496    ) -> Result<Vec<(String, f64, u32, i64)>, CodememError> {
497        let conn = self.conn()?;
498        let mut stmt = conn
499            .prepare(
500                "SELECT id, importance, access_count, last_accessed_at FROM memories WHERE last_accessed_at < ?1",
501            )
502            .storage_err()?;
503
504        let rows = stmt
505            .query_map(params![threshold_ts], |row| {
506                Ok((
507                    row.get::<_, String>(0)?,
508                    row.get::<_, f64>(1)?,
509                    row.get::<_, u32>(2)?,
510                    row.get::<_, i64>(3)?,
511                ))
512            })
513            .storage_err()?
514            .collect::<Result<Vec<_>, _>>()
515            .storage_err()?;
516
517        Ok(rows)
518    }
519
520    fn batch_update_importance(&self, updates: &[(String, f64)]) -> Result<usize, CodememError> {
521        if updates.is_empty() {
522            return Ok(0);
523        }
524        let conn = self.conn()?;
525        let tx = conn.unchecked_transaction().storage_err()?;
526
527        let mut count = 0usize;
528        for (id, importance) in updates {
529            let rows = tx
530                .execute(
531                    "UPDATE memories SET importance = ?1 WHERE id = ?2",
532                    params![importance, id],
533                )
534                .storage_err()?;
535            count += rows;
536        }
537
538        tx.commit().storage_err()?;
539        Ok(count)
540    }
541
542    fn session_count(&self, namespace: Option<&str>) -> Result<usize, CodememError> {
543        let conn = self.conn()?;
544        let count: i64 = if let Some(ns) = namespace {
545            conn.query_row(
546                "SELECT COUNT(*) FROM sessions WHERE namespace = ?1",
547                params![ns],
548                |row| row.get(0),
549            )
550            .storage_err()?
551        } else {
552            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
553                .storage_err()?
554        };
555        Ok(count as usize)
556    }
557
558    // ── Query Helpers ─────────────────────────────────────────────────
559
560    fn find_unembedded_memories(&self) -> Result<Vec<(String, String)>, CodememError> {
561        let conn = self.conn()?;
562        let mut stmt = conn
563            .prepare(
564                "SELECT m.id, m.content FROM memories m
565                 LEFT JOIN memory_embeddings me ON m.id = me.memory_id
566                 WHERE me.memory_id IS NULL",
567            )
568            .storage_err()?;
569
570        let rows = stmt
571            .query_map([], |row| {
572                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
573            })
574            .storage_err()?
575            .collect::<Result<Vec<_>, _>>()
576            .storage_err()?;
577
578        Ok(rows)
579    }
580
581    fn search_graph_nodes(
582        &self,
583        query: &str,
584        namespace: Option<&str>,
585        limit: usize,
586    ) -> Result<Vec<GraphNode>, CodememError> {
587        let conn = self.conn()?;
588        let escaped = query
589            .to_lowercase()
590            .replace('\\', "\\\\")
591            .replace('%', "\\%")
592            .replace('_', "\\_");
593        let pattern = format!("%{escaped}%");
594
595        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
596            if let Some(ns) = namespace {
597                (
598                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
599                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' AND namespace = ?2 \
600                 ORDER BY centrality DESC LIMIT ?3"
601                        .to_string(),
602                    vec![
603                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
604                        Box::new(ns.to_string()),
605                        Box::new(limit as i64),
606                    ],
607                )
608            } else {
609                (
610                    "SELECT id, kind, label, payload, centrality, memory_id, namespace \
611                 FROM graph_nodes WHERE LOWER(label) LIKE ?1 ESCAPE '\\' \
612                 ORDER BY centrality DESC LIMIT ?2"
613                        .to_string(),
614                    vec![
615                        Box::new(pattern) as Box<dyn rusqlite::types::ToSql>,
616                        Box::new(limit as i64),
617                    ],
618                )
619            };
620
621        let refs: Vec<&dyn rusqlite::types::ToSql> =
622            params_vec.iter().map(|p| p.as_ref()).collect();
623        let mut stmt = conn.prepare(&sql).storage_err()?;
624
625        let rows = stmt
626            .query_map(refs.as_slice(), |row| {
627                let kind_str: String = row.get(1)?;
628                let payload_str: String = row.get(3)?;
629                Ok(GraphNode {
630                    id: row.get(0)?,
631                    kind: kind_str.parse().unwrap_or(NodeKind::Memory),
632                    label: row.get(2)?,
633                    payload: serde_json::from_str(&payload_str).unwrap_or_default(),
634                    centrality: row.get(4)?,
635                    memory_id: row.get(5)?,
636                    namespace: row.get(6)?,
637                })
638            })
639            .storage_err()?
640            .collect::<Result<Vec<_>, _>>()
641            .storage_err()?;
642
643        Ok(rows)
644    }
645
646    fn list_memories_by_tag(
647        &self,
648        tag: &str,
649        namespace: Option<&str>,
650        limit: usize,
651    ) -> Result<Vec<MemoryNode>, CodememError> {
652        Storage::list_memories_by_tag(self, tag, namespace, limit)
653    }
654
655    fn list_memories_filtered(
656        &self,
657        namespace: Option<&str>,
658        memory_type: Option<&str>,
659    ) -> Result<Vec<MemoryNode>, CodememError> {
660        let conn = self.conn()?;
661        let mut sql = "SELECT id, content, memory_type, importance, confidence, access_count, \
662                        content_hash, tags, metadata, namespace, session_id, created_at, updated_at, \
663                        last_accessed_at FROM memories WHERE 1=1"
664            .to_string();
665        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
666
667        if let Some(ns) = namespace {
668            param_values.push(Box::new(ns.to_string()));
669            sql.push_str(&format!(" AND namespace = ?{}", param_values.len()));
670        }
671        if let Some(mt) = memory_type {
672            param_values.push(Box::new(mt.to_string()));
673            sql.push_str(&format!(" AND memory_type = ?{}", param_values.len()));
674        }
675        sql.push_str(" ORDER BY created_at DESC");
676
677        let refs: Vec<&dyn rusqlite::types::ToSql> =
678            param_values.iter().map(|p| p.as_ref()).collect();
679        let mut stmt = conn.prepare(&sql).storage_err()?;
680
681        let rows = stmt
682            .query_map(refs.as_slice(), MemoryRow::from_row)
683            .storage_err()?;
684
685        let mut result = Vec::new();
686        for row in rows {
687            let mr = row.storage_err()?;
688            result.push(mr.into_memory_node()?);
689        }
690
691        Ok(result)
692    }
693
694    // ── Session Activity (delegated) ──────────────────────────────────
695
696    delegate_storage!(record_session_activity(&self, session_id: &str, tool_name: &str, file_path: Option<&str>, directory: Option<&str>, pattern: Option<&str>) -> Result<(), CodememError>);
697    delegate_storage!(get_session_activity_summary(&self, session_id: &str) -> Result<codemem_core::SessionActivitySummary, CodememError>);
698    delegate_storage!(get_session_hot_directories(&self, session_id: &str, limit: usize) -> Result<Vec<(String, usize)>, CodememError>);
699    delegate_storage!(has_auto_insight(&self, session_id: &str, dedup_tag: &str) -> Result<bool, CodememError>);
700    delegate_storage!(count_directory_reads(&self, session_id: &str, directory: &str) -> Result<usize, CodememError>);
701    delegate_storage!(was_file_read_in_session(&self, session_id: &str, file_path: &str) -> Result<bool, CodememError>);
702    delegate_storage!(count_search_pattern_in_session(&self, session_id: &str, pattern: &str) -> Result<usize, CodememError>);
703
704    // ── Stats (delegated) ─────────────────────────────────────────────
705
706    delegate_storage!(stats(&self) -> Result<StorageStats, CodememError>);
707
708    // ── Transaction Control ──────────────────────────────────────────
709
710    fn begin_transaction(&self) -> Result<(), CodememError> {
711        let conn = self.conn()?;
712        conn.execute_batch("BEGIN IMMEDIATE").storage_err()?;
713        self.in_transaction
714            .store(true, std::sync::atomic::Ordering::Release);
715        Ok(())
716    }
717
718    fn commit_transaction(&self) -> Result<(), CodememError> {
719        let conn = self.conn()?;
720        conn.execute_batch("COMMIT").storage_err()?;
721        // Clear flag after COMMIT succeeds — if COMMIT fails, the flag
722        // stays set so callers know a transaction is still active.
723        self.in_transaction
724            .store(false, std::sync::atomic::Ordering::Release);
725        Ok(())
726    }
727
728    fn rollback_transaction(&self) -> Result<(), CodememError> {
729        self.in_transaction
730            .store(false, std::sync::atomic::Ordering::Release);
731        let conn = self.conn()?;
732        conn.execute_batch("ROLLBACK").storage_err()?;
733        Ok(())
734    }
735
736    // ── Repository Management ────────────────────────────────────────
737
738    fn list_repos(&self) -> Result<Vec<Repository>, CodememError> {
739        Storage::list_repos(self)
740    }
741
742    fn add_repo(&self, repo: &Repository) -> Result<(), CodememError> {
743        Storage::add_repo(self, repo)
744    }
745
746    fn get_repo(&self, id: &str) -> Result<Option<Repository>, CodememError> {
747        Storage::get_repo(self, id)
748    }
749
750    fn remove_repo(&self, id: &str) -> Result<bool, CodememError> {
751        Storage::remove_repo(self, id)
752    }
753
754    fn update_repo_status(
755        &self,
756        id: &str,
757        status: &str,
758        indexed_at: Option<&str>,
759    ) -> Result<(), CodememError> {
760        Storage::update_repo_status(self, id, status, indexed_at)
761    }
762
763    // ── Cross-Repo Persistence ────────────────────────────────────────
764
765    fn graph_edges_for_namespace_with_cross(
766        &self,
767        namespace: &str,
768        include_cross_namespace: bool,
769    ) -> Result<Vec<Edge>, CodememError> {
770        Storage::graph_edges_for_namespace_with_cross(self, namespace, include_cross_namespace)
771    }
772
773    fn upsert_package_registry(
774        &self,
775        package_name: &str,
776        namespace: &str,
777        version: &str,
778        manifest: &str,
779    ) -> Result<(), CodememError> {
780        Storage::upsert_package_registry(self, package_name, namespace, version, manifest)
781    }
782
783    fn store_unresolved_ref(
784        &self,
785        source_qualified_name: &str,
786        target_name: &str,
787        source_namespace: &str,
788        file_path: &str,
789        line: usize,
790        ref_kind: &str,
791        package_hint: Option<&str>,
792    ) -> Result<(), CodememError> {
793        use crate::cross_repo::UnresolvedRefEntry;
794        let entry = UnresolvedRefEntry {
795            id: format!("uref:{source_namespace}:{source_qualified_name}:{target_name}"),
796            namespace: source_namespace.to_string(),
797            source_node: source_qualified_name.to_string(),
798            target_name: target_name.to_string(),
799            package_hint: package_hint.map(|s| s.to_string()),
800            ref_kind: ref_kind.to_string(),
801            file_path: Some(file_path.to_string()),
802            line: Some(line as i64),
803            created_at: chrono::Utc::now().timestamp(),
804        };
805        Storage::insert_unresolved_ref(self, &entry)
806    }
807
808    fn store_unresolved_refs_batch(
809        &self,
810        refs: &[codemem_core::UnresolvedRefData],
811    ) -> Result<usize, CodememError> {
812        use crate::cross_repo::UnresolvedRefEntry;
813        let now = chrono::Utc::now().timestamp();
814        let entries: Vec<UnresolvedRefEntry> = refs
815            .iter()
816            .map(|r| UnresolvedRefEntry {
817                id: format!(
818                    "uref:{}:{}:{}",
819                    r.namespace, r.source_qualified_name, r.target_name
820                ),
821                namespace: r.namespace.clone(),
822                source_node: r.source_qualified_name.clone(),
823                target_name: r.target_name.clone(),
824                package_hint: r.package_hint.clone(),
825                ref_kind: r.ref_kind.clone(),
826                file_path: Some(r.file_path.clone()),
827                line: Some(r.line as i64),
828                created_at: now,
829            })
830            .collect();
831        let count = entries.len();
832        Storage::insert_unresolved_refs_batch(self, &entries)?;
833        Ok(count)
834    }
835
836    fn list_registered_packages(&self) -> Result<Vec<(String, String, String)>, CodememError> {
837        let conn = self.conn()?;
838        let mut stmt = conn
839            .prepare("SELECT package_name, namespace, manifest FROM package_registry")
840            .map_err(|e| CodememError::Storage(e.to_string()))?;
841        let rows = stmt
842            .query_map([], |row| {
843                Ok((
844                    row.get::<_, String>(0)?,
845                    row.get::<_, String>(1)?,
846                    row.get::<_, String>(2)?,
847                ))
848            })
849            .map_err(|e| CodememError::Storage(e.to_string()))?
850            .filter_map(|r| r.ok())
851            .collect();
852        Ok(rows)
853    }
854
855    fn list_pending_unresolved_refs(
856        &self,
857    ) -> Result<Vec<codemem_core::PendingUnresolvedRef>, CodememError> {
858        let conn = self.conn()?;
859        let mut stmt = conn
860            .prepare(
861                "SELECT id, source_node, target_name, namespace, file_path, line, ref_kind, package_hint
862                 FROM unresolved_refs",
863            )
864            .map_err(|e| CodememError::Storage(e.to_string()))?;
865        let rows = stmt
866            .query_map([], |row| {
867                Ok(codemem_core::PendingUnresolvedRef {
868                    id: row.get::<_, String>(0)?,
869                    source_node: row.get::<_, String>(1)?,
870                    target_name: row.get::<_, String>(2)?,
871                    namespace: row.get::<_, String>(3)?,
872                    file_path: row.get::<_, Option<String>>(4)?.unwrap_or_default(),
873                    line: row.get::<_, Option<i64>>(5)?.unwrap_or(0) as usize,
874                    ref_kind: row.get::<_, String>(6)?,
875                    package_hint: row.get::<_, Option<String>>(7)?,
876                })
877            })
878            .map_err(|e| CodememError::Storage(e.to_string()))?
879            .filter_map(|r| r.ok())
880            .collect();
881        Ok(rows)
882    }
883
884    fn delete_unresolved_ref(&self, id: &str) -> Result<(), CodememError> {
885        Storage::delete_unresolved_ref(self, id)
886    }
887
888    fn count_unresolved_refs(&self, namespace: &str) -> Result<usize, CodememError> {
889        let conn = self.conn()?;
890        let count: i64 = conn
891            .query_row(
892                "SELECT COUNT(*) FROM unresolved_refs WHERE namespace = ?1",
893                rusqlite::params![namespace],
894                |row| row.get(0),
895            )
896            .map_err(|e| CodememError::Storage(e.to_string()))?;
897        Ok(count as usize)
898    }
899
900    fn list_registered_packages_for_namespace(
901        &self,
902        namespace: &str,
903    ) -> Result<Vec<(String, String, String)>, CodememError> {
904        let entries = Storage::get_packages_for_namespace(self, namespace)?;
905        Ok(entries
906            .into_iter()
907            .map(|e| (e.package_name, e.namespace, e.manifest))
908            .collect())
909    }
910
911    fn store_api_endpoint(
912        &self,
913        method: &str,
914        path: &str,
915        handler_symbol: &str,
916        namespace: &str,
917    ) -> Result<(), CodememError> {
918        use crate::cross_repo::ApiEndpointEntry;
919        let entry = ApiEndpointEntry {
920            id: format!("ep:{}:{}:{}", namespace, method, path),
921            namespace: namespace.to_string(),
922            method: Some(method.to_string()),
923            path: path.to_string(),
924            handler: Some(handler_symbol.to_string()),
925            schema: "{}".to_string(),
926        };
927        Storage::upsert_api_endpoint(self, &entry)
928    }
929
930    fn store_api_client_call(
931        &self,
932        library: &str,
933        method: Option<&str>,
934        caller_symbol: &str,
935        namespace: &str,
936    ) -> Result<(), CodememError> {
937        let id = format!("client:{caller_symbol}:{library}");
938        Storage::upsert_api_client_call(self, &id, namespace, method, "", caller_symbol, library)
939    }
940
941    fn list_api_endpoints(
942        &self,
943        namespace: &str,
944    ) -> Result<Vec<(String, String, String, String)>, CodememError> {
945        let entries = Storage::get_api_endpoints_for_namespace(self, namespace)?;
946        Ok(entries
947            .into_iter()
948            .map(|e| {
949                (
950                    e.method.unwrap_or_default(),
951                    e.path,
952                    e.handler.unwrap_or_default(),
953                    e.namespace,
954                )
955            })
956            .collect())
957    }
958}
959
960#[cfg(test)]
961#[path = "tests/backend_tests.rs"]
962mod tests;