Skip to main content

codex_recall/
store.rs

1use crate::memory::{extract_memories, ExtractedMemory, MemoryKind};
2use crate::parser::{EventKind, ParsedSession};
3use anyhow::{bail, Context, Result};
4use rusqlite::{params, params_from_iter, Connection, OpenFlags};
5use std::collections::{hash_map::Entry, BTreeSet, HashMap};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8
// NOTE(review): not referenced in the visible part of this file. Presumably a version tag
// for the indexed content format (bumping it would force re-indexing) — confirm against
// its users elsewhere in the crate.
const CONTENT_VERSION: i64 = 3;
10
/// SQLite-backed index of sessions, events and memory objects.
///
/// Wraps a single `rusqlite::Connection`. Construct it with [`Store::open`]
/// (read-write: creates the parent directory, configures the connection and
/// initializes the schema) or [`Store::open_readonly`] (read-only, no schema setup).
pub struct Store {
    conn: Connection,
}
14
/// Table row counts reported by [`Store::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stats {
    /// Number of rows in the `sessions` table.
    pub session_count: u64,
    /// Number of rows in the `events` table.
    pub event_count: u64,
    /// Number of rows in the `ingestion_state` table (presumably one per ingested
    /// source file — confirm against the ingestion code).
    pub source_file_count: u64,
    /// Ingested source files that do not contribute a new distinct session identity
    /// (`session_key`, falling back to `session_id`); see [`Store::stats`] for the
    /// exact computation.
    pub duplicate_source_file_count: u64,
}
22
/// One matching event returned by the full-text search methods on [`Store`].
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Key of the session row the event belongs to.
    pub session_key: String,
    /// Session id as stored on the event row.
    pub session_id: String,
    /// The session's `sessions.repo` value.
    pub repo: String,
    /// Kind of the matching event.
    pub kind: EventKind,
    /// Full text of the matching event.
    pub text: String,
    /// FTS5 `snippet()` excerpt of column 4 of `events_fts`: no highlight markers,
    /// `" ... "` as the ellipsis, at most 16 tokens.
    pub snippet: String,
    /// FTS5 `rank` of the match; rows are fetched in ascending order of this value.
    pub score: f64,
    /// The session's `sessions.session_timestamp`.
    pub session_timestamp: String,
    /// The session's `sessions.cwd`.
    pub cwd: String,
    /// Source file the event was read from.
    pub source_file_path: PathBuf,
    /// Line of the event within `source_file_path`.
    pub source_line_number: usize,
    /// Timestamp recorded on the event itself, if any.
    pub source_timestamp: Option<String>,
    /// Hits in the same session; left at `0` by the SQL layer and presumably filled
    /// in by `rank_search_results` — confirm.
    pub session_hit_count: usize,
    /// Best event-kind weight among the session's hits; `0` from the SQL layer and
    /// presumably set during ranking — confirm.
    pub best_kind_weight: u8,
    /// Whether one of the session's `session_repos` entries equals the search's
    /// current repo (case-insensitive); `false` when no current repo was given.
    pub repo_matches_current: bool,
}
41
/// Query text plus filters for the event full-text search on [`Store`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOptions {
    /// Free-text query; split into FTS terms by `fts_terms`.
    pub query: String,
    /// Maximum number of results; clamped to `1..=100` by `Store::search_with_trace`.
    pub limit: usize,
    /// Keep only sessions with a `session_repos` entry equal to this value
    /// (case-insensitive).
    pub repo: Option<String>,
    /// Substring filter (SQL `LIKE '%…%'`, so `%`/`_` act as wildcards) applied to the
    /// session cwd or to any event cwd in the session.
    pub cwd: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses` (semantics defined there).
    pub since: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses` (semantics defined there).
    pub from: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses` (semantics defined there).
    pub until: Option<String>,
    /// Forwarded to `rank_search_results`; presumably keeps hits from duplicate copies
    /// of the same session — confirm.
    pub include_duplicates: bool,
    /// Session references excluded via `append_excluded_sessions_clause`.
    pub exclude_sessions: Vec<String>,
    /// Event kinds to match, applied via `append_event_kind_clause` (presumably empty
    /// means no restriction — confirm).
    pub kinds: Vec<EventKind>,
    /// Repo used to compute `SearchResult::repo_matches_current` and passed to
    /// `rank_search_results`, presumably to favour the caller's repo.
    pub current_repo: Option<String>,
}
56
/// How a full-text query ended up being matched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchStrategy {
    /// Every query term had to match (the first attempt).
    AllTerms,
    /// Any single term was enough; used after an all-terms query with several terms
    /// came back empty.
    AnyTermFallback,
}

impl MatchStrategy {
    /// Stable machine-readable label for this strategy.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::AnyTermFallback => "any_terms_fallback",
            Self::AllTerms => "all_terms",
        }
    }
}
71
/// Diagnostics describing how `Store::search_with_trace` executed a query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchTrace {
    /// Whether the all-terms query or the any-term fallback produced the results.
    pub match_strategy: MatchStrategy,
    /// Terms extracted from the raw query (empty when nothing searchable remained).
    pub query_terms: Vec<String>,
    /// The FTS5 `MATCH` expression that was actually run (empty if none ran).
    pub fts_query: String,
    /// Row cap used for the SQL fetch before ranking (`0` if no query ran).
    pub fetch_limit: usize,
    /// Copy of `SearchOptions::current_repo`.
    pub current_repo: Option<String>,
    /// Copy of `SearchOptions::include_duplicates`.
    pub include_duplicates: bool,
}
81
/// A row of the `memory_objects` table plus its evidence count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryObject {
    /// Primary key of the memory object.
    pub id: String,
    /// Memory kind, parsed with `MemoryKind::parse`.
    pub kind: MemoryKind,
    /// Human-readable summary text.
    pub summary: String,
    /// Normalized text of the memory (normalization defined where memories are built).
    pub normalized_text: String,
    /// Stored `first_seen_at` timestamp.
    pub first_seen_at: String,
    /// Stored `last_seen_at` timestamp.
    pub last_seen_at: String,
    /// Stored `created_at` timestamp.
    pub created_at: String,
    /// Stored `updated_at` timestamp.
    pub updated_at: String,
    /// Number of `memory_evidence` rows referencing this object (as counted by
    /// `Store::memory_by_id`).
    pub evidence_count: usize,
}
94
/// One `memory_evidence` row joined with its session, as returned by
/// `Store::memory_evidence`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryEvidence {
    /// Memory object this evidence supports.
    pub memory_id: String,
    /// Session the evidence was taken from.
    pub session_key: String,
    /// Session id stored on the evidence row.
    pub session_id: String,
    /// The session's `sessions.repo`.
    pub repo: String,
    /// The session's `sessions.cwd`.
    pub cwd: String,
    /// The session's `sessions.session_timestamp`.
    pub session_timestamp: String,
    /// Source file the evidence came from.
    pub source_file_path: PathBuf,
    /// Line of the evidence within `source_file_path`.
    pub source_line_number: usize,
    /// Timestamp recorded on the source event, if any.
    pub source_timestamp: Option<String>,
    /// Kind of the event the evidence was extracted from.
    pub event_kind: EventKind,
    /// The evidence text itself.
    pub evidence_text: String,
}
109
/// Query and filters for `Store::memory_results` / `Store::memory_results_with_trace`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemorySearchOptions {
    /// Optional full-text query; `None` or whitespace-only lists memories instead.
    pub query: Option<String>,
    /// Maximum number of results; clamped to `1..=100`.
    pub limit: usize,
    /// Repo filter, applied by the memory listing/search helpers (semantics there).
    pub repo: Option<String>,
    /// Cwd filter, applied by the memory listing/search helpers (semantics there).
    pub cwd: Option<String>,
    /// Date bound, applied by the memory listing/search helpers (semantics there).
    pub since: Option<String>,
    /// Date bound, applied by the memory listing/search helpers (semantics there).
    pub from: Option<String>,
    /// Date bound, applied by the memory listing/search helpers (semantics there).
    pub until: Option<String>,
    /// Memory kinds to include (presumably empty means all — confirm).
    pub kinds: Vec<MemoryKind>,
}
121
122impl Default for MemorySearchOptions {
123    fn default() -> Self {
124        Self {
125            query: None,
126            limit: 20,
127            repo: None,
128            cwd: None,
129            since: None,
130            from: None,
131            until: None,
132            kinds: Vec::new(),
133        }
134    }
135}
136
/// A memory object together with the repos and sessions associated with it
/// (presumably aggregated from its evidence rows — confirm in `load_memory_results`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryResult {
    /// The memory object itself.
    pub object: MemoryObject,
    /// Repos associated with the memory.
    pub repos: Vec<String>,
    /// Session keys associated with the memory.
    pub session_keys: Vec<String>,
}
143
/// One entry of the change feed produced by `Store::delta_items`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaItem {
    /// A session was created or changed.
    Session {
        /// Position in the change log; used as the resume cursor.
        change_id: i64,
        /// Change action recorded in the log (values defined by the writer).
        action: String,
        session_key: String,
        session_id: String,
        repo: String,
        cwd: String,
        session_timestamp: String,
        updated_at: String,
    },
    /// A memory object was created or changed.
    Memory {
        /// Position in the change log; used as the resume cursor.
        change_id: i64,
        /// Change action recorded in the log (values defined by the writer).
        action: String,
        object: MemoryObject,
        repos: Vec<String>,
        session_keys: Vec<String>,
    },
    /// An object was deleted; only its type and id remain.
    Deleted {
        /// Position in the change log; used as the resume cursor.
        change_id: i64,
        /// Type of the deleted object; reported by `DeltaItem::change_kind`.
        object_type: String,
        object_id: String,
        /// Change action recorded in the log (values defined by the writer).
        action: String,
        updated_at: String,
    },
}
171
172impl DeltaItem {
173    pub fn updated_at(&self) -> &str {
174        match self {
175            DeltaItem::Session { updated_at, .. } => updated_at,
176            DeltaItem::Memory { object, .. } => &object.updated_at,
177            DeltaItem::Deleted { updated_at, .. } => updated_at,
178        }
179    }
180
181    pub fn change_id(&self) -> i64 {
182        match self {
183            DeltaItem::Session { change_id, .. } => *change_id,
184            DeltaItem::Memory { change_id, .. } => *change_id,
185            DeltaItem::Deleted { change_id, .. } => *change_id,
186        }
187    }
188
189    pub fn change_kind(&self) -> &str {
190        match self {
191            DeltaItem::Session { .. } => "session",
192            DeltaItem::Memory { .. } => "memory",
193            DeltaItem::Deleted { object_type, .. } => object_type,
194        }
195    }
196
197    pub fn action(&self) -> &str {
198        match self {
199            DeltaItem::Session { action, .. } => action,
200            DeltaItem::Memory { action, .. } => action,
201            DeltaItem::Deleted { action, .. } => action,
202        }
203    }
204}
205
// NOTE(review): not referenced in the visible part of this file. The field names suggest
// a descriptor for an exposed resource (URI, display name, MIME type) — confirm against
// its producers/consumers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceRecord {
    pub uri: String,
    pub name: String,
    pub description: String,
    pub mime_type: String,
    pub object_type: String,
    pub updated_at: String,
}
215
216impl SearchOptions {
217    pub fn new(query: impl Into<String>, limit: usize) -> Self {
218        Self {
219            query: query.into(),
220            limit,
221            repo: None,
222            cwd: None,
223            since: None,
224            from: None,
225            until: None,
226            include_duplicates: false,
227            exclude_sessions: Vec::new(),
228            kinds: Vec::new(),
229            current_repo: None,
230        }
231    }
232}
233
/// One event of a session, as returned by `Store::session_events_with_kinds`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionEvent {
    pub session_key: String,
    pub session_id: String,
    pub kind: EventKind,
    pub text: String,
    /// The session's `sessions.cwd` (not a per-event cwd).
    pub cwd: String,
    pub source_file_path: PathBuf,
    pub source_line_number: usize,
    /// Timestamp recorded on the event, if any.
    pub source_timestamp: Option<String>,
}
245
/// A session matched by key or id in `Store::resolve_session_reference`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMatch {
    pub session_key: String,
    pub session_id: String,
    pub cwd: String,
    pub repo: String,
    pub source_file_path: PathBuf,
}
254
/// A session row returned by `Store::recent_sessions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentSession {
    pub session_key: String,
    pub session_id: String,
    pub repo: String,
    pub cwd: String,
    pub session_timestamp: String,
    pub source_file_path: PathBuf,
}
264
/// Filters for `Store::recent_sessions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentOptions {
    /// Maximum number of sessions; clamped to `1..=100`.
    pub limit: usize,
    /// Keep only sessions with a `session_repos` entry equal to this value
    /// (case-insensitive).
    pub repo: Option<String>,
    /// Substring (`LIKE '%…%'`) filter on the session cwd or any event cwd.
    pub cwd: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses`.
    pub since: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses`.
    pub from: Option<String>,
    /// Date bound forwarded to `append_from_until_clauses`.
    pub until: Option<String>,
    /// When `false`, results are passed through `dedupe_recent_sessions`.
    pub include_duplicates: bool,
    /// Session references excluded via `append_excluded_sessions_clause`.
    pub exclude_sessions: Vec<String>,
    /// Event-kind filter applied via `append_recent_event_kind_clause`.
    pub kinds: Vec<EventKind>,
}
277
278impl Default for RecentOptions {
279    fn default() -> Self {
280        Self {
281            limit: 20,
282            repo: None,
283            cwd: None,
284            since: None,
285            from: None,
286            until: None,
287            include_duplicates: false,
288            exclude_sessions: Vec::new(),
289            kinds: Vec::new(),
290        }
291    }
292}
293
// NOTE(review): not referenced in the visible part of this file. The name suggests a
// session row read from an older schema during migration — confirm against its users.
struct OldSessionRow {
    session_id: String,
    session_timestamp: String,
    cwd: String,
    repo: String,
    cli_version: Option<String>,
    source_file_path: String,
}
302
// NOTE(review): not referenced in the visible part of this file. The name suggests an
// event row read from an older schema during migration — confirm against its users.
// `kind` is kept as raw text here rather than a parsed `EventKind`.
struct OldEventRow {
    session_id: String,
    kind: String,
    role: Option<String>,
    text: String,
    command: Option<String>,
    cwd: Option<String>,
    exit_code: Option<i64>,
    source_timestamp: Option<String>,
    source_file_path: String,
    source_line_number: i64,
}
315
316impl Store {
317    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
318        let path = path.as_ref();
319        if let Some(parent) = path.parent() {
320            std::fs::create_dir_all(parent)
321                .with_context(|| format!("create db directory {}", parent.display()))?;
322        }
323
324        let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
325        configure_write_connection(&conn)?;
326        let store = Self { conn };
327        store.init_schema()?;
328        Ok(store)
329    }
330
331    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
332        let path = path.as_ref();
333        let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
334            .with_context(|| format!("open db read-only {}", path.display()))?;
335        configure_read_connection(&conn)?;
336        Ok(Self { conn })
337    }
338
339    pub fn index_session(&self, parsed: &ParsedSession) -> Result<()> {
340        self.conn.execute("BEGIN IMMEDIATE", [])?;
341        let result = self.index_session_inner(parsed);
342        match result {
343            Ok(()) => {
344                self.conn.execute("COMMIT", [])?;
345                Ok(())
346            }
347            Err(error) => {
348                let _ = self.conn.execute("ROLLBACK", []);
349                Err(error)
350            }
351        }
352    }
353
354    pub fn stats(&self) -> Result<Stats> {
355        let session_count = self
356            .conn
357            .query_row("SELECT COUNT(*) FROM sessions", [], |row| {
358                row.get::<_, u64>(0)
359            })?;
360        let event_count = self
361            .conn
362            .query_row("SELECT COUNT(*) FROM events", [], |row| {
363                row.get::<_, u64>(0)
364            })?;
365        let source_file_count =
366            self.conn
367                .query_row("SELECT COUNT(*) FROM ingestion_state", [], |row| {
368                    row.get::<_, u64>(0)
369                })?;
370        let unique_ingested_sessions = self.conn.query_row(
371            "SELECT COUNT(DISTINCT COALESCE(session_key, session_id)) FROM ingestion_state WHERE COALESCE(session_key, session_id) IS NOT NULL",
372            [],
373            |row| row.get::<_, u64>(0),
374        )?;
375
376        Ok(Stats {
377            session_count,
378            event_count,
379            source_file_count,
380            duplicate_source_file_count: source_file_count.saturating_sub(unique_ingested_sessions),
381        })
382    }
383
384    pub fn quick_check(&self) -> Result<String> {
385        self.conn
386            .query_row("PRAGMA quick_check", [], |row| row.get::<_, String>(0))
387            .map_err(Into::into)
388    }
389
390    pub fn fts_integrity_check(&self) -> Result<()> {
391        self.conn.execute(
392            "INSERT INTO events_fts(events_fts) VALUES('integrity-check')",
393            [],
394        )?;
395        Ok(())
396    }
397
398    pub fn fts_read_check(&self) -> Result<()> {
399        self.conn
400            .query_row("SELECT COUNT(*) FROM events_fts", [], |row| {
401                row.get::<_, i64>(0)
402            })
403            .map(|_| ())
404            .map_err(Into::into)
405    }
406
407    pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
408        self.search_with_options(SearchOptions::new(query, limit))
409    }
410
411    pub fn search_with_options(&self, options: SearchOptions) -> Result<Vec<SearchResult>> {
412        self.search_with_trace(options).map(|(_, results)| results)
413    }
414
415    pub fn search_with_trace(
416        &self,
417        options: SearchOptions,
418    ) -> Result<(SearchTrace, Vec<SearchResult>)> {
419        let terms = fts_terms(&options.query);
420        if terms.is_empty() {
421            return Ok((
422                SearchTrace {
423                    match_strategy: MatchStrategy::AllTerms,
424                    query_terms: Vec::new(),
425                    fts_query: String::new(),
426                    fetch_limit: 0,
427                    current_repo: options.current_repo.clone(),
428                    include_duplicates: options.include_duplicates,
429                },
430                Vec::new(),
431            ));
432        }
433
434        let limit = options.limit.clamp(1, 100);
435        let fetch_limit = limit.saturating_mul(50).clamp(200, 1_000);
436        let all_terms_query = and_fts_query(&terms);
437        let results = self.search_with_fts_query(&options, &all_terms_query, fetch_limit)?;
438        if !results.is_empty() || terms.len() == 1 {
439            return Ok((
440                SearchTrace {
441                    match_strategy: MatchStrategy::AllTerms,
442                    query_terms: terms,
443                    fts_query: all_terms_query,
444                    fetch_limit,
445                    current_repo: options.current_repo.clone(),
446                    include_duplicates: options.include_duplicates,
447                },
448                rank_search_results(
449                    results,
450                    options.current_repo.as_deref(),
451                    limit,
452                    options.include_duplicates,
453                ),
454            ));
455        }
456
457        let any_terms_query = or_fts_query(&terms);
458        let results = self.search_with_fts_query(&options, &any_terms_query, fetch_limit)?;
459        Ok((
460            SearchTrace {
461                match_strategy: MatchStrategy::AnyTermFallback,
462                query_terms: terms,
463                fts_query: any_terms_query,
464                fetch_limit,
465                current_repo: options.current_repo.clone(),
466                include_duplicates: options.include_duplicates,
467            },
468            rank_search_results(
469                results,
470                options.current_repo.as_deref(),
471                limit,
472                options.include_duplicates,
473            ),
474        ))
475    }
476
477    fn search_with_fts_query(
478        &self,
479        options: &SearchOptions,
480        fts_query: &str,
481        limit: usize,
482    ) -> Result<Vec<SearchResult>> {
483        let mut query_params = Vec::<String>::new();
484        let current_repo_expr = if let Some(current_repo) = &options.current_repo {
485            query_params.push(current_repo.clone());
486            "EXISTS (
487                    SELECT 1 FROM session_repos current_repos
488                    WHERE current_repos.session_key = sessions.session_key
489                      AND lower(current_repos.repo) = lower(?)
490                )"
491        } else {
492            "0"
493        };
494        query_params.push(fts_query.to_owned());
495
496        let mut sql = format!(
497            r#"
498            SELECT
499                events.session_key,
500                events.session_id,
501                sessions.repo,
502                events.kind,
503                events.text,
504                snippet(events_fts, 4, '', '', ' ... ', 16) AS snippet,
505                events_fts.rank AS score,
506                sessions.session_timestamp,
507                sessions.cwd,
508                events.source_file_path,
509                events.source_line_number,
510                events.source_timestamp,
511                {current_repo_expr} AS current_repo_match
512            FROM events_fts
513            JOIN events ON events.id = events_fts.event_id
514            JOIN sessions ON sessions.session_key = events.session_key
515            WHERE events_fts MATCH ?
516            "#,
517        );
518
519        if let Some(repo) = &options.repo {
520            sql.push_str(
521                r#"
522                AND EXISTS (
523                    SELECT 1 FROM session_repos filter_repos
524                    WHERE filter_repos.session_key = sessions.session_key
525                      AND lower(filter_repos.repo) = lower(?)
526                )
527                "#,
528            );
529            query_params.push(repo.clone());
530        }
531        if let Some(cwd) = &options.cwd {
532            sql.push_str(
533                r#"
534                AND (
535                    sessions.cwd LIKE '%' || ? || '%'
536                    OR EXISTS (
537                        SELECT 1 FROM events cwd_events
538                        WHERE cwd_events.session_key = sessions.session_key
539                          AND cwd_events.cwd LIKE '%' || ? || '%'
540                    )
541                )
542                "#,
543            );
544            query_params.push(cwd.clone());
545            query_params.push(cwd.clone());
546        }
547        append_from_until_clauses(
548            &mut sql,
549            &mut query_params,
550            options.since.as_ref(),
551            options.from.as_ref(),
552            options.until.as_ref(),
553        )?;
554        append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
555        append_event_kind_clause(&mut sql, &mut query_params, "events.kind", &options.kinds);
556
557        sql.push_str(" ORDER BY events_fts.rank ASC, events.source_line_number ASC LIMIT ");
558        sql.push_str(&limit.to_string());
559
560        let mut statement = self.conn.prepare(&sql)?;
561
562        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
563            let kind_text: String = row.get(3)?;
564            let kind = kind_text.parse::<EventKind>().map_err(|_| {
565                rusqlite::Error::InvalidColumnType(
566                    3,
567                    "kind".to_owned(),
568                    rusqlite::types::Type::Text,
569                )
570            })?;
571            let source_file_path: String = row.get(9)?;
572            let source_line_number: i64 = row.get(10)?;
573
574            Ok(SearchResult {
575                session_key: row.get(0)?,
576                session_id: row.get(1)?,
577                repo: row.get(2)?,
578                kind,
579                text: row.get(4)?,
580                snippet: row.get(5)?,
581                score: row.get(6)?,
582                session_timestamp: row.get(7)?,
583                cwd: row.get(8)?,
584                source_file_path: PathBuf::from(source_file_path),
585                source_line_number: source_line_number as usize,
586                source_timestamp: row.get(11)?,
587                session_hit_count: 0,
588                best_kind_weight: 0,
589                repo_matches_current: row.get::<_, i64>(12)? != 0,
590            })
591        })?;
592
593        rows.collect::<std::result::Result<Vec<_>, _>>()
594            .map_err(Into::into)
595    }
596
597    pub fn resolve_session_reference(&self, reference: &str) -> Result<Vec<SessionMatch>> {
598        let mut statement = self.conn.prepare(
599            r#"
600            SELECT session_key, session_id, cwd, repo, source_file_path
601            FROM sessions
602            WHERE session_key = ? OR session_id = ?
603            ORDER BY session_timestamp DESC, source_file_path ASC
604            "#,
605        )?;
606        let rows = statement.query_map(params![reference, reference], |row| {
607            let source_file_path: String = row.get(4)?;
608            Ok(SessionMatch {
609                session_key: row.get(0)?,
610                session_id: row.get(1)?,
611                cwd: row.get(2)?,
612                repo: row.get(3)?,
613                source_file_path: PathBuf::from(source_file_path),
614            })
615        })?;
616
617        rows.collect::<std::result::Result<Vec<_>, _>>()
618            .map_err(Into::into)
619    }
620
621    pub fn recent_sessions(&self, options: RecentOptions) -> Result<Vec<RecentSession>> {
622        let limit = options.limit.clamp(1, 100);
623        let fetch_limit = if options.include_duplicates {
624            limit
625        } else {
626            limit.saturating_mul(5).clamp(limit, 500)
627        };
628        let mut query_params = Vec::<String>::new();
629        let mut sql = r#"
630            SELECT
631                sessions.session_key,
632                sessions.session_id,
633                sessions.repo,
634                sessions.cwd,
635                sessions.session_timestamp,
636                sessions.source_file_path
637            FROM sessions
638            WHERE 1 = 1
639            "#
640        .to_owned();
641
642        if let Some(repo) = &options.repo {
643            sql.push_str(
644                r#"
645                AND EXISTS (
646                    SELECT 1 FROM session_repos filter_repos
647                    WHERE filter_repos.session_key = sessions.session_key
648                      AND lower(filter_repos.repo) = lower(?)
649                )
650                "#,
651            );
652            query_params.push(repo.clone());
653        }
654        if let Some(cwd) = &options.cwd {
655            sql.push_str(
656                r#"
657                AND (
658                    sessions.cwd LIKE '%' || ? || '%'
659                    OR EXISTS (
660                        SELECT 1 FROM events cwd_events
661                        WHERE cwd_events.session_key = sessions.session_key
662                          AND cwd_events.cwd LIKE '%' || ? || '%'
663                    )
664                )
665                "#,
666            );
667            query_params.push(cwd.clone());
668            query_params.push(cwd.clone());
669        }
670        append_from_until_clauses(
671            &mut sql,
672            &mut query_params,
673            options.since.as_ref(),
674            options.from.as_ref(),
675            options.until.as_ref(),
676        )?;
677        append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
678        append_recent_event_kind_clause(&mut sql, &mut query_params, &options.kinds);
679
680        sql.push_str(
681            r#"
682            ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
683                     sessions.source_file_path ASC
684            LIMIT ?
685            "#,
686        );
687        query_params.push(fetch_limit.to_string());
688
689        let mut statement = self.conn.prepare(&sql)?;
690        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
691            let source_file_path: String = row.get(5)?;
692            Ok(RecentSession {
693                session_key: row.get(0)?,
694                session_id: row.get(1)?,
695                repo: row.get(2)?,
696                cwd: row.get(3)?,
697                session_timestamp: row.get(4)?,
698                source_file_path: PathBuf::from(source_file_path),
699            })
700        })?;
701
702        let mut sessions = rows.collect::<std::result::Result<Vec<_>, _>>()?;
703        if !options.include_duplicates {
704            sessions = dedupe_recent_sessions(sessions);
705        }
706        sessions.truncate(limit);
707        Ok(sessions)
708    }
709
710    pub fn session_events(&self, session_key: &str, limit: usize) -> Result<Vec<SessionEvent>> {
711        self.session_events_with_kinds(session_key, limit, &[])
712    }
713
714    pub fn session_events_with_kinds(
715        &self,
716        session_key: &str,
717        limit: usize,
718        kinds: &[EventKind],
719    ) -> Result<Vec<SessionEvent>> {
720        let limit = limit.clamp(1, 500);
721        let mut query_params = vec![session_key.to_owned()];
722        let mut sql = r#"
723            SELECT
724                events.session_key,
725                events.session_id,
726                events.kind,
727                events.text,
728                sessions.cwd,
729                events.source_file_path,
730                events.source_line_number,
731                events.source_timestamp
732            FROM events
733            JOIN sessions ON sessions.session_key = events.session_key
734            WHERE events.session_key = ?
735            "#
736        .to_owned();
737        append_event_kind_clause(&mut sql, &mut query_params, "events.kind", kinds);
738        sql.push_str(
739            r#"
740            ORDER BY events.source_line_number ASC
741            LIMIT ?
742            "#,
743        );
744        query_params.push(limit.to_string());
745
746        let mut statement = self.conn.prepare(&sql)?;
747
748        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
749            let kind_text: String = row.get(2)?;
750            let kind = kind_text.parse::<EventKind>().map_err(|_| {
751                rusqlite::Error::InvalidColumnType(
752                    2,
753                    "kind".to_owned(),
754                    rusqlite::types::Type::Text,
755                )
756            })?;
757            let source_file_path: String = row.get(5)?;
758            let source_line_number: i64 = row.get(6)?;
759
760            Ok(SessionEvent {
761                session_key: row.get(0)?,
762                session_id: row.get(1)?,
763                kind,
764                text: row.get(3)?,
765                cwd: row.get(4)?,
766                source_file_path: PathBuf::from(source_file_path),
767                source_line_number: source_line_number as usize,
768                source_timestamp: row.get(7)?,
769            })
770        })?;
771
772        rows.collect::<std::result::Result<Vec<_>, _>>()
773            .map_err(Into::into)
774    }
775
776    pub fn session_repos(&self, session_key: &str) -> Result<Vec<String>> {
777        let mut statement = self.conn.prepare(
778            r#"
779            SELECT repo
780            FROM session_repos
781            WHERE session_key = ?
782            ORDER BY lower(repo) ASC
783            "#,
784        )?;
785        let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
786
787        rows.collect::<std::result::Result<Vec<_>, _>>()
788            .map_err(Into::into)
789    }
790
791    pub fn memory_results_with_trace(
792        &self,
793        options: MemorySearchOptions,
794    ) -> Result<(MatchStrategy, Vec<MemoryResult>)> {
795        let limit = options.limit.clamp(1, 100);
796        let Some(query) = options
797            .query
798            .as_ref()
799            .map(|value| value.trim())
800            .filter(|value| !value.is_empty())
801        else {
802            let ids = self.list_memory_ids(&options, limit)?;
803            return Ok((MatchStrategy::AllTerms, self.load_memory_results(&ids)?));
804        };
805
806        let terms = fts_terms(query);
807        if terms.is_empty() {
808            return Ok((MatchStrategy::AllTerms, Vec::new()));
809        }
810
811        let ids = self.search_memory_ids_with_fts(&options, &and_fts_query(&terms), limit)?;
812        if !ids.is_empty() || terms.len() == 1 {
813            return Ok((MatchStrategy::AllTerms, self.load_memory_results(&ids)?));
814        }
815
816        let ids = self.search_memory_ids_with_fts(&options, &or_fts_query(&terms), limit)?;
817        Ok((
818            MatchStrategy::AnyTermFallback,
819            self.load_memory_results(&ids)?,
820        ))
821    }
822
823    pub fn memory_results(&self, options: MemorySearchOptions) -> Result<Vec<MemoryResult>> {
824        self.memory_results_with_trace(options)
825            .map(|(_, results)| results)
826    }
827
828    pub fn memory_by_id(&self, memory_id: &str) -> Result<Option<MemoryObject>> {
829        let mut statement = self.conn.prepare(
830            r#"
831            SELECT id, kind, summary, normalized_text, first_seen_at, last_seen_at, created_at, updated_at
832            FROM memory_objects
833            WHERE id = ?
834            "#,
835        )?;
836        let mut rows = statement.query(params![memory_id])?;
837        let Some(row) = rows.next()? else {
838            return Ok(None);
839        };
840        let kind_text: String = row.get(1)?;
841        let kind = MemoryKind::parse(&kind_text)
842            .ok_or_else(|| anyhow::anyhow!("unknown memory kind `{kind_text}`"))?;
843        let evidence_count: i64 = self.conn.query_row(
844            "SELECT COUNT(*) FROM memory_evidence WHERE memory_id = ?",
845            params![memory_id],
846            |count_row| count_row.get(0),
847        )?;
848
849        Ok(Some(MemoryObject {
850            id: row.get(0)?,
851            kind,
852            summary: row.get(2)?,
853            normalized_text: row.get(3)?,
854            first_seen_at: row.get(4)?,
855            last_seen_at: row.get(5)?,
856            created_at: row.get(6)?,
857            updated_at: row.get(7)?,
858            evidence_count: evidence_count as usize,
859        }))
860    }
861
862    pub fn memory_evidence(&self, memory_id: &str, limit: usize) -> Result<Vec<MemoryEvidence>> {
863        let limit = limit.clamp(1, 200);
864        let mut statement = self.conn.prepare(
865            r#"
866            SELECT
867                memory_evidence.memory_id,
868                memory_evidence.session_key,
869                memory_evidence.session_id,
870                sessions.repo,
871                sessions.cwd,
872                sessions.session_timestamp,
873                memory_evidence.source_file_path,
874                memory_evidence.source_line_number,
875                memory_evidence.source_timestamp,
876                memory_evidence.event_kind,
877                memory_evidence.evidence_text
878            FROM memory_evidence
879            JOIN sessions ON sessions.session_key = memory_evidence.session_key
880            WHERE memory_evidence.memory_id = ?
881            ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
882                     memory_evidence.source_line_number ASC
883            LIMIT ?
884            "#,
885        )?;
886        let rows = statement.query_map(params![memory_id, limit.to_string()], |row| {
887            let event_kind_text: String = row.get(9)?;
888            let event_kind = event_kind_text.parse::<EventKind>().map_err(|_| {
889                rusqlite::Error::InvalidColumnType(
890                    9,
891                    "event_kind".to_owned(),
892                    rusqlite::types::Type::Text,
893                )
894            })?;
895            let source_file_path: String = row.get(6)?;
896            let source_line_number: i64 = row.get(7)?;
897            Ok(MemoryEvidence {
898                memory_id: row.get(0)?,
899                session_key: row.get(1)?,
900                session_id: row.get(2)?,
901                repo: row.get(3)?,
902                cwd: row.get(4)?,
903                session_timestamp: row.get(5)?,
904                source_file_path: PathBuf::from(source_file_path),
905                source_line_number: source_line_number as usize,
906                source_timestamp: row.get(8)?,
907                event_kind,
908                evidence_text: row.get(10)?,
909            })
910        })?;
911
912        rows.collect::<std::result::Result<Vec<_>, _>>()
913            .map_err(Into::into)
914    }
915
916    pub fn delta_items(
917        &self,
918        cursor: Option<&str>,
919        limit: usize,
920        repo: Option<&str>,
921    ) -> Result<Vec<DeltaItem>> {
922        if !self.table_exists("change_log")? {
923            return Ok(Vec::new());
924        }
925
926        let limit = limit.clamp(1, 200);
927        let mut next_change_id = cursor
928            .and_then(parse_delta_cursor)
929            .map_or(0, |item| item.change_id);
930        let mut items = Vec::new();
931        let batch_size = limit.saturating_mul(5).clamp(100, 500);
932
933        while items.len() < limit {
934            let rows = self.next_change_rows(next_change_id, batch_size)?;
935            if rows.is_empty() {
936                break;
937            }
938            next_change_id = rows.last().map(|row| row.seq).unwrap_or(next_change_id);
939            for row in rows {
940                if let Some(item) = self.resolve_delta_item(&row, repo)? {
941                    items.push(item);
942                    if items.len() == limit {
943                        break;
944                    }
945                }
946            }
947        }
948
949        Ok(items)
950    }
951
952    pub fn related_memories_for_session(
953        &self,
954        session_key: &str,
955        limit: usize,
956    ) -> Result<Vec<MemoryResult>> {
957        let mut statement = self.conn.prepare(
958            r#"
959            SELECT memory_id
960            FROM memory_evidence
961            WHERE session_key = ?
962            GROUP BY memory_id
963            ORDER BY COUNT(*) DESC, memory_id ASC
964            LIMIT ?
965            "#,
966        )?;
967        let rows = statement.query_map(params![session_key, limit.to_string()], |row| {
968            row.get::<_, String>(0)
969        })?;
970        let ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
971        self.load_memory_results(&ids)
972    }
973
974    pub fn related_sessions_for_session(
975        &self,
976        session_key: &str,
977        limit: usize,
978    ) -> Result<Vec<RecentSession>> {
979        let mut statement = self.conn.prepare(
980            r#"
981            SELECT
982                sessions.session_key,
983                sessions.session_id,
984                sessions.repo,
985                sessions.cwd,
986                sessions.session_timestamp,
987                sessions.source_file_path
988            FROM memory_evidence seed
989            JOIN memory_evidence related ON related.memory_id = seed.memory_id
990            JOIN sessions ON sessions.session_key = related.session_key
991            WHERE seed.session_key = ?
992              AND related.session_key != ?
993            GROUP BY sessions.session_key
994            ORDER BY COUNT(DISTINCT related.memory_id) DESC,
995                     datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
996                     sessions.session_key ASC
997            LIMIT ?
998            "#,
999        )?;
1000        let rows = statement.query_map(
1001            params![session_key, session_key, limit.to_string()],
1002            |row| {
1003                let source_file_path: String = row.get(5)?;
1004                Ok(RecentSession {
1005                    session_key: row.get(0)?,
1006                    session_id: row.get(1)?,
1007                    repo: row.get(2)?,
1008                    cwd: row.get(3)?,
1009                    session_timestamp: row.get(4)?,
1010                    source_file_path: PathBuf::from(source_file_path),
1011                })
1012            },
1013        )?;
1014        rows.collect::<std::result::Result<Vec<_>, _>>()
1015            .map_err(Into::into)
1016    }
1017
1018    pub fn related_sessions_for_memory(
1019        &self,
1020        memory_id: &str,
1021        limit: usize,
1022    ) -> Result<Vec<RecentSession>> {
1023        let mut statement = self.conn.prepare(
1024            r#"
1025            SELECT
1026                sessions.session_key,
1027                sessions.session_id,
1028                sessions.repo,
1029                sessions.cwd,
1030                sessions.session_timestamp,
1031                sessions.source_file_path
1032            FROM memory_evidence
1033            JOIN sessions ON sessions.session_key = memory_evidence.session_key
1034            WHERE memory_evidence.memory_id = ?
1035            GROUP BY sessions.session_key
1036            ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
1037                     sessions.session_key ASC
1038            LIMIT ?
1039            "#,
1040        )?;
1041        let rows = statement.query_map(params![memory_id, limit.to_string()], |row| {
1042            let source_file_path: String = row.get(5)?;
1043            Ok(RecentSession {
1044                session_key: row.get(0)?,
1045                session_id: row.get(1)?,
1046                repo: row.get(2)?,
1047                cwd: row.get(3)?,
1048                session_timestamp: row.get(4)?,
1049                source_file_path: PathBuf::from(source_file_path),
1050            })
1051        })?;
1052        rows.collect::<std::result::Result<Vec<_>, _>>()
1053            .map_err(Into::into)
1054    }
1055
1056    pub fn cooccurring_memories(&self, memory_id: &str, limit: usize) -> Result<Vec<MemoryResult>> {
1057        let mut statement = self.conn.prepare(
1058            r#"
1059            SELECT related.memory_id
1060            FROM memory_evidence seed
1061            JOIN memory_evidence related ON related.session_key = seed.session_key
1062            WHERE seed.memory_id = ?
1063              AND related.memory_id != ?
1064            GROUP BY related.memory_id
1065            ORDER BY COUNT(DISTINCT related.session_key) DESC, related.memory_id ASC
1066            LIMIT ?
1067            "#,
1068        )?;
1069        let rows = statement
1070            .query_map(params![memory_id, memory_id, limit.to_string()], |row| {
1071                row.get::<_, String>(0)
1072            })?;
1073        let ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1074        self.load_memory_results(&ids)
1075    }
1076
1077    pub fn session_resources(&self, limit: usize) -> Result<Vec<ResourceRecord>> {
1078        let sessions = self.recent_sessions(RecentOptions {
1079            limit,
1080            ..RecentOptions::default()
1081        })?;
1082        Ok(sessions
1083            .into_iter()
1084            .map(|session| ResourceRecord {
1085                uri: format!("codex-recall://session/{}", session.session_key),
1086                name: format!("session {}", session.session_id),
1087                description: format!("{} {}", session.repo, session.cwd),
1088                mime_type: "application/json".to_owned(),
1089                object_type: "session".to_owned(),
1090                updated_at: session.session_timestamp,
1091            })
1092            .collect())
1093    }
1094
1095    pub fn memory_resources(&self, limit: usize) -> Result<Vec<ResourceRecord>> {
1096        let memories = self.memory_results(MemorySearchOptions {
1097            limit,
1098            ..MemorySearchOptions::default()
1099        })?;
1100        Ok(memories
1101            .into_iter()
1102            .map(|memory| ResourceRecord {
1103                uri: format!("codex-recall://memory/{}", memory.object.id),
1104                name: format!("{} {}", memory.object.kind.as_str(), memory.object.id),
1105                description: memory.object.summary,
1106                mime_type: "application/json".to_owned(),
1107                object_type: "memory".to_owned(),
1108                updated_at: memory.object.updated_at,
1109            })
1110            .collect())
1111    }
1112
1113    fn list_memory_ids(&self, options: &MemorySearchOptions, limit: usize) -> Result<Vec<String>> {
1114        let mut sql = "SELECT memory_objects.id FROM memory_objects WHERE 1 = 1".to_owned();
1115        let mut params = Vec::<String>::new();
1116        append_memory_filter_clauses(&mut sql, &mut params, options)?;
1117        sql.push_str(
1118            r#"
1119            ORDER BY datetime(replace(replace(memory_objects.last_seen_at, 'T', ' '), 'Z', '')) DESC,
1120                     memory_objects.id ASC
1121            LIMIT ?
1122            "#,
1123        );
1124        params.push(limit.to_string());
1125        let mut statement = self.conn.prepare(&sql)?;
1126        let rows = statement.query_map(params_from_iter(params.iter()), |row| {
1127            row.get::<_, String>(0)
1128        })?;
1129        rows.collect::<std::result::Result<Vec<_>, _>>()
1130            .map_err(Into::into)
1131    }
1132
1133    fn search_memory_ids_with_fts(
1134        &self,
1135        options: &MemorySearchOptions,
1136        fts_query: &str,
1137        limit: usize,
1138    ) -> Result<Vec<String>> {
1139        let fetch_limit = limit.saturating_mul(10).clamp(limit, 500);
1140        let mut sql = r#"
1141            SELECT memory_objects.id
1142            FROM memory_fts
1143            JOIN memory_objects ON memory_objects.id = memory_fts.memory_id
1144            WHERE memory_fts MATCH ?
1145        "#
1146        .to_owned();
1147        let mut params = vec![fts_query.to_owned()];
1148        append_memory_filter_clauses(&mut sql, &mut params, options)?;
1149        sql.push_str(
1150            r#"
1151            ORDER BY memory_fts.rank ASC,
1152                     datetime(replace(replace(memory_objects.last_seen_at, 'T', ' '), 'Z', '')) DESC,
1153                     memory_objects.id ASC
1154            LIMIT ?
1155            "#,
1156        );
1157        params.push(fetch_limit.to_string());
1158        let mut statement = self.conn.prepare(&sql)?;
1159        let rows = statement.query_map(params_from_iter(params.iter()), |row| {
1160            row.get::<_, String>(0)
1161        })?;
1162        let mut ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1163        ids.dedup();
1164        if ids.len() > limit {
1165            ids.truncate(limit);
1166        }
1167        Ok(ids)
1168    }
1169
1170    fn load_memory_results(&self, ids: &[String]) -> Result<Vec<MemoryResult>> {
1171        let mut results = Vec::new();
1172        for id in ids {
1173            let Some(object) = self.memory_by_id(id)? else {
1174                continue;
1175            };
1176            let repos = self.memory_repos(id)?;
1177            let session_keys = self.memory_session_keys(id)?;
1178            results.push(MemoryResult {
1179                object,
1180                repos,
1181                session_keys,
1182            });
1183        }
1184        Ok(results)
1185    }
1186
1187    fn memory_repos(&self, memory_id: &str) -> Result<Vec<String>> {
1188        let mut statement = self.conn.prepare(
1189            r#"
1190            SELECT DISTINCT sessions.repo
1191            FROM memory_evidence
1192            JOIN sessions ON sessions.session_key = memory_evidence.session_key
1193            WHERE memory_evidence.memory_id = ?
1194              AND sessions.repo != ''
1195            ORDER BY lower(sessions.repo) ASC
1196            "#,
1197        )?;
1198        let rows = statement.query_map(params![memory_id], |row| row.get::<_, String>(0))?;
1199        rows.collect::<std::result::Result<Vec<_>, _>>()
1200            .map_err(Into::into)
1201    }
1202
1203    fn memory_session_keys(&self, memory_id: &str) -> Result<Vec<String>> {
1204        let mut statement = self.conn.prepare(
1205            r#"
1206            SELECT DISTINCT session_key
1207            FROM memory_evidence
1208            WHERE memory_id = ?
1209            ORDER BY session_key ASC
1210            "#,
1211        )?;
1212        let rows = statement.query_map(params![memory_id], |row| row.get::<_, String>(0))?;
1213        rows.collect::<std::result::Result<Vec<_>, _>>()
1214            .map_err(Into::into)
1215    }
1216
1217    fn memory_ids_for_session(&self, session_key: &str) -> Result<Vec<String>> {
1218        let mut statement = self.conn.prepare(
1219            r#"
1220            SELECT DISTINCT memory_id
1221            FROM memory_evidence
1222            WHERE session_key = ?
1223            ORDER BY memory_id ASC
1224            "#,
1225        )?;
1226        let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
1227        rows.collect::<std::result::Result<Vec<_>, _>>()
1228            .map_err(Into::into)
1229    }
1230
1231    fn upsert_memory_object(
1232        &self,
1233        memory: &ExtractedMemory,
1234        session_timestamp: &str,
1235    ) -> Result<()> {
1236        self.conn.execute(
1237            r#"
1238            INSERT INTO memory_objects (
1239                id, kind, summary, normalized_text, first_seen_at, last_seen_at, created_at, updated_at
1240            ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'), strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1241            ON CONFLICT(id) DO UPDATE SET
1242                kind = excluded.kind,
1243                summary = CASE
1244                    WHEN length(excluded.summary) < length(memory_objects.summary) THEN excluded.summary
1245                    ELSE memory_objects.summary
1246                END,
1247                normalized_text = excluded.normalized_text,
1248                first_seen_at = MIN(memory_objects.first_seen_at, excluded.first_seen_at),
1249                last_seen_at = MAX(memory_objects.last_seen_at, excluded.last_seen_at),
1250                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
1251            "#,
1252            params![
1253                memory.id,
1254                memory.kind.as_str(),
1255                memory.summary,
1256                memory.normalized_text,
1257                session_timestamp,
1258                session_timestamp,
1259            ],
1260        )?;
1261        Ok(())
1262    }
1263
1264    fn refresh_memory_objects(&self, memory_ids: &[String]) -> Result<()> {
1265        for memory_id in memory_ids {
1266            let evidence_count: i64 = self.conn.query_row(
1267                "SELECT COUNT(*) FROM memory_evidence WHERE memory_id = ?",
1268                params![memory_id],
1269                |row| row.get(0),
1270            )?;
1271
1272            if evidence_count == 0 {
1273                self.conn.execute(
1274                    "DELETE FROM memory_fts WHERE memory_id = ?",
1275                    params![memory_id],
1276                )?;
1277                self.conn.execute(
1278                    "DELETE FROM memory_objects WHERE id = ?",
1279                    params![memory_id],
1280                )?;
1281                self.record_change("memory", memory_id, "delete")?;
1282                continue;
1283            }
1284
1285            let (first_seen_at, last_seen_at, kind, summary, normalized_text): (
1286                String,
1287                String,
1288                String,
1289                String,
1290                String,
1291            ) = self.conn.query_row(
1292                r#"
1293                SELECT
1294                    MIN(sessions.session_timestamp),
1295                    MAX(sessions.session_timestamp),
1296                    memory_objects.kind,
1297                    memory_objects.summary,
1298                    memory_objects.normalized_text
1299                FROM memory_objects
1300                JOIN memory_evidence ON memory_evidence.memory_id = memory_objects.id
1301                JOIN sessions ON sessions.session_key = memory_evidence.session_key
1302                WHERE memory_objects.id = ?
1303                GROUP BY memory_objects.id
1304                "#,
1305                params![memory_id],
1306                |row| {
1307                    Ok((
1308                        row.get(0)?,
1309                        row.get(1)?,
1310                        row.get(2)?,
1311                        row.get(3)?,
1312                        row.get(4)?,
1313                    ))
1314                },
1315            )?;
1316
1317            self.conn.execute(
1318                r#"
1319                UPDATE memory_objects
1320                SET first_seen_at = ?, last_seen_at = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
1321                WHERE id = ?
1322                "#,
1323                params![first_seen_at, last_seen_at, memory_id],
1324            )?;
1325            self.conn.execute(
1326                "DELETE FROM memory_fts WHERE memory_id = ?",
1327                params![memory_id],
1328            )?;
1329            self.conn.execute(
1330                r#"
1331                INSERT INTO memory_fts (memory_id, kind, summary, normalized_text)
1332                VALUES (?, ?, ?, ?)
1333                "#,
1334                params![memory_id, kind, summary, normalized_text],
1335            )?;
1336            self.record_change("memory", memory_id, "upsert")?;
1337        }
1338
1339        Ok(())
1340    }
1341
1342    fn record_change(&self, object_type: &str, object_id: &str, action: &str) -> Result<i64> {
1343        self.conn.execute(
1344            r#"
1345            INSERT INTO change_log (object_type, object_id, action)
1346            VALUES (?, ?, ?)
1347            "#,
1348            params![object_type, object_id, action],
1349        )?;
1350        Ok(self.conn.last_insert_rowid())
1351    }
1352
1353    fn next_change_rows(&self, after_change_id: i64, limit: usize) -> Result<Vec<ChangeLogRow>> {
1354        let mut statement = self.conn.prepare(
1355            r#"
1356            SELECT seq, object_type, object_id, action, recorded_at
1357            FROM change_log
1358            WHERE seq > ?
1359            ORDER BY seq ASC
1360            LIMIT ?
1361            "#,
1362        )?;
1363        let rows = statement.query_map(params![after_change_id, limit as i64], |row| {
1364            Ok(ChangeLogRow {
1365                seq: row.get(0)?,
1366                object_type: row.get(1)?,
1367                object_id: row.get(2)?,
1368                action: row.get(3)?,
1369                recorded_at: row.get(4)?,
1370            })
1371        })?;
1372
1373        rows.collect::<std::result::Result<Vec<_>, _>>()
1374            .map_err(Into::into)
1375    }
1376
1377    fn session_has_repo(&self, session_key: &str, repo: &str) -> Result<bool> {
1378        let count = self.conn.query_row(
1379            r#"
1380            SELECT COUNT(*)
1381            FROM session_repos
1382            WHERE session_key = ?
1383              AND lower(repo) = lower(?)
1384            "#,
1385            params![session_key, repo],
1386            |row| row.get::<_, i64>(0),
1387        )?;
1388        Ok(count > 0)
1389    }
1390
1391    fn memory_result_by_id(&self, memory_id: &str) -> Result<Option<MemoryResult>> {
1392        self.load_memory_results(&[memory_id.to_owned()])
1393            .map(|results| results.into_iter().next())
1394    }
1395
1396    fn resolve_delta_item(
1397        &self,
1398        row: &ChangeLogRow,
1399        repo: Option<&str>,
1400    ) -> Result<Option<DeltaItem>> {
1401        match row.object_type.as_str() {
1402            "session" => {
1403                let mut statement = self.conn.prepare(
1404                    r#"
1405                    SELECT session_key, session_id, repo, cwd, session_timestamp, indexed_at
1406                    FROM sessions
1407                    WHERE session_key = ?
1408                    "#,
1409                )?;
1410                let mut rows = statement.query(params![row.object_id.as_str()])?;
1411                let Some(session_row) = rows.next()? else {
1412                    return Ok(None);
1413                };
1414                let session_key: String = session_row.get(0)?;
1415                if let Some(repo_filter) = repo {
1416                    if !self.session_has_repo(&session_key, repo_filter)? {
1417                        return Ok(None);
1418                    }
1419                }
1420
1421                Ok(Some(DeltaItem::Session {
1422                    change_id: row.seq,
1423                    action: row.action.clone(),
1424                    session_key,
1425                    session_id: session_row.get(1)?,
1426                    repo: session_row.get(2)?,
1427                    cwd: session_row.get(3)?,
1428                    session_timestamp: session_row.get(4)?,
1429                    updated_at: session_row.get(5)?,
1430                }))
1431            }
1432            "memory" => {
1433                if let Some(result) = self.memory_result_by_id(&row.object_id)? {
1434                    if let Some(repo_filter) = repo {
1435                        if !result
1436                            .repos
1437                            .iter()
1438                            .any(|item| item.eq_ignore_ascii_case(repo_filter))
1439                        {
1440                            return Ok(None);
1441                        }
1442                    }
1443
1444                    return Ok(Some(DeltaItem::Memory {
1445                        change_id: row.seq,
1446                        action: row.action.clone(),
1447                        object: result.object,
1448                        repos: result.repos,
1449                        session_keys: result.session_keys,
1450                    }));
1451                }
1452
1453                if repo.is_some() {
1454                    return Ok(None);
1455                }
1456
1457                Ok(Some(DeltaItem::Deleted {
1458                    change_id: row.seq,
1459                    object_type: row.object_type.clone(),
1460                    object_id: row.object_id.clone(),
1461                    action: row.action.clone(),
1462                    updated_at: row.recorded_at.clone(),
1463                }))
1464            }
1465            _ => Ok(None),
1466        }
1467    }
1468
1469    pub fn is_source_current(
1470        &self,
1471        source_file_path: &Path,
1472        source_file_mtime_ns: i64,
1473        source_file_size: i64,
1474    ) -> Result<bool> {
1475        let count = self.conn.query_row(
1476            r#"
1477            SELECT COUNT(*)
1478            FROM ingestion_state
1479            WHERE source_file_path = ?
1480              AND source_file_mtime_ns = ?
1481              AND source_file_size = ?
1482              AND content_version = ?
1483            "#,
1484            params![
1485                source_file_path.display().to_string(),
1486                source_file_mtime_ns,
1487                source_file_size,
1488                CONTENT_VERSION,
1489            ],
1490            |row| row.get::<_, i64>(0),
1491        )?;
1492        Ok(count > 0)
1493    }
1494
1495    pub fn mark_source_indexed(
1496        &self,
1497        source_file_path: &Path,
1498        source_file_mtime_ns: i64,
1499        source_file_size: i64,
1500        session_id: Option<&str>,
1501        session_key: Option<&str>,
1502    ) -> Result<()> {
1503        self.conn.execute(
1504            r#"
1505            INSERT INTO ingestion_state (
1506                source_file_path, source_file_mtime_ns, source_file_size, session_id, session_key, content_version, indexed_at
1507            ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1508            ON CONFLICT(source_file_path) DO UPDATE SET
1509                source_file_mtime_ns = excluded.source_file_mtime_ns,
1510                source_file_size = excluded.source_file_size,
1511                session_id = excluded.session_id,
1512                session_key = excluded.session_key,
1513                content_version = excluded.content_version,
1514                indexed_at = excluded.indexed_at
1515            "#,
1516            params![
1517                source_file_path.display().to_string(),
1518                source_file_mtime_ns,
1519                source_file_size,
1520                session_id,
1521                session_key,
1522                CONTENT_VERSION,
1523            ],
1524        )?;
1525        Ok(())
1526    }
1527
1528    pub fn last_indexed_at(&self) -> Result<Option<String>> {
1529        self.conn
1530            .query_row("SELECT MAX(indexed_at) FROM ingestion_state", [], |row| {
1531                row.get::<_, Option<String>>(0)
1532            })
1533            .map_err(Into::into)
1534    }
1535
1536    fn init_schema(&self) -> Result<()> {
1537        self.conn.execute_batch(
1538            r#"
1539            PRAGMA journal_mode = WAL;
1540            PRAGMA synchronous = NORMAL;
1541            "#,
1542        )?;
1543
1544        if self.table_exists("sessions")? && !self.table_has_column("sessions", "session_key")? {
1545            self.migrate_to_session_key_schema()?;
1546        }
1547
1548        self.create_schema_objects()?;
1549        self.ensure_sessions_indexed_at_column()?;
1550        self.ensure_ingestion_state_session_key_column()?;
1551        self.ensure_ingestion_state_content_version_column()?;
1552        self.backfill_session_repos()?;
1553        self.backfill_session_repo_memberships()?;
1554        self.backfill_ingestion_session_keys()?;
1555        Ok(())
1556    }
1557
1558    fn create_schema_objects(&self) -> Result<()> {
1559        self.conn.execute_batch(
1560            r#"
1561            CREATE TABLE IF NOT EXISTS sessions (
1562                session_key TEXT PRIMARY KEY,
1563                session_id TEXT NOT NULL,
1564                session_timestamp TEXT NOT NULL,
1565                cwd TEXT NOT NULL,
1566                repo TEXT NOT NULL DEFAULT '',
1567                cli_version TEXT,
1568                source_file_path TEXT NOT NULL,
1569                indexed_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1570            );
1571
1572            CREATE INDEX IF NOT EXISTS sessions_session_id_idx ON sessions(session_id);
1573            CREATE INDEX IF NOT EXISTS sessions_repo_idx ON sessions(repo);
1574
1575            CREATE TABLE IF NOT EXISTS session_repos (
1576                session_key TEXT NOT NULL,
1577                repo TEXT NOT NULL,
1578                PRIMARY KEY(session_key, repo),
1579                FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1580            );
1581
1582            CREATE INDEX IF NOT EXISTS session_repos_repo_idx ON session_repos(repo);
1583
1584            CREATE TABLE IF NOT EXISTS events (
1585                id INTEGER PRIMARY KEY AUTOINCREMENT,
1586                session_key TEXT NOT NULL,
1587                session_id TEXT NOT NULL,
1588                kind TEXT NOT NULL,
1589                role TEXT,
1590                text TEXT NOT NULL,
1591                command TEXT,
1592                cwd TEXT,
1593                exit_code INTEGER,
1594                source_timestamp TEXT,
1595                source_file_path TEXT NOT NULL,
1596                source_line_number INTEGER NOT NULL,
1597                FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1598            );
1599
1600            CREATE INDEX IF NOT EXISTS events_session_key_idx ON events(session_key);
1601            CREATE INDEX IF NOT EXISTS events_session_id_idx ON events(session_id);
1602            CREATE INDEX IF NOT EXISTS events_source_idx ON events(source_file_path, source_line_number);
1603
1604            CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
1605                event_id UNINDEXED,
1606                session_key UNINDEXED,
1607                session_id UNINDEXED,
1608                kind UNINDEXED,
1609                text,
1610                tokenize = 'porter unicode61'
1611            );
1612
1613            CREATE TABLE IF NOT EXISTS ingestion_state (
1614                source_file_path TEXT PRIMARY KEY,
1615                source_file_mtime_ns INTEGER NOT NULL,
1616                source_file_size INTEGER NOT NULL,
1617                session_id TEXT,
1618                session_key TEXT,
1619                content_version INTEGER NOT NULL DEFAULT 3,
1620                indexed_at TEXT NOT NULL
1621            );
1622
1623            CREATE TABLE IF NOT EXISTS memory_objects (
1624                id TEXT PRIMARY KEY,
1625                kind TEXT NOT NULL,
1626                summary TEXT NOT NULL,
1627                normalized_text TEXT NOT NULL,
1628                first_seen_at TEXT NOT NULL,
1629                last_seen_at TEXT NOT NULL,
1630                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
1631                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1632            );
1633
1634            CREATE UNIQUE INDEX IF NOT EXISTS memory_objects_kind_normalized_idx
1635                ON memory_objects(kind, normalized_text);
1636            CREATE INDEX IF NOT EXISTS memory_objects_updated_idx
1637                ON memory_objects(updated_at);
1638
1639            CREATE TABLE IF NOT EXISTS memory_evidence (
1640                memory_id TEXT NOT NULL,
1641                session_key TEXT NOT NULL,
1642                session_id TEXT NOT NULL,
1643                source_file_path TEXT NOT NULL,
1644                source_line_number INTEGER NOT NULL,
1645                source_timestamp TEXT,
1646                event_kind TEXT NOT NULL,
1647                evidence_text TEXT NOT NULL,
1648                PRIMARY KEY(memory_id, source_file_path, source_line_number),
1649                FOREIGN KEY(memory_id) REFERENCES memory_objects(id) ON DELETE CASCADE,
1650                FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1651            );
1652
1653            CREATE INDEX IF NOT EXISTS memory_evidence_memory_idx ON memory_evidence(memory_id);
1654            CREATE INDEX IF NOT EXISTS memory_evidence_session_idx ON memory_evidence(session_key);
1655
1656            CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
1657                memory_id UNINDEXED,
1658                kind UNINDEXED,
1659                summary,
1660                normalized_text,
1661                tokenize = 'porter unicode61'
1662            );
1663
1664            CREATE TABLE IF NOT EXISTS change_log (
1665                seq INTEGER PRIMARY KEY AUTOINCREMENT,
1666                object_type TEXT NOT NULL,
1667                object_id TEXT NOT NULL,
1668                action TEXT NOT NULL,
1669                recorded_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1670            );
1671
1672            CREATE INDEX IF NOT EXISTS change_log_object_idx
1673                ON change_log(object_type, object_id, seq);
1674            "#,
1675        )?;
1676        Ok(())
1677    }
1678
    /// Writes one parsed session into the index, replacing anything previously
    /// stored under the same session key.
    ///
    /// Steps, in order:
    /// 1. Upsert the `sessions` row (refreshing `indexed_at`) and log a
    ///    `session`/`upsert` change.
    /// 2. Snapshot the memory ids already linked to this session, so memories
    ///    that are no longer extracted still get refreshed in step 6.
    /// 3. Delete the session's `events_fts`, `events`, `session_repos`, and
    ///    `memory_evidence` rows.
    /// 4. Insert one `session_repos` row per repo from `session_repos(parsed)`.
    /// 5. Insert every event together with its `events_fts` entry.
    /// 6. Upsert each extracted memory, record its evidence, then refresh every
    ///    affected memory object (previous and new ids, deduplicated).
    ///
    /// Statements run directly on `self.conn`; this method opens no transaction
    /// itself. NOTE(review): presumably the caller wraps it in one so a failure
    /// midway cannot leave a half-reindexed session — confirm at call sites.
    fn index_session_inner(&self, parsed: &ParsedSession) -> Result<()> {
        let session_key = session_key(&parsed.session.id, &parsed.session.source_file_path);
        let repo = repo_slug(&parsed.session.cwd);
        // Upsert keeps the row keyed by `session_key`; `indexed_at` is reset to
        // "now" on both insert and update.
        self.conn.execute(
            r#"
            INSERT INTO sessions (
                session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path, indexed_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            ON CONFLICT(session_key) DO UPDATE SET
                session_id = excluded.session_id,
                session_timestamp = excluded.session_timestamp,
                cwd = excluded.cwd,
                repo = excluded.repo,
                cli_version = excluded.cli_version,
                source_file_path = excluded.source_file_path,
                indexed_at = excluded.indexed_at
            "#,
            params![
                session_key.as_str(),
                parsed.session.id,
                parsed.session.timestamp,
                parsed.session.cwd,
                repo,
                parsed.session.cli_version,
                parsed.session.source_file_path.display().to_string(),
            ],
        )?;
        self.record_change("session", &session_key, "upsert")?;

        // Must be read before memory_evidence is cleared below, so memories
        // this session no longer supports are still refreshed at the end.
        let mut affected_memory_ids = self.memory_ids_for_session(&session_key)?;

        // Drop every derived per-session row; all of them are rebuilt from
        // `parsed` below.
        self.conn.execute(
            "DELETE FROM events_fts WHERE session_key = ?",
            params![session_key.as_str()],
        )?;
        self.conn.execute(
            "DELETE FROM events WHERE session_key = ?",
            params![session_key.as_str()],
        )?;
        self.conn.execute(
            "DELETE FROM session_repos WHERE session_key = ?",
            params![session_key.as_str()],
        )?;
        self.conn.execute(
            "DELETE FROM memory_evidence WHERE session_key = ?",
            params![session_key.as_str()],
        )?;

        for repo in session_repos(parsed) {
            self.conn.execute(
                "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
                params![session_key.as_str(), repo],
            )?;
        }

        for event in &parsed.events {
            self.conn.execute(
                r#"
                INSERT INTO events (
                    session_key, session_id, kind, role, text, command, cwd, exit_code,
                    source_timestamp, source_file_path, source_line_number
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                "#,
                params![
                    session_key.as_str(),
                    parsed.session.id,
                    event.kind.as_str(),
                    event.role,
                    event.text,
                    event.command,
                    event.cwd,
                    event.exit_code,
                    event.source_timestamp,
                    event.source_file_path.display().to_string(),
                    event.source_line_number as i64,
                ],
            )?;
            // `last_insert_rowid` reports the most recent successful INSERT on
            // this connection, so it must be read immediately after the
            // `events` insert to link the FTS row back to it.
            let event_id = self.conn.last_insert_rowid();
            self.conn.execute(
                "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
                params![
                    event_id,
                    session_key.as_str(),
                    parsed.session.id,
                    event.kind.as_str(),
                    event.text
                ],
            )?;
        }

        for memory in extract_memories(parsed) {
            self.upsert_memory_object(&memory, &parsed.session.timestamp)?;
            // Evidence rows record the session's source file path (not a
            // per-event path) plus the memory's own line/timestamp.
            self.conn.execute(
                r#"
                INSERT OR REPLACE INTO memory_evidence (
                    memory_id, session_key, session_id, source_file_path, source_line_number,
                    source_timestamp, event_kind, evidence_text
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                "#,
                params![
                    memory.id,
                    session_key.as_str(),
                    parsed.session.id,
                    parsed.session.source_file_path.display().to_string(),
                    memory.source_line_number as i64,
                    memory.source_timestamp,
                    memory.event_kind.as_str(),
                    memory.evidence_text,
                ],
            )?;
            affected_memory_ids.push(memory.id);
        }

        // An id can appear both as previously linked and as newly extracted.
        affected_memory_ids.sort();
        affected_memory_ids.dedup();
        self.refresh_memory_objects(&affected_memory_ids)?;

        Ok(())
    }
1798
1799    fn backfill_session_repos(&self) -> Result<()> {
1800        let mut statement = self
1801            .conn
1802            .prepare("SELECT session_key, cwd FROM sessions WHERE repo = ''")?;
1803        let rows = statement.query_map([], |row| {
1804            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1805        })?;
1806
1807        for row in rows {
1808            let (session_key, cwd) = row?;
1809            self.conn.execute(
1810                "UPDATE sessions SET repo = ? WHERE session_key = ?",
1811                params![repo_slug(&cwd), session_key],
1812            )?;
1813        }
1814        Ok(())
1815    }
1816
1817    fn table_exists(&self, table_name: &str) -> Result<bool> {
1818        let count = self.conn.query_row(
1819            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
1820            params![table_name],
1821            |row| row.get::<_, i64>(0),
1822        )?;
1823        Ok(count > 0)
1824    }
1825
1826    fn table_has_column(&self, table_name: &str, column_name: &str) -> Result<bool> {
1827        let mut statement = self
1828            .conn
1829            .prepare(&format!("PRAGMA table_info({table_name})"))?;
1830        let columns = statement
1831            .query_map([], |row| row.get::<_, String>(1))?
1832            .collect::<std::result::Result<Vec<_>, _>>()?;
1833        Ok(columns.iter().any(|column| column == column_name))
1834    }
1835
1836    fn ensure_ingestion_state_session_key_column(&self) -> Result<()> {
1837        if self.table_has_column("ingestion_state", "session_key")? {
1838            return Ok(());
1839        }
1840
1841        match self.conn.execute(
1842            "ALTER TABLE ingestion_state ADD COLUMN session_key TEXT",
1843            [],
1844        ) {
1845            Ok(_) => Ok(()),
1846            Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1847            Err(error) => Err(error.into()),
1848        }
1849    }
1850
1851    fn ensure_sessions_indexed_at_column(&self) -> Result<()> {
1852        if self.table_has_column("sessions", "indexed_at")? {
1853            return Ok(());
1854        }
1855
1856        match self.conn.execute(
1857            "ALTER TABLE sessions ADD COLUMN indexed_at TEXT NOT NULL DEFAULT '1970-01-01T00:00:00.000Z'",
1858            [],
1859        ) {
1860            Ok(_) => {
1861                self.conn.execute(
1862                    "UPDATE sessions SET indexed_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE indexed_at = '1970-01-01T00:00:00.000Z'",
1863                    [],
1864                )?;
1865                Ok(())
1866            }
1867            Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1868            Err(error) => Err(error.into()),
1869        }
1870    }
1871
1872    fn ensure_ingestion_state_content_version_column(&self) -> Result<()> {
1873        if self.table_has_column("ingestion_state", "content_version")? {
1874            return Ok(());
1875        }
1876
1877        match self.conn.execute(
1878            "ALTER TABLE ingestion_state ADD COLUMN content_version INTEGER NOT NULL DEFAULT 0",
1879            [],
1880        ) {
1881            Ok(_) => Ok(()),
1882            Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1883            Err(error) => Err(error.into()),
1884        }
1885    }
1886
1887    fn backfill_ingestion_session_keys(&self) -> Result<()> {
1888        if !self.table_exists("ingestion_state")? {
1889            return Ok(());
1890        }
1891
1892        self.conn.execute(
1893            r#"
1894            UPDATE ingestion_state
1895            SET session_key = (
1896                SELECT sessions.session_key
1897                FROM sessions
1898                WHERE sessions.source_file_path = ingestion_state.source_file_path
1899                LIMIT 1
1900            )
1901            WHERE session_key IS NULL
1902              AND session_id IS NOT NULL
1903            "#,
1904            [],
1905        )?;
1906        Ok(())
1907    }
1908
1909    fn backfill_session_repo_memberships(&self) -> Result<()> {
1910        self.conn.execute(
1911            r#"
1912            INSERT OR IGNORE INTO session_repos (session_key, repo)
1913            SELECT session_key, repo
1914            FROM sessions
1915            WHERE repo != ''
1916            "#,
1917            [],
1918        )?;
1919
1920        let mut statement = self.conn.prepare(
1921            r#"
1922            SELECT DISTINCT session_key, cwd
1923            FROM events
1924            WHERE cwd IS NOT NULL
1925              AND cwd != ''
1926            "#,
1927        )?;
1928        let rows = statement.query_map([], |row| {
1929            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1930        })?;
1931
1932        for row in rows {
1933            let (session_key, cwd) = row?;
1934            let repo = repo_slug(&cwd);
1935            if repo.is_empty() {
1936                continue;
1937            }
1938            self.conn.execute(
1939                "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1940                params![session_key, repo],
1941            )?;
1942        }
1943
1944        Ok(())
1945    }
1946
1947    fn migrate_to_session_key_schema(&self) -> Result<()> {
1948        let old_sessions = self.load_old_sessions()?;
1949        let old_events = self.load_old_events()?;
1950        let mut keys_by_session_id = HashMap::new();
1951        for session in &old_sessions {
1952            keys_by_session_id.insert(
1953                session.session_id.clone(),
1954                session_key(&session.session_id, Path::new(&session.source_file_path)),
1955            );
1956        }
1957
1958        self.conn.execute("BEGIN IMMEDIATE", [])?;
1959        let result = (|| -> Result<()> {
1960            self.conn.execute_batch(
1961                r#"
1962                DROP TABLE IF EXISTS events_fts;
1963                DROP TABLE IF EXISTS events;
1964                DROP TABLE IF EXISTS sessions;
1965                "#,
1966            )?;
1967            self.create_schema_objects()?;
1968
1969            for session in &old_sessions {
1970                let session_key = keys_by_session_id
1971                    .get(&session.session_id)
1972                    .expect("session key exists");
1973                let repo = if session.repo.is_empty() {
1974                    repo_slug(&session.cwd)
1975                } else {
1976                    session.repo.clone()
1977                };
1978                self.conn.execute(
1979                    r#"
1980                    INSERT INTO sessions (
1981                        session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path, indexed_at
1982                    ) VALUES (?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1983                    "#,
1984                    params![
1985                        session_key,
1986                        session.session_id,
1987                        session.session_timestamp,
1988                        session.cwd,
1989                        repo,
1990                        session.cli_version,
1991                        session.source_file_path,
1992                    ],
1993                )?;
1994                self.conn.execute(
1995                    "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1996                    params![session_key, repo],
1997                )?;
1998            }
1999
2000            for event in &old_events {
2001                let Some(session_key) = keys_by_session_id.get(&event.session_id) else {
2002                    continue;
2003                };
2004                self.conn.execute(
2005                    r#"
2006                    INSERT INTO events (
2007                        session_key, session_id, kind, role, text, command, cwd, exit_code,
2008                        source_timestamp, source_file_path, source_line_number
2009                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2010                    "#,
2011                    params![
2012                        session_key,
2013                        event.session_id,
2014                        event.kind,
2015                        event.role,
2016                        event.text,
2017                        event.command,
2018                        event.cwd,
2019                        event.exit_code,
2020                        event.source_timestamp,
2021                        event.source_file_path,
2022                        event.source_line_number,
2023                    ],
2024                )?;
2025                let event_id = self.conn.last_insert_rowid();
2026                self.conn.execute(
2027                    "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
2028                    params![
2029                        event_id,
2030                        session_key,
2031                        event.session_id,
2032                        event.kind,
2033                        event.text
2034                    ],
2035                )?;
2036            }
2037
2038            Ok(())
2039        })();
2040
2041        match result {
2042            Ok(()) => {
2043                self.conn.execute("COMMIT", [])?;
2044                Ok(())
2045            }
2046            Err(error) => {
2047                let _ = self.conn.execute("ROLLBACK", []);
2048                Err(error)
2049            }
2050        }
2051    }
2052
2053    fn load_old_sessions(&self) -> Result<Vec<OldSessionRow>> {
2054        let has_repo = self.table_has_column("sessions", "repo")?;
2055        let sql = if has_repo {
2056            "SELECT session_id, session_timestamp, cwd, repo, cli_version, source_file_path FROM sessions"
2057        } else {
2058            "SELECT session_id, session_timestamp, cwd, '' AS repo, cli_version, source_file_path FROM sessions"
2059        };
2060        let mut statement = self.conn.prepare(sql)?;
2061        let rows = statement.query_map([], |row| {
2062            Ok(OldSessionRow {
2063                session_id: row.get(0)?,
2064                session_timestamp: row.get(1)?,
2065                cwd: row.get(2)?,
2066                repo: row.get(3)?,
2067                cli_version: row.get(4)?,
2068                source_file_path: row.get(5)?,
2069            })
2070        })?;
2071
2072        rows.collect::<std::result::Result<Vec<_>, _>>()
2073            .map_err(Into::into)
2074    }
2075
2076    fn load_old_events(&self) -> Result<Vec<OldEventRow>> {
2077        if !self.table_exists("events")? {
2078            return Ok(Vec::new());
2079        }
2080
2081        let mut statement = self.conn.prepare(
2082            r#"
2083            SELECT
2084                session_id, kind, role, text, command, cwd, exit_code,
2085                source_timestamp, source_file_path, source_line_number
2086            FROM events
2087            ORDER BY id ASC
2088            "#,
2089        )?;
2090        let rows = statement.query_map([], |row| {
2091            Ok(OldEventRow {
2092                session_id: row.get(0)?,
2093                kind: row.get(1)?,
2094                role: row.get(2)?,
2095                text: row.get(3)?,
2096                command: row.get(4)?,
2097                cwd: row.get(5)?,
2098                exit_code: row.get(6)?,
2099                source_timestamp: row.get(7)?,
2100                source_file_path: row.get(8)?,
2101                source_line_number: row.get(9)?,
2102            })
2103        })?;
2104
2105        rows.collect::<std::result::Result<Vec<_>, _>>()
2106            .map_err(Into::into)
2107    }
2108}
2109
2110fn configure_write_connection(conn: &Connection) -> Result<()> {
2111    conn.busy_timeout(Duration::from_secs(30))?;
2112    conn.execute_batch(
2113        r#"
2114        PRAGMA journal_mode = WAL;
2115        PRAGMA synchronous = NORMAL;
2116        PRAGMA temp_store = MEMORY;
2117        "#,
2118    )?;
2119    Ok(())
2120}
2121
2122fn configure_read_connection(conn: &Connection) -> Result<()> {
2123    conn.busy_timeout(Duration::from_secs(30))?;
2124    conn.execute_batch(
2125        r#"
2126        PRAGMA query_only = ON;
2127        PRAGMA temp_store = MEMORY;
2128        "#,
2129    )?;
2130    Ok(())
2131}
2132
/// Parsed form of a date-filter value (`--since`, `--from`, or `--until`),
/// produced by `parse_date_filter`.
///
/// The name predates its use for all three flags; it is not specific to
/// `--since`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SinceFilter {
    /// A literal date such as `YYYY-MM-DD`, bound as-is into SQLite's
    /// `datetime(?)`.
    Absolute(String),
    /// `Nd` with `N >= 1`: N days before the current time.
    LastDays(u32),
    /// `today` (or `0d`): start of the current local day.
    Today,
    /// `yesterday`: start of the previous local day.
    Yesterday,
}
2139
2140fn parse_date_filter(value: &str, flag_name: &str) -> Result<SinceFilter> {
2141    let trimmed = value.trim();
2142    let lower = trimmed.to_ascii_lowercase();
2143    if lower == "today" {
2144        return Ok(SinceFilter::Today);
2145    }
2146    if lower == "yesterday" {
2147        return Ok(SinceFilter::Yesterday);
2148    }
2149    if let Some(days) = lower.strip_suffix('d') {
2150        let days = days
2151            .parse::<u32>()
2152            .with_context(|| format!("parse {flag_name} relative day value `{value}`"))?;
2153        if days == 0 {
2154            return Ok(SinceFilter::Today);
2155        }
2156        return Ok(SinceFilter::LastDays(days));
2157    }
2158    if looks_like_absolute_date(trimmed) {
2159        return Ok(SinceFilter::Absolute(trimmed.to_owned()));
2160    }
2161
2162    anyhow::bail!(
2163        "unsupported {flag_name} value `{value}`; use YYYY-MM-DD, today, yesterday, or Nd like 7d"
2164    )
2165}
2166
/// Returns `true` when `value` starts with a plausible `YYYY-MM-DD` date that
/// SQLite's `datetime()` can parse.
///
/// Rules:
/// - The date may stand alone, or be followed by a time introduced by `T` or a
///   space (e.g. `2024-05-01T12:00:00Z`, `2024-05-01 12:00`).
/// - The month must be 01-12 and the day 01-31.
///
/// Values outside these rules would make `datetime(?)` evaluate to NULL. The
/// date filter would then silently match nothing; rejecting them here lets
/// the caller report an error instead.
fn looks_like_absolute_date(value: &str) -> bool {
    let bytes = value.as_bytes();
    if bytes.len() < 10 {
        return false;
    }
    let digits = |range: std::ops::Range<usize>| bytes[range].iter().all(u8::is_ascii_digit);
    let well_formed = digits(0..4)
        && bytes[4] == b'-'
        && digits(5..7)
        && bytes[7] == b'-'
        && digits(8..10);
    if !well_formed {
        return false;
    }
    // Cannot underflow or overflow: both bytes were checked to be ASCII digits.
    let two_digits = |at: usize| (bytes[at] - b'0') * 10 + (bytes[at + 1] - b'0');
    let month = two_digits(5);
    let day = two_digits(8);
    // Only a `T` or space separator (the two SQLite date-time forms) may follow.
    let clean_tail = matches!(bytes.get(10).copied(), None | Some(b'T' | b' '));
    (1..=12).contains(&month) && (1..=31).contains(&day) && clean_tail
}
2176
2177fn append_since_clause(
2178    sql: &mut String,
2179    query_params: &mut Vec<String>,
2180    value: &str,
2181) -> Result<()> {
2182    append_lower_bound_clause(sql, query_params, value, "--since")
2183}
2184
2185fn append_lower_bound_clause(
2186    sql: &mut String,
2187    query_params: &mut Vec<String>,
2188    value: &str,
2189    flag_name: &str,
2190) -> Result<()> {
2191    sql.push_str(
2192        " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) >= ",
2193    );
2194    match parse_date_filter(value, flag_name)? {
2195        SinceFilter::Absolute(value) => {
2196            sql.push_str("datetime(?)");
2197            query_params.push(value);
2198        }
2199        SinceFilter::LastDays(days) => {
2200            sql.push_str("datetime('now', ?)");
2201            query_params.push(format!("-{days} days"));
2202        }
2203        SinceFilter::Today => {
2204            sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
2205        }
2206        SinceFilter::Yesterday => {
2207            sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
2208        }
2209    }
2210    Ok(())
2211}
2212
2213fn append_until_clause(
2214    sql: &mut String,
2215    query_params: &mut Vec<String>,
2216    value: &str,
2217) -> Result<()> {
2218    sql.push_str(
2219        " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) < ",
2220    );
2221    match parse_date_filter(value, "--until")? {
2222        SinceFilter::Absolute(value) => {
2223            sql.push_str("datetime(?)");
2224            query_params.push(value);
2225        }
2226        SinceFilter::LastDays(days) => {
2227            sql.push_str("datetime('now', ?)");
2228            query_params.push(format!("-{days} days"));
2229        }
2230        SinceFilter::Today => {
2231            sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
2232        }
2233        SinceFilter::Yesterday => {
2234            sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
2235        }
2236    }
2237    Ok(())
2238}
2239
2240fn append_from_until_clauses(
2241    sql: &mut String,
2242    query_params: &mut Vec<String>,
2243    since: Option<&String>,
2244    from: Option<&String>,
2245    until: Option<&String>,
2246) -> Result<()> {
2247    if since.is_some() && from.is_some() {
2248        bail!("use either --since or --from, not both");
2249    }
2250    if let Some(since) = since {
2251        append_since_clause(sql, query_params, since)?;
2252    } else if let Some(from) = from {
2253        append_lower_bound_clause(sql, query_params, from, "--from")?;
2254    }
2255    if let Some(until) = until {
2256        append_until_clause(sql, query_params, until)?;
2257    }
2258    Ok(())
2259}
2260
/// Excludes sessions from a query over `sessions`.
///
/// For each entry, appends a `session_id != ? AND session_key != ?` pair and
/// binds the entry to both placeholders. An entry may therefore name a
/// session by either identifier.
fn append_excluded_sessions_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    excluded_sessions: &[String],
) {
    excluded_sessions.iter().for_each(|session| {
        sql.push_str(" AND sessions.session_id != ? AND sessions.session_key != ?");
        query_params.extend([session.clone(), session.clone()]);
    });
}
2272
2273fn append_event_kind_clause(
2274    sql: &mut String,
2275    query_params: &mut Vec<String>,
2276    column_name: &str,
2277    kinds: &[EventKind],
2278) {
2279    if kinds.is_empty() {
2280        return;
2281    }
2282    sql.push_str(" AND ");
2283    sql.push_str(column_name);
2284    sql.push_str(" IN (");
2285    sql.push_str(&placeholders(kinds.len()));
2286    sql.push(')');
2287    append_kind_params(query_params, kinds);
2288}
2289
2290fn append_recent_event_kind_clause(
2291    sql: &mut String,
2292    query_params: &mut Vec<String>,
2293    kinds: &[EventKind],
2294) {
2295    if kinds.is_empty() {
2296        return;
2297    }
2298    sql.push_str(
2299        r#"
2300        AND EXISTS (
2301            SELECT 1 FROM events kind_events
2302            WHERE kind_events.session_key = sessions.session_key
2303              AND kind_events.kind IN (
2304        "#,
2305    );
2306    sql.push_str(&placeholders(kinds.len()));
2307    sql.push_str("))");
2308    append_kind_params(query_params, kinds);
2309}
2310
/// Appends the optional filters from `MemorySearchOptions` to a memory query.
///
/// Every fragment starts with ` AND` and refers to `memory_objects.id` or
/// `memory_objects.kind`, so `sql` is expected to already be inside a `WHERE`
/// clause over `memory_objects`.
///
/// Each filter is applied only when set:
/// - `kinds`: restricts `memory_objects.kind`.
/// - `repo`: some evidence row comes from a session whose repo equals the
///   value after SQLite `lower()` on both sides.
/// - `cwd`: some evidence session's `cwd` contains the value as a `LIKE`
///   substring. `%` and `_` inside the value act as wildcards; they are not
///   escaped.
/// - `since`/`from`/`until`: some evidence session falls inside the time
///   window. All bounds sit in one `EXISTS`, so they must hold for the same
///   session.
///
/// Parameters are pushed to `query_params` in the same order as their `?`
/// placeholders.
///
/// # Errors
/// Propagates date-filter errors, including `--since` combined with `--from`.
fn append_memory_filter_clauses(
    sql: &mut String,
    query_params: &mut Vec<String>,
    options: &MemorySearchOptions,
) -> Result<()> {
    append_memory_kind_clause(sql, query_params, &options.kinds);
    if let Some(repo) = &options.repo {
        sql.push_str(
            r#"
            AND EXISTS (
                SELECT 1
                FROM memory_evidence
                JOIN sessions ON sessions.session_key = memory_evidence.session_key
                WHERE memory_evidence.memory_id = memory_objects.id
                  AND lower(sessions.repo) = lower(?)
            )
            "#,
        );
        query_params.push(repo.clone());
    }
    if let Some(cwd) = &options.cwd {
        sql.push_str(
            r#"
            AND EXISTS (
                SELECT 1
                FROM memory_evidence
                JOIN sessions ON sessions.session_key = memory_evidence.session_key
                WHERE memory_evidence.memory_id = memory_objects.id
                  AND sessions.cwd LIKE '%' || ? || '%'
            )
            "#,
        );
        query_params.push(cwd.clone());
    }
    if options.since.is_some() || options.from.is_some() || options.until.is_some() {
        // Reuse the session-level time helpers on a scratch buffer, then
        // retarget their `sessions.` references at the `related_sessions`
        // alias used inside the EXISTS below.
        let mut time_clause = String::new();
        let mut time_params = Vec::new();
        append_from_until_clauses(
            &mut time_clause,
            &mut time_params,
            options.since.as_ref(),
            options.from.as_ref(),
            options.until.as_ref(),
        )?;
        if !time_clause.is_empty() {
            // The helpers emit a leading " AND "; strip the first one here and
            // re-add it explicitly after the subquery header.
            let clause = time_clause
                .replacen(" AND ", "", 1)
                .replace("sessions.", "related_sessions.");
            sql.push_str(
                r#"
                AND EXISTS (
                    SELECT 1
                    FROM memory_evidence
                    JOIN sessions AS related_sessions ON related_sessions.session_key = memory_evidence.session_key
                    WHERE memory_evidence.memory_id = memory_objects.id
                "#,
            );
            sql.push_str(" AND ");
            sql.push_str(&clause);
            sql.push(')');
            query_params.extend(time_params);
        }
    }
    Ok(())
}
2376
2377fn append_memory_kind_clause(
2378    sql: &mut String,
2379    query_params: &mut Vec<String>,
2380    kinds: &[MemoryKind],
2381) {
2382    if kinds.is_empty() {
2383        return;
2384    }
2385    sql.push_str(" AND memory_objects.kind IN (");
2386    sql.push_str(&placeholders(kinds.len()));
2387    sql.push(')');
2388    query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
2389}
2390
/// Builds a comma-separated list of `count` SQL placeholders, e.g. `?, ?, ?`.
/// Returns an empty string for `0`.
fn placeholders(count: usize) -> String {
    vec!["?"; count].join(", ")
}
2396
/// Decoded form of a `chg_<n>` delta cursor string, as built by
/// `encode_delta_cursor` and parsed by `parse_delta_cursor`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DeltaCursor {
    /// Numeric suffix of the cursor. NOTE(review): presumably a
    /// `change_log.seq` value marking where to resume — confirm against the
    /// delta query.
    change_id: i64,
}
2401
/// Mirrors the columns of the `change_log` table.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChangeLogRow {
    /// `change_log.seq`, an AUTOINCREMENT primary key.
    seq: i64,
    /// Kind of object that changed. NOTE(review): presumably e.g. `"session"`,
    /// as passed to `record_change` — confirm.
    object_type: String,
    /// Identifier of the changed object.
    object_id: String,
    /// What happened to the object. NOTE(review): presumably e.g. `"upsert"`.
    action: String,
    /// Column default is the UTC time formatted as `%Y-%m-%dT%H:%M:%fZ`.
    recorded_at: String,
}
2410
2411fn parse_delta_cursor(value: &str) -> Option<DeltaCursor> {
2412    let change_id = value.strip_prefix("chg_")?.parse::<i64>().ok()?;
2413    Some(DeltaCursor { change_id })
2414}
2415
2416pub fn encode_delta_cursor(item: &DeltaItem) -> String {
2417    format!("chg_{}", item.change_id())
2418}
2419
2420fn append_kind_params(query_params: &mut Vec<String>, kinds: &[EventKind]) {
2421    query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
2422}
2423
/// Per-session aggregate of search hits, built and ranked by
/// `rank_search_results`.
struct SessionGroup {
    session_key: String,
    session_id: String,
    /// Source file of the session; ranked by `source_priority` when
    /// deduplicating copies of the same `session_id`.
    source_file_path: PathBuf,
    /// Taken from the first hit seen for this session.
    repo_matches_current: bool,
    /// Number of hits collected for this session.
    hit_count: usize,
    /// Smallest `score` among the hits; lower values rank first.
    best_score: f64,
    /// Smallest `event_kind_weight` among the hits; lower values rank first.
    best_kind_weight: u8,
    session_timestamp: String,
    /// The session's hits; sorted before being flattened into the output.
    results: Vec<SearchResult>,
}
2435
2436fn rank_search_results(
2437    results: Vec<SearchResult>,
2438    _current_repo: Option<&str>,
2439    limit: usize,
2440    include_duplicates: bool,
2441) -> Vec<SearchResult> {
2442    let mut groups = Vec::<SessionGroup>::new();
2443
2444    for result in results {
2445        let kind_weight = event_kind_weight(result.kind);
2446        if let Some(group) = groups
2447            .iter_mut()
2448            .find(|group| group.session_key == result.session_key)
2449        {
2450            group.hit_count += 1;
2451            group.best_score = group.best_score.min(result.score);
2452            group.best_kind_weight = group.best_kind_weight.min(kind_weight);
2453            group.results.push(result);
2454        } else {
2455            groups.push(SessionGroup {
2456                session_key: result.session_key.clone(),
2457                session_id: result.session_id.clone(),
2458                source_file_path: result.source_file_path.clone(),
2459                repo_matches_current: result.repo_matches_current,
2460                hit_count: 1,
2461                best_score: result.score,
2462                best_kind_weight: kind_weight,
2463                session_timestamp: result.session_timestamp.clone(),
2464                results: vec![result],
2465            });
2466        }
2467    }
2468
2469    if !include_duplicates {
2470        groups = dedupe_session_groups(groups);
2471    }
2472
2473    groups.sort_by(|left, right| {
2474        right
2475            .repo_matches_current
2476            .cmp(&left.repo_matches_current)
2477            .then_with(|| right.hit_count.cmp(&left.hit_count))
2478            .then_with(|| left.best_kind_weight.cmp(&right.best_kind_weight))
2479            .then_with(|| {
2480                left.best_score
2481                    .partial_cmp(&right.best_score)
2482                    .unwrap_or(std::cmp::Ordering::Equal)
2483            })
2484            .then_with(|| right.session_timestamp.cmp(&left.session_timestamp))
2485            .then_with(|| left.session_key.cmp(&right.session_key))
2486    });
2487
2488    let mut ranked = Vec::new();
2489    for mut group in groups {
2490        group.results.sort_by(|left, right| {
2491            left.score
2492                .partial_cmp(&right.score)
2493                .unwrap_or(std::cmp::Ordering::Equal)
2494                .then_with(|| event_kind_weight(left.kind).cmp(&event_kind_weight(right.kind)))
2495                .then_with(|| left.source_line_number.cmp(&right.source_line_number))
2496        });
2497        for result in &mut group.results {
2498            result.session_hit_count = group.hit_count;
2499            result.best_kind_weight = group.best_kind_weight;
2500        }
2501        ranked.extend(group.results);
2502        if ranked.len() >= limit {
2503            ranked.truncate(limit);
2504            break;
2505        }
2506    }
2507
2508    ranked
2509}
2510
2511fn dedupe_session_groups(groups: Vec<SessionGroup>) -> Vec<SessionGroup> {
2512    let mut selected = HashMap::<String, SessionGroup>::new();
2513    for group in groups {
2514        match selected.entry(group.session_id.clone()) {
2515            Entry::Occupied(mut entry) => {
2516                if is_preferred_group(&group, entry.get()) {
2517                    entry.insert(group);
2518                }
2519            }
2520            Entry::Vacant(entry) => {
2521                entry.insert(group);
2522            }
2523        }
2524    }
2525    selected.into_values().collect()
2526}
2527
2528fn is_preferred_group(candidate: &SessionGroup, current: &SessionGroup) -> bool {
2529    let candidate_priority = source_priority(&candidate.source_file_path);
2530    let current_priority = source_priority(&current.source_file_path);
2531    candidate_priority < current_priority
2532        || (candidate_priority == current_priority
2533            && candidate.repo_matches_current
2534            && !current.repo_matches_current)
2535        || (candidate_priority == current_priority
2536            && candidate.repo_matches_current == current.repo_matches_current
2537            && candidate.session_timestamp > current.session_timestamp)
2538        || (candidate_priority == current_priority
2539            && candidate.repo_matches_current == current.repo_matches_current
2540            && candidate.session_timestamp == current.session_timestamp
2541            && candidate.session_key < current.session_key)
2542}
2543
2544fn dedupe_recent_sessions(sessions: Vec<RecentSession>) -> Vec<RecentSession> {
2545    let mut selected = HashMap::<String, RecentSession>::new();
2546    for session in sessions {
2547        match selected.entry(session.session_id.clone()) {
2548            Entry::Occupied(mut entry) => {
2549                if is_preferred_recent_session(&session, entry.get()) {
2550                    entry.insert(session);
2551                }
2552            }
2553            Entry::Vacant(entry) => {
2554                entry.insert(session);
2555            }
2556        }
2557    }
2558
2559    let mut sessions = selected.into_values().collect::<Vec<_>>();
2560    sessions.sort_by(|left, right| {
2561        right
2562            .session_timestamp
2563            .cmp(&left.session_timestamp)
2564            .then_with(|| {
2565                source_priority(&left.source_file_path)
2566                    .cmp(&source_priority(&right.source_file_path))
2567            })
2568            .then_with(|| left.session_key.cmp(&right.session_key))
2569    });
2570    sessions
2571}
2572
2573fn is_preferred_recent_session(candidate: &RecentSession, current: &RecentSession) -> bool {
2574    let candidate_priority = source_priority(&candidate.source_file_path);
2575    let current_priority = source_priority(&current.source_file_path);
2576    candidate_priority < current_priority
2577        || (candidate_priority == current_priority
2578            && candidate.session_timestamp > current.session_timestamp)
2579        || (candidate_priority == current_priority
2580            && candidate.session_timestamp == current.session_timestamp
2581            && candidate.session_key < current.session_key)
2582}
2583
/// Ranks a session source file by the directory it lives in; lower is preferred.
///
/// - `2` if any path component is exactly `archived_sessions`. This takes
///   precedence even when a `sessions` component is also present.
/// - `0` if any path component is exactly `sessions`.
/// - `1` otherwise.
fn source_priority(path: &Path) -> u8 {
    let mut under_sessions = false;
    for component in path.components() {
        let name = component.as_os_str();
        if name == "archived_sessions" {
            return 2;
        }
        under_sessions |= name == "sessions";
    }
    if under_sessions {
        0
    } else {
        1
    }
}
2599
2600pub fn source_priority_for_path(path: &Path) -> u8 {
2601    source_priority(path)
2602}
2603
2604fn event_kind_weight(kind: EventKind) -> u8 {
2605    match kind {
2606        EventKind::UserMessage => 0,
2607        EventKind::AssistantMessage => 1,
2608        EventKind::Command => 2,
2609    }
2610}
2611
2612fn session_repos(parsed: &ParsedSession) -> BTreeSet<String> {
2613    let mut repos = BTreeSet::new();
2614    let session_repo = repo_slug(&parsed.session.cwd);
2615    if !session_repo.is_empty() {
2616        repos.insert(session_repo);
2617    }
2618
2619    for event in &parsed.events {
2620        let Some(cwd) = &event.cwd else {
2621            continue;
2622        };
2623        let repo = repo_slug(cwd);
2624        if !repo.is_empty() {
2625            repos.insert(repo);
2626        }
2627    }
2628
2629    repos
2630}
2631
/// Splits a free-text query into word terms.
///
/// A term is a maximal run of Unicode alphanumeric characters or `_`. Every
/// other character acts as a separator and is discarded, and empty pieces are
/// dropped.
fn fts_terms(query: &str) -> Vec<String> {
    query
        .split(|ch: char| !(ch.is_alphanumeric() || ch == '_'))
        .filter(|piece| !piece.is_empty())
        .map(str::to_owned)
        .collect()
}
2650
/// Wraps `term` in double quotes for use in an FTS query, doubling any
/// embedded `"` so it is treated literally.
fn quote_fts_term(term: &str) -> String {
    let mut quoted = String::with_capacity(term.len() + 2);
    quoted.push('"');
    for ch in term.chars() {
        if ch == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
2654
2655fn and_fts_query(terms: &[String]) -> String {
2656    terms
2657        .iter()
2658        .map(|term| quote_fts_term(term))
2659        .collect::<Vec<_>>()
2660        .join(" AND ")
2661}
2662
2663fn or_fts_query(terms: &[String]) -> String {
2664    terms
2665        .iter()
2666        .map(|term| quote_fts_term(term))
2667        .collect::<Vec<_>>()
2668        .join(" OR ")
2669}
2670
2671pub fn build_session_key(session_id: &str, source_file_path: &Path) -> String {
2672    session_key(session_id, source_file_path)
2673}
2674
2675fn session_key(session_id: &str, source_file_path: &Path) -> String {
2676    format!(
2677        "{}:{:016x}",
2678        session_id,
2679        fnv1a64(source_file_path.display().to_string().as_bytes())
2680    )
2681}
2682
/// 64-bit FNV-1a hash of `bytes`.
///
/// For each byte, the hash is XORed with the byte and then multiplied by the
/// FNV prime using wrapping arithmetic.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes
        .iter()
        .fold(OFFSET_BASIS, |hash, &byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
2691
/// Derives a short repo name from a working directory: its final path
/// component.
///
/// Falls back to returning `cwd` unchanged when there is no final component
/// (for example `/`, an empty string, or a path ending in `..`). It also falls
/// back when that component is not valid UTF-8.
fn repo_slug(cwd: &str) -> String {
    match Path::new(cwd).file_name().and_then(|name| name.to_str()) {
        Some(last_component) => last_component.to_owned(),
        None => cwd.to_owned(),
    }
}
2699
// Unit tests for this module are compiled only under `cfg(test)`; `mod tests;`
// has no inline body, so its contents live in a separate `tests` module file.
#[cfg(test)]
mod tests;