Skip to main content

codemem_storage/
queries.rs

1//! Stats, consolidation, pattern queries, and session management on Storage.
2
3use crate::{MapStorageErr, Storage};
4use codemem_core::{CodememError, ConsolidationLogEntry, Session, StorageStats};
5use rusqlite::params;
6
7impl Storage {
8    // ── Health / Diagnostics ────────────────────────────────────────────
9
10    /// Run SQLite `PRAGMA integrity_check`. Returns `true` if the database is OK.
11    pub fn integrity_check(&self) -> Result<bool, CodememError> {
12        let conn = self.conn()?;
13        let result: String = conn
14            .query_row("PRAGMA integrity_check", [], |row| row.get(0))
15            .storage_err()?;
16        Ok(result == "ok")
17    }
18
19    /// Return the current schema version (max applied migration number).
20    pub fn schema_version(&self) -> Result<u32, CodememError> {
21        let conn = self.conn()?;
22        let version: u32 = conn
23            .query_row(
24                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
25                [],
26                |row| row.get(0),
27            )
28            .storage_err()?;
29        Ok(version)
30    }
31
32    // ── Stats ───────────────────────────────────────────────────────────
33
34    /// Get database statistics in a single query.
35    pub fn stats(&self) -> Result<StorageStats, CodememError> {
36        let conn = self.conn()?;
37
38        let (memory_count, embedding_count, node_count, edge_count) = conn
39            .query_row(
40                "SELECT
41                    (SELECT COUNT(*) FROM memories) AS memory_count,
42                    (SELECT COUNT(*) FROM memory_embeddings) AS embedding_count,
43                    (SELECT COUNT(*) FROM graph_nodes) AS node_count,
44                    (SELECT COUNT(*) FROM graph_edges) AS edge_count",
45                [],
46                |row| {
47                    Ok((
48                        row.get::<_, i64>(0)?,
49                        row.get::<_, i64>(1)?,
50                        row.get::<_, i64>(2)?,
51                        row.get::<_, i64>(3)?,
52                    ))
53                },
54            )
55            .storage_err()?;
56
57        Ok(StorageStats {
58            memory_count: memory_count as usize,
59            embedding_count: embedding_count as usize,
60            node_count: node_count as usize,
61            edge_count: edge_count as usize,
62        })
63    }
64
65    // ── Consolidation Log ──────────────────────────────────────────────
66
67    /// Record a consolidation run.
68    pub fn insert_consolidation_log(
69        &self,
70        cycle_type: &str,
71        affected_count: usize,
72    ) -> Result<(), CodememError> {
73        let conn = self.conn()?;
74        let now = chrono::Utc::now().timestamp();
75        conn.execute(
76            "INSERT INTO consolidation_log (cycle_type, run_at, affected_count) VALUES (?1, ?2, ?3)",
77            params![cycle_type, now, affected_count as i64],
78        )
79        .storage_err()?;
80        Ok(())
81    }
82
83    /// Get the last consolidation run for each cycle type.
84    pub fn last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError> {
85        let conn = self.conn()?;
86        let mut stmt = conn
87            .prepare(
88                "SELECT cycle_type, run_at, affected_count FROM consolidation_log
89                 WHERE (cycle_type, run_at) IN (
90                     SELECT cycle_type, MAX(run_at) FROM consolidation_log GROUP BY cycle_type
91                 )
92                 ORDER BY cycle_type",
93            )
94            .storage_err()?;
95
96        let entries = stmt
97            .query_map([], |row| {
98                Ok(ConsolidationLogEntry {
99                    cycle_type: row.get(0)?,
100                    run_at: row.get(1)?,
101                    affected_count: row.get::<_, i64>(2)? as usize,
102                })
103            })
104            .storage_err()?
105            .collect::<Result<Vec<_>, _>>()
106            .storage_err()?;
107
108        Ok(entries)
109    }
110
111    // ── Pattern Detection Queries ───────────────────────────────────────
112
113    /// Find repeated search patterns (Grep/Glob) by extracting the "pattern" field
114    /// from memory metadata JSON. Returns (pattern, count, memory_ids) tuples where
115    /// count >= min_count, ordered by count descending.
116    pub fn get_repeated_searches(
117        &self,
118        min_count: usize,
119        namespace: Option<&str>,
120    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
121        let conn = self.conn()?;
122        let sql = if namespace.is_some() {
123            "SELECT json_extract(metadata, '$.pattern') AS pat,
124                    COUNT(*) AS cnt,
125                    GROUP_CONCAT(id, ',') AS ids
126             FROM memories
127             WHERE json_extract(metadata, '$.tool') IN ('Grep', 'Glob')
128               AND pat IS NOT NULL
129               AND namespace = ?1
130             GROUP BY pat
131             HAVING cnt >= ?2
132             ORDER BY cnt DESC"
133        } else {
134            "SELECT json_extract(metadata, '$.pattern') AS pat,
135                    COUNT(*) AS cnt,
136                    GROUP_CONCAT(id, ',') AS ids
137             FROM memories
138             WHERE json_extract(metadata, '$.tool') IN ('Grep', 'Glob')
139               AND pat IS NOT NULL
140             GROUP BY pat
141             HAVING cnt >= ?1
142             ORDER BY cnt DESC"
143        };
144
145        let mut stmt = conn.prepare(sql).storage_err()?;
146
147        let rows = if let Some(ns) = namespace {
148            stmt.query_map(params![ns, min_count as i64], |row| {
149                Ok((
150                    row.get::<_, String>(0)?,
151                    row.get::<_, i64>(1)?,
152                    row.get::<_, String>(2)?,
153                ))
154            })
155            .storage_err()?
156            .collect::<Result<Vec<_>, _>>()
157            .storage_err()?
158        } else {
159            stmt.query_map(params![min_count as i64], |row| {
160                Ok((
161                    row.get::<_, String>(0)?,
162                    row.get::<_, i64>(1)?,
163                    row.get::<_, String>(2)?,
164                ))
165            })
166            .storage_err()?
167            .collect::<Result<Vec<_>, _>>()
168            .storage_err()?
169        };
170
171        Ok(rows
172            .into_iter()
173            .map(|(pat, cnt, ids_str)| {
174                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
175                (pat, cnt as usize, ids)
176            })
177            .collect())
178    }
179
180    /// Find file hotspots by extracting the "file_path" field from memory metadata.
181    pub fn get_file_hotspots(
182        &self,
183        min_count: usize,
184        namespace: Option<&str>,
185    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
186        let conn = self.conn()?;
187        let sql = if namespace.is_some() {
188            "SELECT json_extract(metadata, '$.file_path') AS fp,
189                    COUNT(*) AS cnt,
190                    GROUP_CONCAT(id, ',') AS ids
191             FROM memories
192             WHERE fp IS NOT NULL
193               AND namespace = ?1
194             GROUP BY fp
195             HAVING cnt >= ?2
196             ORDER BY cnt DESC"
197        } else {
198            "SELECT json_extract(metadata, '$.file_path') AS fp,
199                    COUNT(*) AS cnt,
200                    GROUP_CONCAT(id, ',') AS ids
201             FROM memories
202             WHERE fp IS NOT NULL
203             GROUP BY fp
204             HAVING cnt >= ?1
205             ORDER BY cnt DESC"
206        };
207
208        let mut stmt = conn.prepare(sql).storage_err()?;
209
210        let rows = if let Some(ns) = namespace {
211            stmt.query_map(params![ns, min_count as i64], |row| {
212                Ok((
213                    row.get::<_, String>(0)?,
214                    row.get::<_, i64>(1)?,
215                    row.get::<_, String>(2)?,
216                ))
217            })
218            .storage_err()?
219            .collect::<Result<Vec<_>, _>>()
220            .storage_err()?
221        } else {
222            stmt.query_map(params![min_count as i64], |row| {
223                Ok((
224                    row.get::<_, String>(0)?,
225                    row.get::<_, i64>(1)?,
226                    row.get::<_, String>(2)?,
227                ))
228            })
229            .storage_err()?
230            .collect::<Result<Vec<_>, _>>()
231            .storage_err()?
232        };
233
234        Ok(rows
235            .into_iter()
236            .map(|(fp, cnt, ids_str)| {
237                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
238                (fp, cnt as usize, ids)
239            })
240            .collect())
241    }
242
243    /// Get tool usage statistics from memory metadata.
244    /// Returns (tool_name, count) pairs sorted by count descending.
245    pub fn get_tool_usage_stats(
246        &self,
247        namespace: Option<&str>,
248    ) -> Result<Vec<(String, usize)>, CodememError> {
249        let conn = self.conn()?;
250        let sql = if namespace.is_some() {
251            "SELECT json_extract(metadata, '$.tool') AS tool,
252                    COUNT(*) AS cnt
253             FROM memories
254             WHERE tool IS NOT NULL
255               AND namespace = ?1
256             GROUP BY tool
257             ORDER BY cnt DESC"
258        } else {
259            "SELECT json_extract(metadata, '$.tool') AS tool,
260                    COUNT(*) AS cnt
261             FROM memories
262             WHERE tool IS NOT NULL
263             GROUP BY tool
264             ORDER BY cnt DESC"
265        };
266
267        let mut stmt = conn.prepare(sql).storage_err()?;
268
269        let rows = if let Some(ns) = namespace {
270            stmt.query_map(params![ns], |row| {
271                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
272            })
273            .storage_err()?
274            .collect::<Result<Vec<_>, _>>()
275            .storage_err()?
276        } else {
277            stmt.query_map([], |row| {
278                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
279            })
280            .storage_err()?
281            .collect::<Result<Vec<_>, _>>()
282            .storage_err()?
283        };
284
285        Ok(rows
286            .into_iter()
287            .map(|(tool, cnt)| (tool, cnt as usize))
288            .collect())
289    }
290
291    /// Find decision chains: files with multiple Edit/Write memories over time.
292    pub fn get_decision_chains(
293        &self,
294        min_count: usize,
295        namespace: Option<&str>,
296    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
297        let conn = self.conn()?;
298        let sql = if namespace.is_some() {
299            "SELECT json_extract(metadata, '$.file_path') AS fp,
300                    COUNT(*) AS cnt,
301                    GROUP_CONCAT(id, ',') AS ids
302             FROM memories
303             WHERE json_extract(metadata, '$.tool') IN ('Edit', 'Write')
304               AND fp IS NOT NULL
305               AND namespace = ?1
306             GROUP BY fp
307             HAVING cnt >= ?2
308             ORDER BY cnt DESC"
309        } else {
310            "SELECT json_extract(metadata, '$.file_path') AS fp,
311                    COUNT(*) AS cnt,
312                    GROUP_CONCAT(id, ',') AS ids
313             FROM memories
314             WHERE json_extract(metadata, '$.tool') IN ('Edit', 'Write')
315               AND fp IS NOT NULL
316             GROUP BY fp
317             HAVING cnt >= ?1
318             ORDER BY cnt DESC"
319        };
320
321        let mut stmt = conn.prepare(sql).storage_err()?;
322
323        let rows = if let Some(ns) = namespace {
324            stmt.query_map(params![ns, min_count as i64], |row| {
325                Ok((
326                    row.get::<_, String>(0)?,
327                    row.get::<_, i64>(1)?,
328                    row.get::<_, String>(2)?,
329                ))
330            })
331            .storage_err()?
332            .collect::<Result<Vec<_>, _>>()
333            .storage_err()?
334        } else {
335            stmt.query_map(params![min_count as i64], |row| {
336                Ok((
337                    row.get::<_, String>(0)?,
338                    row.get::<_, i64>(1)?,
339                    row.get::<_, String>(2)?,
340                ))
341            })
342            .storage_err()?
343            .collect::<Result<Vec<_>, _>>()
344            .storage_err()?
345        };
346
347        Ok(rows
348            .into_iter()
349            .map(|(fp, cnt, ids_str)| {
350                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
351                (fp, cnt as usize, ids)
352            })
353            .collect())
354    }
355
356    // ── Insight / Tag Queries ──────────────────────────────────────────
357
358    /// List memories that contain a specific tag, optionally scoped to a namespace.
359    /// Uses `json_each` for proper JSON array querying instead of LIKE patterns.
360    pub fn list_memories_by_tag(
361        &self,
362        tag: &str,
363        namespace: Option<&str>,
364        limit: usize,
365    ) -> Result<Vec<codemem_core::MemoryNode>, CodememError> {
366        let conn = self.conn()?;
367
368        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(ns) =
369            namespace
370        {
371            (
372                "SELECT m.id, m.content, m.memory_type, m.importance, m.confidence, m.access_count, \
373                 m.content_hash, m.tags, m.metadata, m.namespace, m.session_id, m.created_at, m.updated_at, m.last_accessed_at \
374                 FROM memories m, json_each(m.tags) AS jt \
375                 WHERE jt.value = ?1 AND m.namespace = ?2 \
376                 ORDER BY m.created_at DESC LIMIT ?3"
377                    .to_string(),
378                vec![
379                    Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
380                    Box::new(ns.to_string()),
381                    Box::new(limit as i64),
382                ],
383            )
384        } else {
385            (
386                "SELECT m.id, m.content, m.memory_type, m.importance, m.confidence, m.access_count, \
387                 m.content_hash, m.tags, m.metadata, m.namespace, m.session_id, m.created_at, m.updated_at, m.last_accessed_at \
388                 FROM memories m, json_each(m.tags) AS jt \
389                 WHERE jt.value = ?1 \
390                 ORDER BY m.created_at DESC LIMIT ?2"
391                    .to_string(),
392                vec![
393                    Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
394                    Box::new(limit as i64),
395                ],
396            )
397        };
398
399        let params_refs: Vec<&dyn rusqlite::types::ToSql> =
400            params_vec.iter().map(|b| &**b).collect();
401
402        let mut stmt = conn.prepare(&sql).storage_err()?;
403
404        let rows = stmt
405            .query_map(params_refs.as_slice(), |row| {
406                let created_ts: i64 = row.get(11)?;
407                let updated_ts: i64 = row.get(12)?;
408                let accessed_ts: i64 = row.get(13)?;
409                let tags_json: String = row.get(7)?;
410                let metadata_json: String = row.get(8)?;
411                let memory_type_str: String = row.get(2)?;
412
413                Ok(codemem_core::MemoryNode {
414                    id: row.get(0)?,
415                    content: row.get(1)?,
416                    memory_type: memory_type_str
417                        .parse()
418                        .unwrap_or(codemem_core::MemoryType::Context),
419                    importance: row.get(3)?,
420                    confidence: row.get(4)?,
421                    access_count: row.get::<_, i64>(5).unwrap_or(0) as u32,
422                    content_hash: row.get(6)?,
423                    tags: serde_json::from_str(&tags_json).unwrap_or_default(),
424                    metadata: serde_json::from_str(&metadata_json).unwrap_or_default(),
425                    namespace: row.get(9)?,
426                    session_id: row.get(10)?,
427                    created_at: chrono::DateTime::from_timestamp(created_ts, 0)
428                        .unwrap_or_default()
429                        .with_timezone(&chrono::Utc),
430                    updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
431                        .unwrap_or_default()
432                        .with_timezone(&chrono::Utc),
433                    last_accessed_at: chrono::DateTime::from_timestamp(accessed_ts, 0)
434                        .unwrap_or_default()
435                        .with_timezone(&chrono::Utc),
436                })
437            })
438            .storage_err()?
439            .collect::<Result<Vec<_>, _>>()
440            .storage_err()?;
441
442        Ok(rows)
443    }
444
445    // ── Session Management ─────────────────────────────────────────────
446
447    /// Start a new session.
448    pub fn start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError> {
449        let conn = self.conn()?;
450        let now = chrono::Utc::now().timestamp();
451        conn.execute(
452            "INSERT OR IGNORE INTO sessions (id, namespace, started_at) VALUES (?1, ?2, ?3)",
453            params![id, namespace, now],
454        )
455        .storage_err()?;
456        Ok(())
457    }
458
459    /// End a session by setting ended_at and optionally a summary.
460    pub fn end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError> {
461        let conn = self.conn()?;
462        let now = chrono::Utc::now().timestamp();
463        conn.execute(
464            "UPDATE sessions SET ended_at = ?1, summary = ?2 WHERE id = ?3",
465            params![now, summary, id],
466        )
467        .storage_err()?;
468        Ok(())
469    }
470
471    /// List sessions, optionally filtered by namespace.
472    pub fn list_sessions(&self, namespace: Option<&str>) -> Result<Vec<Session>, CodememError> {
473        self.list_sessions_with_limit(namespace, usize::MAX)
474    }
475
476    // ── Session Activity Tracking ─────────────────────────────────
477
478    /// Record a session activity event.
479    pub fn record_session_activity(
480        &self,
481        session_id: &str,
482        tool_name: &str,
483        file_path: Option<&str>,
484        directory: Option<&str>,
485        pattern: Option<&str>,
486    ) -> Result<(), CodememError> {
487        let conn = self.conn()?;
488        let now = chrono::Utc::now().timestamp();
489        conn.execute(
490            "INSERT INTO session_activity (session_id, tool_name, file_path, directory, pattern, created_at)
491             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
492            params![session_id, tool_name, file_path, directory, pattern, now],
493        )
494        .storage_err()?;
495        Ok(())
496    }
497
498    /// Get a summary of session activity counts using a single query with conditional aggregation.
499    pub fn get_session_activity_summary(
500        &self,
501        session_id: &str,
502    ) -> Result<codemem_core::SessionActivitySummary, CodememError> {
503        let conn = self.conn()?;
504
505        let (files_read, files_edited, searches, total_actions) = conn
506            .query_row(
507                "SELECT
508                     COUNT(DISTINCT CASE WHEN tool_name = 'Read' AND file_path IS NOT NULL THEN file_path END),
509                     COUNT(DISTINCT CASE WHEN tool_name IN ('Edit', 'Write') AND file_path IS NOT NULL THEN file_path END),
510                     SUM(CASE WHEN tool_name IN ('Grep', 'Glob') THEN 1 ELSE 0 END),
511                     COUNT(*)
512                 FROM session_activity
513                 WHERE session_id = ?1",
514                params![session_id],
515                |row| {
516                    Ok((
517                        row.get::<_, i64>(0)?,
518                        row.get::<_, i64>(1)?,
519                        row.get::<_, i64>(2)?,
520                        row.get::<_, i64>(3)?,
521                    ))
522                },
523            )
524            .storage_err()?;
525
526        Ok(codemem_core::SessionActivitySummary {
527            files_read: files_read as usize,
528            files_edited: files_edited as usize,
529            searches: searches as usize,
530            total_actions: total_actions as usize,
531        })
532    }
533
534    /// Get the most active directories in a session.
535    pub fn get_session_hot_directories(
536        &self,
537        session_id: &str,
538        limit: usize,
539    ) -> Result<Vec<(String, usize)>, CodememError> {
540        let conn = self.conn()?;
541        let mut stmt = conn
542            .prepare(
543                "SELECT directory, COUNT(*) AS cnt FROM session_activity
544                 WHERE session_id = ?1 AND directory IS NOT NULL
545                 GROUP BY directory ORDER BY cnt DESC LIMIT ?2",
546            )
547            .storage_err()?;
548
549        let rows = stmt
550            .query_map(params![session_id, limit as i64], |row| {
551                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
552            })
553            .storage_err()?
554            .collect::<Result<Vec<_>, _>>()
555            .storage_err()?;
556
557        Ok(rows
558            .into_iter()
559            .map(|(dir, cnt)| (dir, cnt as usize))
560            .collect())
561    }
562
563    /// Check whether an auto-insight dedup tag exists for a session.
564    /// Uses `json_extract` with proper parameter binding instead of LIKE on JSON.
565    pub fn has_auto_insight(
566        &self,
567        session_id: &str,
568        dedup_tag: &str,
569    ) -> Result<bool, CodememError> {
570        let conn = self.conn()?;
571        let count: i64 = conn
572            .query_row(
573                "SELECT COUNT(*) FROM memories
574                 WHERE json_extract(metadata, '$.session_id') = ?1
575                   AND json_extract(metadata, '$.auto_insight_tag') = ?2",
576                params![session_id, dedup_tag],
577                |row| row.get(0),
578            )
579            .storage_err()?;
580        Ok(count > 0)
581    }
582
583    /// Count Read events in a directory during a session.
584    pub fn count_directory_reads(
585        &self,
586        session_id: &str,
587        directory: &str,
588    ) -> Result<usize, CodememError> {
589        let conn = self.conn()?;
590        let count: i64 = conn
591            .query_row(
592                "SELECT COUNT(*) FROM session_activity
593                 WHERE session_id = ?1 AND tool_name = 'Read' AND directory = ?2",
594                params![session_id, directory],
595                |row| row.get(0),
596            )
597            .storage_err()?;
598        Ok(count as usize)
599    }
600
601    /// Check if a file was read in the current session.
602    pub fn was_file_read_in_session(
603        &self,
604        session_id: &str,
605        file_path: &str,
606    ) -> Result<bool, CodememError> {
607        let conn = self.conn()?;
608        let count: i64 = conn
609            .query_row(
610                "SELECT COUNT(*) FROM session_activity
611                 WHERE session_id = ?1 AND tool_name = 'Read' AND file_path = ?2",
612                params![session_id, file_path],
613                |row| row.get(0),
614            )
615            .storage_err()?;
616        Ok(count > 0)
617    }
618
619    /// Count how many times a search pattern was used in a session.
620    pub fn count_search_pattern_in_session(
621        &self,
622        session_id: &str,
623        pattern: &str,
624    ) -> Result<usize, CodememError> {
625        let conn = self.conn()?;
626        let count: i64 = conn
627            .query_row(
628                "SELECT COUNT(*) FROM session_activity
629                 WHERE session_id = ?1 AND tool_name IN ('Grep', 'Glob') AND pattern = ?2",
630                params![session_id, pattern],
631                |row| row.get(0),
632            )
633            .storage_err()?;
634        Ok(count as usize)
635    }
636
637    /// List sessions with a limit.
638    pub(crate) fn list_sessions_with_limit(
639        &self,
640        namespace: Option<&str>,
641        limit: usize,
642    ) -> Result<Vec<Session>, CodememError> {
643        let conn = self.conn()?;
644        let sql_with_ns = "SELECT s.id, s.namespace, s.started_at, s.ended_at, (SELECT COUNT(*) FROM memories m WHERE m.session_id = s.id) as memory_count, s.summary FROM sessions s WHERE s.namespace = ?1 ORDER BY s.started_at DESC LIMIT ?2";
645        let sql_all = "SELECT s.id, s.namespace, s.started_at, s.ended_at, (SELECT COUNT(*) FROM memories m WHERE m.session_id = s.id) as memory_count, s.summary FROM sessions s ORDER BY s.started_at DESC LIMIT ?1";
646
647        let map_row = |row: &rusqlite::Row<'_>| -> rusqlite::Result<Session> {
648            let started_ts: i64 = row.get(2)?;
649            let ended_ts: Option<i64> = row.get(3)?;
650            Ok(Session {
651                id: row.get(0)?,
652                namespace: row.get(1)?,
653                started_at: chrono::DateTime::from_timestamp(started_ts, 0)
654                    .unwrap_or_default()
655                    .with_timezone(&chrono::Utc),
656                ended_at: ended_ts.and_then(|ts| {
657                    chrono::DateTime::from_timestamp(ts, 0).map(|dt| dt.with_timezone(&chrono::Utc))
658                }),
659                memory_count: row.get::<_, i64>(4).unwrap_or(0) as u32,
660                summary: row.get(5)?,
661            })
662        };
663
664        if let Some(ns) = namespace {
665            let mut stmt = conn.prepare(sql_with_ns).storage_err()?;
666            let rows = stmt
667                .query_map(params![ns, limit as i64], map_row)
668                .storage_err()?;
669            rows.collect::<Result<Vec<_>, _>>().storage_err()
670        } else {
671            let mut stmt = conn.prepare(sql_all).storage_err()?;
672            let rows = stmt
673                .query_map(params![limit as i64], map_row)
674                .storage_err()?;
675            rows.collect::<Result<Vec<_>, _>>().storage_err()
676        }
677    }
678
679    // ── Tag-based Queries ─────────────────────────────────────────
680
681    /// Find memory IDs whose tags JSON array contains the given tag value.
682    /// Optionally scoped to a namespace. Excludes the given `exclude_id`.
683    /// Returns at most 50 results ordered by creation time (most recent siblings first).
684    pub fn find_memory_ids_by_tag(
685        &self,
686        tag: &str,
687        namespace: Option<&str>,
688        exclude_id: &str,
689    ) -> Result<Vec<String>, CodememError> {
690        let conn = self.conn()?;
691
692        // Use json_each() for exact tag matching instead of LIKE (safe against %, _, " in tags).
693        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
694            if let Some(ns) = namespace {
695                (
696                    "SELECT DISTINCT m.id FROM memories m, json_each(m.tags) t \
697                 WHERE t.value = ?1 AND m.namespace IS ?2 AND m.id != ?3 \
698                 ORDER BY m.created_at DESC LIMIT 50"
699                        .to_string(),
700                    vec![
701                        Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
702                        Box::new(ns.to_string()),
703                        Box::new(exclude_id.to_string()),
704                    ],
705                )
706            } else {
707                (
708                    "SELECT DISTINCT m.id FROM memories m, json_each(m.tags) t \
709                 WHERE t.value = ?1 AND m.namespace IS NULL AND m.id != ?2 \
710                 ORDER BY m.created_at DESC LIMIT 50"
711                        .to_string(),
712                    vec![
713                        Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
714                        Box::new(exclude_id.to_string()),
715                    ],
716                )
717            };
718
719        let refs: Vec<&dyn rusqlite::types::ToSql> =
720            params_vec.iter().map(|p| p.as_ref()).collect();
721
722        let mut stmt = conn.prepare(&sql).storage_err()?;
723
724        let ids = stmt
725            .query_map(refs.as_slice(), |row| row.get(0))
726            .storage_err()?
727            .collect::<Result<Vec<String>, _>>()
728            .storage_err()?;
729
730        Ok(ids)
731    }
732
733    // ── Graph Cleanup ───────────────────────────────────────────────
734
735    /// Delete all graph nodes, their edges, and their embeddings where the
736    /// node ID starts with the given prefix. Returns count of nodes deleted.
737    /// Wrapped in a transaction so all three DELETEs are atomic.
738    pub fn delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError> {
739        let conn = self.conn()?;
740        let like_pattern = format!("{prefix}%");
741
742        let tx = conn.unchecked_transaction().storage_err()?;
743
744        // Delete edges where src or dst matches prefix
745        tx.execute(
746            "DELETE FROM graph_edges WHERE src LIKE ?1 OR dst LIKE ?1",
747            params![like_pattern],
748        )
749        .storage_err()?;
750
751        // Delete embeddings for matching nodes
752        tx.execute(
753            "DELETE FROM memory_embeddings WHERE memory_id LIKE ?1",
754            params![like_pattern],
755        )
756        .storage_err()?;
757
758        // Delete the nodes themselves
759        let rows = tx
760            .execute(
761                "DELETE FROM graph_nodes WHERE id LIKE ?1",
762                params![like_pattern],
763            )
764            .storage_err()?;
765
766        tx.commit().storage_err()?;
767
768        Ok(rows)
769    }
770}
771
772#[cfg(test)]
773#[path = "tests/queries_tests.rs"]
774mod tests;