Skip to main content

codemem_storage/
queries.rs

1//! Stats, consolidation, pattern queries, and session management on Storage.
2
3use crate::{MapStorageErr, Storage};
4use codemem_core::{CodememError, ConsolidationLogEntry, Session, StorageStats};
5use rusqlite::params;
6
7impl Storage {
8    // ── Health / Diagnostics ────────────────────────────────────────────
9
10    /// Run SQLite `PRAGMA integrity_check`. Returns `true` if the database is OK.
11    pub fn integrity_check(&self) -> Result<bool, CodememError> {
12        let conn = self.conn()?;
13        let result: String = conn
14            .query_row("PRAGMA integrity_check", [], |row| row.get(0))
15            .storage_err()?;
16        Ok(result == "ok")
17    }
18
19    /// Return the current schema version (max applied migration number).
20    pub fn schema_version(&self) -> Result<u32, CodememError> {
21        let conn = self.conn()?;
22        let version: u32 = conn
23            .query_row(
24                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
25                [],
26                |row| row.get(0),
27            )
28            .storage_err()?;
29        Ok(version)
30    }
31
32    // ── Stats ───────────────────────────────────────────────────────────
33
34    /// Get database statistics in a single query.
35    pub fn stats(&self) -> Result<StorageStats, CodememError> {
36        let conn = self.conn()?;
37
38        let (memory_count, embedding_count, node_count, edge_count) = conn
39            .query_row(
40                "SELECT
41                    (SELECT COUNT(*) FROM memories) AS memory_count,
42                    (SELECT COUNT(*) FROM memory_embeddings) AS embedding_count,
43                    (SELECT COUNT(*) FROM graph_nodes) AS node_count,
44                    (SELECT COUNT(*) FROM graph_edges) AS edge_count",
45                [],
46                |row| {
47                    Ok((
48                        row.get::<_, i64>(0)?,
49                        row.get::<_, i64>(1)?,
50                        row.get::<_, i64>(2)?,
51                        row.get::<_, i64>(3)?,
52                    ))
53                },
54            )
55            .storage_err()?;
56
57        Ok(StorageStats {
58            memory_count: memory_count as usize,
59            embedding_count: embedding_count as usize,
60            node_count: node_count as usize,
61            edge_count: edge_count as usize,
62        })
63    }
64
65    // ── Consolidation Log ──────────────────────────────────────────────
66
67    /// Record a consolidation run.
68    pub fn insert_consolidation_log(
69        &self,
70        cycle_type: &str,
71        affected_count: usize,
72    ) -> Result<(), CodememError> {
73        let conn = self.conn()?;
74        let now = chrono::Utc::now().timestamp();
75        conn.execute(
76            "INSERT INTO consolidation_log (cycle_type, run_at, affected_count) VALUES (?1, ?2, ?3)",
77            params![cycle_type, now, affected_count as i64],
78        )
79        .storage_err()?;
80        Ok(())
81    }
82
83    /// Get the last consolidation run for each cycle type.
84    pub fn last_consolidation_runs(&self) -> Result<Vec<ConsolidationLogEntry>, CodememError> {
85        let conn = self.conn()?;
86        let mut stmt = conn
87            .prepare(
88                "SELECT cycle_type, run_at, affected_count FROM consolidation_log
89                 WHERE (cycle_type, run_at) IN (
90                     SELECT cycle_type, MAX(run_at) FROM consolidation_log GROUP BY cycle_type
91                 )
92                 ORDER BY cycle_type",
93            )
94            .storage_err()?;
95
96        let entries = stmt
97            .query_map([], |row| {
98                Ok(ConsolidationLogEntry {
99                    cycle_type: row.get(0)?,
100                    run_at: row.get(1)?,
101                    affected_count: row.get::<_, i64>(2)? as usize,
102                })
103            })
104            .storage_err()?
105            .collect::<Result<Vec<_>, _>>()
106            .storage_err()?;
107
108        Ok(entries)
109    }
110
111    // ── Pattern Detection Queries ───────────────────────────────────────
112
113    /// Find repeated search patterns (Grep/Glob) by extracting the "pattern" field
114    /// from memory metadata JSON. Returns (pattern, count, memory_ids) tuples where
115    /// count >= min_count, ordered by count descending.
116    pub fn get_repeated_searches(
117        &self,
118        min_count: usize,
119        namespace: Option<&str>,
120    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
121        let conn = self.conn()?;
122        let sql = if namespace.is_some() {
123            "SELECT json_extract(metadata, '$.pattern') AS pat,
124                    COUNT(*) AS cnt,
125                    GROUP_CONCAT(id, ',') AS ids
126             FROM memories
127             WHERE json_extract(metadata, '$.tool') IN ('Grep', 'Glob')
128               AND pat IS NOT NULL
129               AND namespace = ?1
130             GROUP BY pat
131             HAVING cnt >= ?2
132             ORDER BY cnt DESC"
133        } else {
134            "SELECT json_extract(metadata, '$.pattern') AS pat,
135                    COUNT(*) AS cnt,
136                    GROUP_CONCAT(id, ',') AS ids
137             FROM memories
138             WHERE json_extract(metadata, '$.tool') IN ('Grep', 'Glob')
139               AND pat IS NOT NULL
140             GROUP BY pat
141             HAVING cnt >= ?1
142             ORDER BY cnt DESC"
143        };
144
145        let mut stmt = conn.prepare(sql).storage_err()?;
146
147        let rows = if let Some(ns) = namespace {
148            stmt.query_map(params![ns, min_count as i64], |row| {
149                Ok((
150                    row.get::<_, String>(0)?,
151                    row.get::<_, i64>(1)?,
152                    row.get::<_, String>(2)?,
153                ))
154            })
155            .storage_err()?
156            .collect::<Result<Vec<_>, _>>()
157            .storage_err()?
158        } else {
159            stmt.query_map(params![min_count as i64], |row| {
160                Ok((
161                    row.get::<_, String>(0)?,
162                    row.get::<_, i64>(1)?,
163                    row.get::<_, String>(2)?,
164                ))
165            })
166            .storage_err()?
167            .collect::<Result<Vec<_>, _>>()
168            .storage_err()?
169        };
170
171        Ok(rows
172            .into_iter()
173            .map(|(pat, cnt, ids_str)| {
174                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
175                (pat, cnt as usize, ids)
176            })
177            .collect())
178    }
179
180    /// Find file hotspots by extracting the "file_path" field from memory metadata.
181    pub fn get_file_hotspots(
182        &self,
183        min_count: usize,
184        namespace: Option<&str>,
185    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
186        let conn = self.conn()?;
187        let sql = if namespace.is_some() {
188            "SELECT json_extract(metadata, '$.file_path') AS fp,
189                    COUNT(*) AS cnt,
190                    GROUP_CONCAT(id, ',') AS ids
191             FROM memories
192             WHERE fp IS NOT NULL
193               AND namespace = ?1
194             GROUP BY fp
195             HAVING cnt >= ?2
196             ORDER BY cnt DESC"
197        } else {
198            "SELECT json_extract(metadata, '$.file_path') AS fp,
199                    COUNT(*) AS cnt,
200                    GROUP_CONCAT(id, ',') AS ids
201             FROM memories
202             WHERE fp IS NOT NULL
203             GROUP BY fp
204             HAVING cnt >= ?1
205             ORDER BY cnt DESC"
206        };
207
208        let mut stmt = conn.prepare(sql).storage_err()?;
209
210        let rows = if let Some(ns) = namespace {
211            stmt.query_map(params![ns, min_count as i64], |row| {
212                Ok((
213                    row.get::<_, String>(0)?,
214                    row.get::<_, i64>(1)?,
215                    row.get::<_, String>(2)?,
216                ))
217            })
218            .storage_err()?
219            .collect::<Result<Vec<_>, _>>()
220            .storage_err()?
221        } else {
222            stmt.query_map(params![min_count as i64], |row| {
223                Ok((
224                    row.get::<_, String>(0)?,
225                    row.get::<_, i64>(1)?,
226                    row.get::<_, String>(2)?,
227                ))
228            })
229            .storage_err()?
230            .collect::<Result<Vec<_>, _>>()
231            .storage_err()?
232        };
233
234        Ok(rows
235            .into_iter()
236            .map(|(fp, cnt, ids_str)| {
237                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
238                (fp, cnt as usize, ids)
239            })
240            .collect())
241    }
242
243    /// Get tool usage statistics from memory metadata.
244    /// Returns (tool_name, count) pairs sorted by count descending.
245    pub fn get_tool_usage_stats(
246        &self,
247        namespace: Option<&str>,
248    ) -> Result<Vec<(String, usize)>, CodememError> {
249        let conn = self.conn()?;
250        let sql = if namespace.is_some() {
251            "SELECT json_extract(metadata, '$.tool') AS tool,
252                    COUNT(*) AS cnt
253             FROM memories
254             WHERE tool IS NOT NULL
255               AND namespace = ?1
256             GROUP BY tool
257             ORDER BY cnt DESC"
258        } else {
259            "SELECT json_extract(metadata, '$.tool') AS tool,
260                    COUNT(*) AS cnt
261             FROM memories
262             WHERE tool IS NOT NULL
263             GROUP BY tool
264             ORDER BY cnt DESC"
265        };
266
267        let mut stmt = conn.prepare(sql).storage_err()?;
268
269        let rows = if let Some(ns) = namespace {
270            stmt.query_map(params![ns], |row| {
271                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
272            })
273            .storage_err()?
274            .collect::<Result<Vec<_>, _>>()
275            .storage_err()?
276        } else {
277            stmt.query_map([], |row| {
278                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
279            })
280            .storage_err()?
281            .collect::<Result<Vec<_>, _>>()
282            .storage_err()?
283        };
284
285        Ok(rows
286            .into_iter()
287            .map(|(tool, cnt)| (tool, cnt as usize))
288            .collect())
289    }
290
291    /// Find decision chains: files with multiple Edit/Write memories over time.
292    pub fn get_decision_chains(
293        &self,
294        min_count: usize,
295        namespace: Option<&str>,
296    ) -> Result<Vec<(String, usize, Vec<String>)>, CodememError> {
297        let conn = self.conn()?;
298        let sql = if namespace.is_some() {
299            "SELECT json_extract(metadata, '$.file_path') AS fp,
300                    COUNT(*) AS cnt,
301                    GROUP_CONCAT(id, ',') AS ids
302             FROM memories
303             WHERE json_extract(metadata, '$.tool') IN ('Edit', 'Write')
304               AND fp IS NOT NULL
305               AND namespace = ?1
306             GROUP BY fp
307             HAVING cnt >= ?2
308             ORDER BY cnt DESC"
309        } else {
310            "SELECT json_extract(metadata, '$.file_path') AS fp,
311                    COUNT(*) AS cnt,
312                    GROUP_CONCAT(id, ',') AS ids
313             FROM memories
314             WHERE json_extract(metadata, '$.tool') IN ('Edit', 'Write')
315               AND fp IS NOT NULL
316             GROUP BY fp
317             HAVING cnt >= ?1
318             ORDER BY cnt DESC"
319        };
320
321        let mut stmt = conn.prepare(sql).storage_err()?;
322
323        let rows = if let Some(ns) = namespace {
324            stmt.query_map(params![ns, min_count as i64], |row| {
325                Ok((
326                    row.get::<_, String>(0)?,
327                    row.get::<_, i64>(1)?,
328                    row.get::<_, String>(2)?,
329                ))
330            })
331            .storage_err()?
332            .collect::<Result<Vec<_>, _>>()
333            .storage_err()?
334        } else {
335            stmt.query_map(params![min_count as i64], |row| {
336                Ok((
337                    row.get::<_, String>(0)?,
338                    row.get::<_, i64>(1)?,
339                    row.get::<_, String>(2)?,
340                ))
341            })
342            .storage_err()?
343            .collect::<Result<Vec<_>, _>>()
344            .storage_err()?
345        };
346
347        Ok(rows
348            .into_iter()
349            .map(|(fp, cnt, ids_str)| {
350                let ids: Vec<String> = ids_str.split(',').map(String::from).collect();
351                (fp, cnt as usize, ids)
352            })
353            .collect())
354    }
355
356    // ── Insight / Tag Queries ──────────────────────────────────────────
357
358    /// List memories that contain a specific tag, optionally scoped to a namespace.
359    /// Uses `json_each` for proper JSON array querying instead of LIKE patterns.
360    pub fn list_memories_by_tag(
361        &self,
362        tag: &str,
363        namespace: Option<&str>,
364        limit: usize,
365    ) -> Result<Vec<codemem_core::MemoryNode>, CodememError> {
366        let conn = self.conn()?;
367
368        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(ns) =
369            namespace
370        {
371            (
372                "SELECT m.id, m.content, m.memory_type, m.importance, m.confidence, m.access_count, \
373                 m.content_hash, m.tags, m.metadata, m.namespace, m.session_id, m.repo, m.git_ref, m.expires_at, m.created_at, m.updated_at, m.last_accessed_at \
374                 FROM memories m, json_each(m.tags) AS jt \
375                 WHERE jt.value = ?1 AND m.namespace = ?2 \
376                 AND (m.expires_at IS NULL OR m.expires_at > ?3) \
377                 ORDER BY m.created_at DESC LIMIT ?4"
378                    .to_string(),
379                vec![
380                    Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
381                    Box::new(ns.to_string()),
382                    Box::new(chrono::Utc::now().timestamp()),
383                    Box::new(limit as i64),
384                ],
385            )
386        } else {
387            (
388                "SELECT m.id, m.content, m.memory_type, m.importance, m.confidence, m.access_count, \
389                 m.content_hash, m.tags, m.metadata, m.namespace, m.session_id, m.repo, m.git_ref, m.expires_at, m.created_at, m.updated_at, m.last_accessed_at \
390                 FROM memories m, json_each(m.tags) AS jt \
391                 WHERE jt.value = ?1 \
392                 AND (m.expires_at IS NULL OR m.expires_at > ?2) \
393                 ORDER BY m.created_at DESC LIMIT ?3"
394                    .to_string(),
395                vec![
396                    Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
397                    Box::new(chrono::Utc::now().timestamp()),
398                    Box::new(limit as i64),
399                ],
400            )
401        };
402
403        let params_refs: Vec<&dyn rusqlite::types::ToSql> =
404            params_vec.iter().map(|b| &**b).collect();
405
406        let mut stmt = conn.prepare(&sql).storage_err()?;
407
408        let rows = stmt
409            .query_map(params_refs.as_slice(), |row| {
410                let expires_ts: Option<i64> = row.get(13)?;
411                let created_ts: i64 = row.get(14)?;
412                let updated_ts: i64 = row.get(15)?;
413                let accessed_ts: i64 = row.get(16)?;
414                let tags_json: String = row.get(7)?;
415                let metadata_json: String = row.get(8)?;
416                let memory_type_str: String = row.get(2)?;
417
418                Ok(codemem_core::MemoryNode {
419                    id: row.get(0)?,
420                    content: row.get(1)?,
421                    memory_type: memory_type_str
422                        .parse()
423                        .unwrap_or(codemem_core::MemoryType::Context),
424                    importance: row.get(3)?,
425                    confidence: row.get(4)?,
426                    access_count: row.get::<_, i64>(5).unwrap_or(0) as u32,
427                    content_hash: row.get(6)?,
428                    tags: serde_json::from_str(&tags_json).unwrap_or_default(),
429                    metadata: serde_json::from_str(&metadata_json).unwrap_or_default(),
430                    namespace: row.get(9)?,
431                    session_id: row.get(10)?,
432                    repo: row.get(11)?,
433                    git_ref: row.get(12)?,
434                    expires_at: expires_ts
435                        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
436                        .map(|dt| dt.with_timezone(&chrono::Utc)),
437                    created_at: chrono::DateTime::from_timestamp(created_ts, 0)
438                        .unwrap_or_default()
439                        .with_timezone(&chrono::Utc),
440                    updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
441                        .unwrap_or_default()
442                        .with_timezone(&chrono::Utc),
443                    last_accessed_at: chrono::DateTime::from_timestamp(accessed_ts, 0)
444                        .unwrap_or_default()
445                        .with_timezone(&chrono::Utc),
446                })
447            })
448            .storage_err()?
449            .collect::<Result<Vec<_>, _>>()
450            .storage_err()?;
451
452        Ok(rows)
453    }
454
455    // ── Session Management ─────────────────────────────────────────────
456
457    /// Start a new session.
458    pub fn start_session(&self, id: &str, namespace: Option<&str>) -> Result<(), CodememError> {
459        let conn = self.conn()?;
460        let now = chrono::Utc::now().timestamp();
461        conn.execute(
462            "INSERT OR IGNORE INTO sessions (id, namespace, started_at) VALUES (?1, ?2, ?3)",
463            params![id, namespace, now],
464        )
465        .storage_err()?;
466        Ok(())
467    }
468
469    /// End a session by setting ended_at and optionally a summary.
470    pub fn end_session(&self, id: &str, summary: Option<&str>) -> Result<(), CodememError> {
471        let conn = self.conn()?;
472        let now = chrono::Utc::now().timestamp();
473        conn.execute(
474            "UPDATE sessions SET ended_at = ?1, summary = ?2 WHERE id = ?3",
475            params![now, summary, id],
476        )
477        .storage_err()?;
478        Ok(())
479    }
480
481    /// List sessions, optionally filtered by namespace.
482    pub fn list_sessions(&self, namespace: Option<&str>) -> Result<Vec<Session>, CodememError> {
483        self.list_sessions_with_limit(namespace, usize::MAX)
484    }
485
486    // ── Session Activity Tracking ─────────────────────────────────
487
488    /// Record a session activity event.
489    pub fn record_session_activity(
490        &self,
491        session_id: &str,
492        tool_name: &str,
493        file_path: Option<&str>,
494        directory: Option<&str>,
495        pattern: Option<&str>,
496    ) -> Result<(), CodememError> {
497        let conn = self.conn()?;
498        let now = chrono::Utc::now().timestamp();
499        conn.execute(
500            "INSERT INTO session_activity (session_id, tool_name, file_path, directory, pattern, created_at)
501             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
502            params![session_id, tool_name, file_path, directory, pattern, now],
503        )
504        .storage_err()?;
505        Ok(())
506    }
507
508    /// Get a summary of session activity counts using a single query with conditional aggregation.
509    pub fn get_session_activity_summary(
510        &self,
511        session_id: &str,
512    ) -> Result<codemem_core::SessionActivitySummary, CodememError> {
513        let conn = self.conn()?;
514
515        let (files_read, files_edited, searches, total_actions) = conn
516            .query_row(
517                "SELECT
518                     COUNT(DISTINCT CASE WHEN tool_name = 'Read' AND file_path IS NOT NULL THEN file_path END),
519                     COUNT(DISTINCT CASE WHEN tool_name IN ('Edit', 'Write') AND file_path IS NOT NULL THEN file_path END),
520                     SUM(CASE WHEN tool_name IN ('Grep', 'Glob') THEN 1 ELSE 0 END),
521                     COUNT(*)
522                 FROM session_activity
523                 WHERE session_id = ?1",
524                params![session_id],
525                |row| {
526                    Ok((
527                        row.get::<_, i64>(0)?,
528                        row.get::<_, i64>(1)?,
529                        row.get::<_, i64>(2)?,
530                        row.get::<_, i64>(3)?,
531                    ))
532                },
533            )
534            .storage_err()?;
535
536        Ok(codemem_core::SessionActivitySummary {
537            files_read: files_read as usize,
538            files_edited: files_edited as usize,
539            searches: searches as usize,
540            total_actions: total_actions as usize,
541        })
542    }
543
544    /// Get the most active directories in a session.
545    pub fn get_session_hot_directories(
546        &self,
547        session_id: &str,
548        limit: usize,
549    ) -> Result<Vec<(String, usize)>, CodememError> {
550        let conn = self.conn()?;
551        let mut stmt = conn
552            .prepare(
553                "SELECT directory, COUNT(*) AS cnt FROM session_activity
554                 WHERE session_id = ?1 AND directory IS NOT NULL
555                 GROUP BY directory ORDER BY cnt DESC LIMIT ?2",
556            )
557            .storage_err()?;
558
559        let rows = stmt
560            .query_map(params![session_id, limit as i64], |row| {
561                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
562            })
563            .storage_err()?
564            .collect::<Result<Vec<_>, _>>()
565            .storage_err()?;
566
567        Ok(rows
568            .into_iter()
569            .map(|(dir, cnt)| (dir, cnt as usize))
570            .collect())
571    }
572
573    /// Check whether an auto-insight dedup tag exists for a session.
574    /// Uses `json_extract` with proper parameter binding instead of LIKE on JSON.
575    pub fn has_auto_insight(
576        &self,
577        session_id: &str,
578        dedup_tag: &str,
579    ) -> Result<bool, CodememError> {
580        let conn = self.conn()?;
581        let count: i64 = conn
582            .query_row(
583                "SELECT COUNT(*) FROM memories
584                 WHERE json_extract(metadata, '$.session_id') = ?1
585                   AND json_extract(metadata, '$.auto_insight_tag') = ?2",
586                params![session_id, dedup_tag],
587                |row| row.get(0),
588            )
589            .storage_err()?;
590        Ok(count > 0)
591    }
592
593    /// Count Read events in a directory during a session.
594    pub fn count_directory_reads(
595        &self,
596        session_id: &str,
597        directory: &str,
598    ) -> Result<usize, CodememError> {
599        let conn = self.conn()?;
600        let count: i64 = conn
601            .query_row(
602                "SELECT COUNT(*) FROM session_activity
603                 WHERE session_id = ?1 AND tool_name = 'Read' AND directory = ?2",
604                params![session_id, directory],
605                |row| row.get(0),
606            )
607            .storage_err()?;
608        Ok(count as usize)
609    }
610
611    /// Check if a file was read in the current session.
612    pub fn was_file_read_in_session(
613        &self,
614        session_id: &str,
615        file_path: &str,
616    ) -> Result<bool, CodememError> {
617        let conn = self.conn()?;
618        let count: i64 = conn
619            .query_row(
620                "SELECT COUNT(*) FROM session_activity
621                 WHERE session_id = ?1 AND tool_name = 'Read' AND file_path = ?2",
622                params![session_id, file_path],
623                |row| row.get(0),
624            )
625            .storage_err()?;
626        Ok(count > 0)
627    }
628
629    /// Count how many times a search pattern was used in a session.
630    pub fn count_search_pattern_in_session(
631        &self,
632        session_id: &str,
633        pattern: &str,
634    ) -> Result<usize, CodememError> {
635        let conn = self.conn()?;
636        let count: i64 = conn
637            .query_row(
638                "SELECT COUNT(*) FROM session_activity
639                 WHERE session_id = ?1 AND tool_name IN ('Grep', 'Glob') AND pattern = ?2",
640                params![session_id, pattern],
641                |row| row.get(0),
642            )
643            .storage_err()?;
644        Ok(count as usize)
645    }
646
647    /// List sessions with a limit.
648    pub(crate) fn list_sessions_with_limit(
649        &self,
650        namespace: Option<&str>,
651        limit: usize,
652    ) -> Result<Vec<Session>, CodememError> {
653        let conn = self.conn()?;
654        let sql_with_ns = "SELECT s.id, s.namespace, s.started_at, s.ended_at, (SELECT COUNT(*) FROM memories m WHERE m.session_id = s.id) as memory_count, s.summary FROM sessions s WHERE s.namespace = ?1 ORDER BY s.started_at DESC LIMIT ?2";
655        let sql_all = "SELECT s.id, s.namespace, s.started_at, s.ended_at, (SELECT COUNT(*) FROM memories m WHERE m.session_id = s.id) as memory_count, s.summary FROM sessions s ORDER BY s.started_at DESC LIMIT ?1";
656
657        let map_row = |row: &rusqlite::Row<'_>| -> rusqlite::Result<Session> {
658            let started_ts: i64 = row.get(2)?;
659            let ended_ts: Option<i64> = row.get(3)?;
660            Ok(Session {
661                id: row.get(0)?,
662                namespace: row.get(1)?,
663                started_at: chrono::DateTime::from_timestamp(started_ts, 0)
664                    .unwrap_or_default()
665                    .with_timezone(&chrono::Utc),
666                ended_at: ended_ts.and_then(|ts| {
667                    chrono::DateTime::from_timestamp(ts, 0).map(|dt| dt.with_timezone(&chrono::Utc))
668                }),
669                memory_count: row.get::<_, i64>(4).unwrap_or(0) as u32,
670                summary: row.get(5)?,
671            })
672        };
673
674        if let Some(ns) = namespace {
675            let mut stmt = conn.prepare(sql_with_ns).storage_err()?;
676            let rows = stmt
677                .query_map(params![ns, limit as i64], map_row)
678                .storage_err()?;
679            rows.collect::<Result<Vec<_>, _>>().storage_err()
680        } else {
681            let mut stmt = conn.prepare(sql_all).storage_err()?;
682            let rows = stmt
683                .query_map(params![limit as i64], map_row)
684                .storage_err()?;
685            rows.collect::<Result<Vec<_>, _>>().storage_err()
686        }
687    }
688
689    // ── Tag-based Queries ─────────────────────────────────────────
690
691    /// Find memory IDs whose tags JSON array contains the given tag value.
692    /// Optionally scoped to a namespace. Excludes the given `exclude_id`.
693    /// Returns at most 50 results ordered by creation time (most recent siblings first).
694    pub fn find_memory_ids_by_tag(
695        &self,
696        tag: &str,
697        namespace: Option<&str>,
698        exclude_id: &str,
699    ) -> Result<Vec<String>, CodememError> {
700        let conn = self.conn()?;
701
702        // Use json_each() for exact tag matching instead of LIKE (safe against %, _, " in tags).
703        let (sql, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
704            if let Some(ns) = namespace {
705                (
706                    "SELECT DISTINCT m.id FROM memories m, json_each(m.tags) t \
707                 WHERE t.value = ?1 AND m.namespace IS ?2 AND m.id != ?3 \
708                 ORDER BY m.created_at DESC LIMIT 50"
709                        .to_string(),
710                    vec![
711                        Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
712                        Box::new(ns.to_string()),
713                        Box::new(exclude_id.to_string()),
714                    ],
715                )
716            } else {
717                (
718                    "SELECT DISTINCT m.id FROM memories m, json_each(m.tags) t \
719                 WHERE t.value = ?1 AND m.namespace IS NULL AND m.id != ?2 \
720                 ORDER BY m.created_at DESC LIMIT 50"
721                        .to_string(),
722                    vec![
723                        Box::new(tag.to_string()) as Box<dyn rusqlite::types::ToSql>,
724                        Box::new(exclude_id.to_string()),
725                    ],
726                )
727            };
728
729        let refs: Vec<&dyn rusqlite::types::ToSql> =
730            params_vec.iter().map(|p| p.as_ref()).collect();
731
732        let mut stmt = conn.prepare(&sql).storage_err()?;
733
734        let ids = stmt
735            .query_map(refs.as_slice(), |row| row.get(0))
736            .storage_err()?
737            .collect::<Result<Vec<String>, _>>()
738            .storage_err()?;
739
740        Ok(ids)
741    }
742
743    // ── Graph Cleanup ───────────────────────────────────────────────
744
745    /// Delete all graph nodes, their edges, and their embeddings where the
746    /// node ID starts with the given prefix. Returns count of nodes deleted.
747    /// Wrapped in a transaction so all three DELETEs are atomic.
748    pub fn delete_graph_nodes_by_prefix(&self, prefix: &str) -> Result<usize, CodememError> {
749        let conn = self.conn()?;
750        let like_pattern = format!("{prefix}%");
751
752        let tx = conn.unchecked_transaction().storage_err()?;
753
754        // Delete edges where src or dst matches prefix
755        tx.execute(
756            "DELETE FROM graph_edges WHERE src LIKE ?1 OR dst LIKE ?1",
757            params![like_pattern],
758        )
759        .storage_err()?;
760
761        // Delete embeddings for matching nodes
762        tx.execute(
763            "DELETE FROM memory_embeddings WHERE memory_id LIKE ?1",
764            params![like_pattern],
765        )
766        .storage_err()?;
767
768        // Delete the nodes themselves
769        let rows = tx
770            .execute(
771                "DELETE FROM graph_nodes WHERE id LIKE ?1",
772                params![like_pattern],
773            )
774            .storage_err()?;
775
776        tx.commit().storage_err()?;
777
778        Ok(rows)
779    }
780}
781
782#[cfg(test)]
783#[path = "tests/queries_tests.rs"]
784mod tests;