Skip to main content

codex_recall/
store.rs

1use crate::parser::{EventKind, ParsedSession};
2use anyhow::{bail, Context, Result};
3use rusqlite::{params, params_from_iter, Connection, OpenFlags};
4use std::collections::{hash_map::Entry, BTreeSet, HashMap};
5use std::path::{Path, PathBuf};
6use std::time::Duration;
7
/// Version stamp written to `ingestion_state.content_version` for every
/// indexed source file. `Store::is_source_current` only reports a file as
/// current when its recorded version equals this value.
const CONTENT_VERSION: i64 = 2;
9
10pub struct Store {
11    conn: Connection,
12}
13
/// Row counts describing the current contents of the index, as produced by
/// `Store::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stats {
    /// Number of rows in the `sessions` table.
    pub session_count: u64,
    /// Number of rows in the `events` table.
    pub event_count: u64,
    /// Number of rows in the `ingestion_state` table (one per source file).
    pub source_file_count: u64,
    /// `source_file_count` minus the number of distinct non-null session keys
    /// recorded in `ingestion_state` (saturating at zero).
    pub duplicate_source_file_count: u64,
}
21
22#[derive(Debug, Clone, PartialEq)]
23pub struct SearchResult {
24    pub session_key: String,
25    pub session_id: String,
26    pub repo: String,
27    pub kind: EventKind,
28    pub text: String,
29    pub snippet: String,
30    pub score: f64,
31    pub session_timestamp: String,
32    pub cwd: String,
33    pub source_file_path: PathBuf,
34    pub source_line_number: usize,
35    pub source_timestamp: Option<String>,
36    repo_matches_current: bool,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct SearchOptions {
41    pub query: String,
42    pub limit: usize,
43    pub repo: Option<String>,
44    pub cwd: Option<String>,
45    pub since: Option<String>,
46    pub from: Option<String>,
47    pub until: Option<String>,
48    pub include_duplicates: bool,
49    pub exclude_sessions: Vec<String>,
50    pub kinds: Vec<EventKind>,
51    pub current_repo: Option<String>,
52}
53
54impl SearchOptions {
55    pub fn new(query: impl Into<String>, limit: usize) -> Self {
56        Self {
57            query: query.into(),
58            limit,
59            repo: None,
60            cwd: None,
61            since: None,
62            from: None,
63            until: None,
64            include_duplicates: false,
65            exclude_sessions: Vec::new(),
66            kinds: Vec::new(),
67            current_repo: None,
68        }
69    }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct SessionEvent {
74    pub session_key: String,
75    pub session_id: String,
76    pub kind: EventKind,
77    pub text: String,
78    pub cwd: String,
79    pub source_file_path: PathBuf,
80    pub source_line_number: usize,
81    pub source_timestamp: Option<String>,
82}
83
/// A session row matched by `Store::resolve_session_reference`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMatch {
    /// Primary key of the session row.
    pub session_key: String,
    /// Session identifier stored on the row.
    pub session_id: String,
    /// Working directory recorded for the session.
    pub cwd: String,
    /// Repository recorded for the session.
    pub repo: String,
    /// Source file the session was indexed from.
    pub source_file_path: PathBuf,
}
92
/// A session row returned by `Store::recent_sessions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentSession {
    /// Primary key of the session row.
    pub session_key: String,
    /// Session identifier stored on the row.
    pub session_id: String,
    /// Repository recorded for the session.
    pub repo: String,
    /// Working directory recorded for the session.
    pub cwd: String,
    /// Timestamp recorded for the session; results are ordered by it, newest
    /// first.
    pub session_timestamp: String,
    /// Source file the session was indexed from.
    pub source_file_path: PathBuf,
}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct RecentOptions {
105    pub limit: usize,
106    pub repo: Option<String>,
107    pub cwd: Option<String>,
108    pub since: Option<String>,
109    pub from: Option<String>,
110    pub until: Option<String>,
111    pub include_duplicates: bool,
112    pub exclude_sessions: Vec<String>,
113    pub kinds: Vec<EventKind>,
114}
115
116impl Default for RecentOptions {
117    fn default() -> Self {
118        Self {
119            limit: 20,
120            repo: None,
121            cwd: None,
122            since: None,
123            from: None,
124            until: None,
125            include_duplicates: false,
126            exclude_sessions: Vec::new(),
127            kinds: Vec::new(),
128        }
129    }
130}
131
/// In-memory copy of a session row that has no `session_key` field.
// NOTE(review): presumably filled from the pre-`session_key` schema by
// `migrate_to_session_key_schema` — confirm against that function.
struct OldSessionRow {
    /// Session identifier.
    session_id: String,
    /// Timestamp recorded for the session.
    session_timestamp: String,
    /// Working directory recorded for the session.
    cwd: String,
    /// Repository recorded for the session.
    repo: String,
    /// CLI version, when one was recorded.
    cli_version: Option<String>,
    /// Source file path, kept as stored text.
    source_file_path: String,
}
140
/// In-memory copy of an event row that has no `session_key` field.
// NOTE(review): presumably filled from the pre-`session_key` schema by
// `migrate_to_session_key_schema` — confirm against that function.
struct OldEventRow {
    /// Identifier of the owning session.
    session_id: String,
    /// Event kind in its stored text form.
    kind: String,
    /// Role associated with the event, when present.
    role: Option<String>,
    /// Full event text.
    text: String,
    /// Command associated with the event, when present.
    command: Option<String>,
    /// Working directory recorded on the event, when present.
    cwd: Option<String>,
    /// Exit code recorded on the event, when present.
    exit_code: Option<i64>,
    /// Timestamp recorded on the event, when present.
    source_timestamp: Option<String>,
    /// Source file path, kept as stored text.
    source_file_path: String,
    /// Line number within the source file, as the stored integer.
    source_line_number: i64,
}
153
154impl Store {
155    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
156        let path = path.as_ref();
157        if let Some(parent) = path.parent() {
158            std::fs::create_dir_all(parent)
159                .with_context(|| format!("create db directory {}", parent.display()))?;
160        }
161
162        let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
163        configure_write_connection(&conn)?;
164        let store = Self { conn };
165        store.init_schema()?;
166        Ok(store)
167    }
168
169    pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
170        let path = path.as_ref();
171        let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
172            .with_context(|| format!("open db read-only {}", path.display()))?;
173        configure_read_connection(&conn)?;
174        Ok(Self { conn })
175    }
176
177    pub fn index_session(&self, parsed: &ParsedSession) -> Result<()> {
178        self.conn.execute("BEGIN IMMEDIATE", [])?;
179        let result = self.index_session_inner(parsed);
180        match result {
181            Ok(()) => {
182                self.conn.execute("COMMIT", [])?;
183                Ok(())
184            }
185            Err(error) => {
186                let _ = self.conn.execute("ROLLBACK", []);
187                Err(error)
188            }
189        }
190    }
191
192    pub fn stats(&self) -> Result<Stats> {
193        let session_count = self
194            .conn
195            .query_row("SELECT COUNT(*) FROM sessions", [], |row| {
196                row.get::<_, u64>(0)
197            })?;
198        let event_count = self
199            .conn
200            .query_row("SELECT COUNT(*) FROM events", [], |row| {
201                row.get::<_, u64>(0)
202            })?;
203        let source_file_count =
204            self.conn
205                .query_row("SELECT COUNT(*) FROM ingestion_state", [], |row| {
206                    row.get::<_, u64>(0)
207                })?;
208        let unique_ingested_sessions = self.conn.query_row(
209            "SELECT COUNT(DISTINCT COALESCE(session_key, session_id)) FROM ingestion_state WHERE COALESCE(session_key, session_id) IS NOT NULL",
210            [],
211            |row| row.get::<_, u64>(0),
212        )?;
213
214        Ok(Stats {
215            session_count,
216            event_count,
217            source_file_count,
218            duplicate_source_file_count: source_file_count.saturating_sub(unique_ingested_sessions),
219        })
220    }
221
222    pub fn quick_check(&self) -> Result<String> {
223        self.conn
224            .query_row("PRAGMA quick_check", [], |row| row.get::<_, String>(0))
225            .map_err(Into::into)
226    }
227
228    pub fn fts_integrity_check(&self) -> Result<()> {
229        self.conn.execute(
230            "INSERT INTO events_fts(events_fts) VALUES('integrity-check')",
231            [],
232        )?;
233        Ok(())
234    }
235
236    pub fn fts_read_check(&self) -> Result<()> {
237        self.conn
238            .query_row("SELECT COUNT(*) FROM events_fts", [], |row| {
239                row.get::<_, i64>(0)
240            })
241            .map(|_| ())
242            .map_err(Into::into)
243    }
244
245    pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
246        self.search_with_options(SearchOptions::new(query, limit))
247    }
248
249    pub fn search_with_options(&self, options: SearchOptions) -> Result<Vec<SearchResult>> {
250        let terms = fts_terms(&options.query);
251        if terms.is_empty() {
252            return Ok(Vec::new());
253        }
254
255        let limit = options.limit.clamp(1, 100);
256        let fetch_limit = limit.saturating_mul(50).clamp(200, 1_000);
257        let results = self.search_with_fts_query(&options, &and_fts_query(&terms), fetch_limit)?;
258        if !results.is_empty() || terms.len() == 1 {
259            return Ok(rank_search_results(
260                results,
261                options.current_repo.as_deref(),
262                limit,
263                options.include_duplicates,
264            ));
265        }
266
267        let results = self.search_with_fts_query(&options, &or_fts_query(&terms), fetch_limit)?;
268        Ok(rank_search_results(
269            results,
270            options.current_repo.as_deref(),
271            limit,
272            options.include_duplicates,
273        ))
274    }
275
276    fn search_with_fts_query(
277        &self,
278        options: &SearchOptions,
279        fts_query: &str,
280        limit: usize,
281    ) -> Result<Vec<SearchResult>> {
282        let mut query_params = Vec::<String>::new();
283        let current_repo_expr = if let Some(current_repo) = &options.current_repo {
284            query_params.push(current_repo.clone());
285            "EXISTS (
286                    SELECT 1 FROM session_repos current_repos
287                    WHERE current_repos.session_key = sessions.session_key
288                      AND lower(current_repos.repo) = lower(?)
289                )"
290        } else {
291            "0"
292        };
293        query_params.push(fts_query.to_owned());
294
295        let mut sql = format!(
296            r#"
297            SELECT
298                events.session_key,
299                events.session_id,
300                sessions.repo,
301                events.kind,
302                events.text,
303                snippet(events_fts, 4, '', '', ' ... ', 16) AS snippet,
304                events_fts.rank AS score,
305                sessions.session_timestamp,
306                sessions.cwd,
307                events.source_file_path,
308                events.source_line_number,
309                events.source_timestamp,
310                {current_repo_expr} AS current_repo_match
311            FROM events_fts
312            JOIN events ON events.id = events_fts.event_id
313            JOIN sessions ON sessions.session_key = events.session_key
314            WHERE events_fts MATCH ?
315            "#,
316        );
317
318        if let Some(repo) = &options.repo {
319            sql.push_str(
320                r#"
321                AND EXISTS (
322                    SELECT 1 FROM session_repos filter_repos
323                    WHERE filter_repos.session_key = sessions.session_key
324                      AND lower(filter_repos.repo) = lower(?)
325                )
326                "#,
327            );
328            query_params.push(repo.clone());
329        }
330        if let Some(cwd) = &options.cwd {
331            sql.push_str(
332                r#"
333                AND (
334                    sessions.cwd LIKE '%' || ? || '%'
335                    OR EXISTS (
336                        SELECT 1 FROM events cwd_events
337                        WHERE cwd_events.session_key = sessions.session_key
338                          AND cwd_events.cwd LIKE '%' || ? || '%'
339                    )
340                )
341                "#,
342            );
343            query_params.push(cwd.clone());
344            query_params.push(cwd.clone());
345        }
346        append_from_until_clauses(
347            &mut sql,
348            &mut query_params,
349            options.since.as_ref(),
350            options.from.as_ref(),
351            options.until.as_ref(),
352        )?;
353        append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
354        append_event_kind_clause(&mut sql, &mut query_params, "events.kind", &options.kinds);
355
356        sql.push_str(" ORDER BY events_fts.rank ASC, events.source_line_number ASC LIMIT ");
357        sql.push_str(&limit.to_string());
358
359        let mut statement = self.conn.prepare(&sql)?;
360
361        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
362            let kind_text: String = row.get(3)?;
363            let kind = kind_text.parse::<EventKind>().map_err(|_| {
364                rusqlite::Error::InvalidColumnType(
365                    3,
366                    "kind".to_owned(),
367                    rusqlite::types::Type::Text,
368                )
369            })?;
370            let source_file_path: String = row.get(9)?;
371            let source_line_number: i64 = row.get(10)?;
372
373            Ok(SearchResult {
374                session_key: row.get(0)?,
375                session_id: row.get(1)?,
376                repo: row.get(2)?,
377                kind,
378                text: row.get(4)?,
379                snippet: row.get(5)?,
380                score: row.get(6)?,
381                session_timestamp: row.get(7)?,
382                cwd: row.get(8)?,
383                source_file_path: PathBuf::from(source_file_path),
384                source_line_number: source_line_number as usize,
385                source_timestamp: row.get(11)?,
386                repo_matches_current: row.get::<_, i64>(12)? != 0,
387            })
388        })?;
389
390        rows.collect::<std::result::Result<Vec<_>, _>>()
391            .map_err(Into::into)
392    }
393
394    pub fn resolve_session_reference(&self, reference: &str) -> Result<Vec<SessionMatch>> {
395        let mut statement = self.conn.prepare(
396            r#"
397            SELECT session_key, session_id, cwd, repo, source_file_path
398            FROM sessions
399            WHERE session_key = ? OR session_id = ?
400            ORDER BY session_timestamp DESC, source_file_path ASC
401            "#,
402        )?;
403        let rows = statement.query_map(params![reference, reference], |row| {
404            let source_file_path: String = row.get(4)?;
405            Ok(SessionMatch {
406                session_key: row.get(0)?,
407                session_id: row.get(1)?,
408                cwd: row.get(2)?,
409                repo: row.get(3)?,
410                source_file_path: PathBuf::from(source_file_path),
411            })
412        })?;
413
414        rows.collect::<std::result::Result<Vec<_>, _>>()
415            .map_err(Into::into)
416    }
417
418    pub fn recent_sessions(&self, options: RecentOptions) -> Result<Vec<RecentSession>> {
419        let limit = options.limit.clamp(1, 100);
420        let fetch_limit = if options.include_duplicates {
421            limit
422        } else {
423            limit.saturating_mul(5).clamp(limit, 500)
424        };
425        let mut query_params = Vec::<String>::new();
426        let mut sql = r#"
427            SELECT
428                sessions.session_key,
429                sessions.session_id,
430                sessions.repo,
431                sessions.cwd,
432                sessions.session_timestamp,
433                sessions.source_file_path
434            FROM sessions
435            WHERE 1 = 1
436            "#
437        .to_owned();
438
439        if let Some(repo) = &options.repo {
440            sql.push_str(
441                r#"
442                AND EXISTS (
443                    SELECT 1 FROM session_repos filter_repos
444                    WHERE filter_repos.session_key = sessions.session_key
445                      AND lower(filter_repos.repo) = lower(?)
446                )
447                "#,
448            );
449            query_params.push(repo.clone());
450        }
451        if let Some(cwd) = &options.cwd {
452            sql.push_str(
453                r#"
454                AND (
455                    sessions.cwd LIKE '%' || ? || '%'
456                    OR EXISTS (
457                        SELECT 1 FROM events cwd_events
458                        WHERE cwd_events.session_key = sessions.session_key
459                          AND cwd_events.cwd LIKE '%' || ? || '%'
460                    )
461                )
462                "#,
463            );
464            query_params.push(cwd.clone());
465            query_params.push(cwd.clone());
466        }
467        append_from_until_clauses(
468            &mut sql,
469            &mut query_params,
470            options.since.as_ref(),
471            options.from.as_ref(),
472            options.until.as_ref(),
473        )?;
474        append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
475        append_recent_event_kind_clause(&mut sql, &mut query_params, &options.kinds);
476
477        sql.push_str(
478            r#"
479            ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
480                     sessions.source_file_path ASC
481            LIMIT ?
482            "#,
483        );
484        query_params.push(fetch_limit.to_string());
485
486        let mut statement = self.conn.prepare(&sql)?;
487        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
488            let source_file_path: String = row.get(5)?;
489            Ok(RecentSession {
490                session_key: row.get(0)?,
491                session_id: row.get(1)?,
492                repo: row.get(2)?,
493                cwd: row.get(3)?,
494                session_timestamp: row.get(4)?,
495                source_file_path: PathBuf::from(source_file_path),
496            })
497        })?;
498
499        let mut sessions = rows.collect::<std::result::Result<Vec<_>, _>>()?;
500        if !options.include_duplicates {
501            sessions = dedupe_recent_sessions(sessions);
502        }
503        sessions.truncate(limit);
504        Ok(sessions)
505    }
506
507    pub fn session_events(&self, session_key: &str, limit: usize) -> Result<Vec<SessionEvent>> {
508        self.session_events_with_kinds(session_key, limit, &[])
509    }
510
511    pub fn session_events_with_kinds(
512        &self,
513        session_key: &str,
514        limit: usize,
515        kinds: &[EventKind],
516    ) -> Result<Vec<SessionEvent>> {
517        let limit = limit.clamp(1, 500);
518        let mut query_params = vec![session_key.to_owned()];
519        let mut sql = r#"
520            SELECT
521                events.session_key,
522                events.session_id,
523                events.kind,
524                events.text,
525                sessions.cwd,
526                events.source_file_path,
527                events.source_line_number,
528                events.source_timestamp
529            FROM events
530            JOIN sessions ON sessions.session_key = events.session_key
531            WHERE events.session_key = ?
532            "#
533        .to_owned();
534        append_event_kind_clause(&mut sql, &mut query_params, "events.kind", kinds);
535        sql.push_str(
536            r#"
537            ORDER BY events.source_line_number ASC
538            LIMIT ?
539            "#,
540        );
541        query_params.push(limit.to_string());
542
543        let mut statement = self.conn.prepare(&sql)?;
544
545        let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
546            let kind_text: String = row.get(2)?;
547            let kind = kind_text.parse::<EventKind>().map_err(|_| {
548                rusqlite::Error::InvalidColumnType(
549                    2,
550                    "kind".to_owned(),
551                    rusqlite::types::Type::Text,
552                )
553            })?;
554            let source_file_path: String = row.get(5)?;
555            let source_line_number: i64 = row.get(6)?;
556
557            Ok(SessionEvent {
558                session_key: row.get(0)?,
559                session_id: row.get(1)?,
560                kind,
561                text: row.get(3)?,
562                cwd: row.get(4)?,
563                source_file_path: PathBuf::from(source_file_path),
564                source_line_number: source_line_number as usize,
565                source_timestamp: row.get(7)?,
566            })
567        })?;
568
569        rows.collect::<std::result::Result<Vec<_>, _>>()
570            .map_err(Into::into)
571    }
572
573    pub fn session_repos(&self, session_key: &str) -> Result<Vec<String>> {
574        let mut statement = self.conn.prepare(
575            r#"
576            SELECT repo
577            FROM session_repos
578            WHERE session_key = ?
579            ORDER BY lower(repo) ASC
580            "#,
581        )?;
582        let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
583
584        rows.collect::<std::result::Result<Vec<_>, _>>()
585            .map_err(Into::into)
586    }
587
588    pub fn is_source_current(
589        &self,
590        source_file_path: &Path,
591        source_file_mtime_ns: i64,
592        source_file_size: i64,
593    ) -> Result<bool> {
594        let count = self.conn.query_row(
595            r#"
596            SELECT COUNT(*)
597            FROM ingestion_state
598            WHERE source_file_path = ?
599              AND source_file_mtime_ns = ?
600              AND source_file_size = ?
601              AND content_version = ?
602            "#,
603            params![
604                source_file_path.display().to_string(),
605                source_file_mtime_ns,
606                source_file_size,
607                CONTENT_VERSION,
608            ],
609            |row| row.get::<_, i64>(0),
610        )?;
611        Ok(count > 0)
612    }
613
614    pub fn mark_source_indexed(
615        &self,
616        source_file_path: &Path,
617        source_file_mtime_ns: i64,
618        source_file_size: i64,
619        session_id: Option<&str>,
620        session_key: Option<&str>,
621    ) -> Result<()> {
622        self.conn.execute(
623            r#"
624            INSERT INTO ingestion_state (
625                source_file_path, source_file_mtime_ns, source_file_size, session_id, session_key, content_version, indexed_at
626            ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
627            ON CONFLICT(source_file_path) DO UPDATE SET
628                source_file_mtime_ns = excluded.source_file_mtime_ns,
629                source_file_size = excluded.source_file_size,
630                session_id = excluded.session_id,
631                session_key = excluded.session_key,
632                content_version = excluded.content_version,
633                indexed_at = excluded.indexed_at
634            "#,
635            params![
636                source_file_path.display().to_string(),
637                source_file_mtime_ns,
638                source_file_size,
639                session_id,
640                session_key,
641                CONTENT_VERSION,
642            ],
643        )?;
644        Ok(())
645    }
646
647    pub fn last_indexed_at(&self) -> Result<Option<String>> {
648        self.conn
649            .query_row("SELECT MAX(indexed_at) FROM ingestion_state", [], |row| {
650                row.get::<_, Option<String>>(0)
651            })
652            .map_err(Into::into)
653    }
654
655    fn init_schema(&self) -> Result<()> {
656        self.conn.execute_batch(
657            r#"
658            PRAGMA journal_mode = WAL;
659            PRAGMA synchronous = NORMAL;
660            "#,
661        )?;
662
663        if self.table_exists("sessions")? && !self.table_has_column("sessions", "session_key")? {
664            self.migrate_to_session_key_schema()?;
665        }
666
667        self.create_schema_objects()?;
668        self.ensure_ingestion_state_session_key_column()?;
669        self.ensure_ingestion_state_content_version_column()?;
670        self.backfill_session_repos()?;
671        self.backfill_session_repo_memberships()?;
672        self.backfill_ingestion_session_keys()?;
673        Ok(())
674    }
675
676    fn create_schema_objects(&self) -> Result<()> {
677        self.conn.execute_batch(
678            r#"
679            CREATE TABLE IF NOT EXISTS sessions (
680                session_key TEXT PRIMARY KEY,
681                session_id TEXT NOT NULL,
682                session_timestamp TEXT NOT NULL,
683                cwd TEXT NOT NULL,
684                repo TEXT NOT NULL DEFAULT '',
685                cli_version TEXT,
686                source_file_path TEXT NOT NULL
687            );
688
689            CREATE INDEX IF NOT EXISTS sessions_session_id_idx ON sessions(session_id);
690            CREATE INDEX IF NOT EXISTS sessions_repo_idx ON sessions(repo);
691
692            CREATE TABLE IF NOT EXISTS session_repos (
693                session_key TEXT NOT NULL,
694                repo TEXT NOT NULL,
695                PRIMARY KEY(session_key, repo),
696                FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
697            );
698
699            CREATE INDEX IF NOT EXISTS session_repos_repo_idx ON session_repos(repo);
700
701            CREATE TABLE IF NOT EXISTS events (
702                id INTEGER PRIMARY KEY AUTOINCREMENT,
703                session_key TEXT NOT NULL,
704                session_id TEXT NOT NULL,
705                kind TEXT NOT NULL,
706                role TEXT,
707                text TEXT NOT NULL,
708                command TEXT,
709                cwd TEXT,
710                exit_code INTEGER,
711                source_timestamp TEXT,
712                source_file_path TEXT NOT NULL,
713                source_line_number INTEGER NOT NULL,
714                FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
715            );
716
717            CREATE INDEX IF NOT EXISTS events_session_key_idx ON events(session_key);
718            CREATE INDEX IF NOT EXISTS events_session_id_idx ON events(session_id);
719            CREATE INDEX IF NOT EXISTS events_source_idx ON events(source_file_path, source_line_number);
720
721            CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
722                event_id UNINDEXED,
723                session_key UNINDEXED,
724                session_id UNINDEXED,
725                kind UNINDEXED,
726                text,
727                tokenize = 'porter unicode61'
728            );
729
730            CREATE TABLE IF NOT EXISTS ingestion_state (
731                source_file_path TEXT PRIMARY KEY,
732                source_file_mtime_ns INTEGER NOT NULL,
733                source_file_size INTEGER NOT NULL,
734                session_id TEXT,
735                session_key TEXT,
736                content_version INTEGER NOT NULL DEFAULT 2,
737                indexed_at TEXT NOT NULL
738            );
739            "#,
740        )?;
741        Ok(())
742    }
743
744    fn index_session_inner(&self, parsed: &ParsedSession) -> Result<()> {
745        let session_key = session_key(&parsed.session.id, &parsed.session.source_file_path);
746        let repo = repo_slug(&parsed.session.cwd);
747        self.conn.execute(
748            r#"
749            INSERT INTO sessions (
750                session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path
751            ) VALUES (?, ?, ?, ?, ?, ?, ?)
752            ON CONFLICT(session_key) DO UPDATE SET
753                session_id = excluded.session_id,
754                session_timestamp = excluded.session_timestamp,
755                cwd = excluded.cwd,
756                repo = excluded.repo,
757                cli_version = excluded.cli_version,
758                source_file_path = excluded.source_file_path
759            "#,
760            params![
761                session_key.as_str(),
762                parsed.session.id,
763                parsed.session.timestamp,
764                parsed.session.cwd,
765                repo,
766                parsed.session.cli_version,
767                parsed.session.source_file_path.display().to_string(),
768            ],
769        )?;
770
771        self.conn.execute(
772            "DELETE FROM events_fts WHERE session_key = ?",
773            params![session_key.as_str()],
774        )?;
775        self.conn.execute(
776            "DELETE FROM events WHERE session_key = ?",
777            params![session_key.as_str()],
778        )?;
779        self.conn.execute(
780            "DELETE FROM session_repos WHERE session_key = ?",
781            params![session_key.as_str()],
782        )?;
783
784        for repo in session_repos(parsed) {
785            self.conn.execute(
786                "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
787                params![session_key.as_str(), repo],
788            )?;
789        }
790
791        for event in &parsed.events {
792            self.conn.execute(
793                r#"
794                INSERT INTO events (
795                    session_key, session_id, kind, role, text, command, cwd, exit_code,
796                    source_timestamp, source_file_path, source_line_number
797                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
798                "#,
799                params![
800                    session_key.as_str(),
801                    parsed.session.id,
802                    event.kind.as_str(),
803                    event.role,
804                    event.text,
805                    event.command,
806                    event.cwd,
807                    event.exit_code,
808                    event.source_timestamp,
809                    event.source_file_path.display().to_string(),
810                    event.source_line_number as i64,
811                ],
812            )?;
813            let event_id = self.conn.last_insert_rowid();
814            self.conn.execute(
815                "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
816                params![
817                    event_id,
818                    session_key.as_str(),
819                    parsed.session.id,
820                    event.kind.as_str(),
821                    event.text
822                ],
823            )?;
824        }
825
826        Ok(())
827    }
828
829    fn backfill_session_repos(&self) -> Result<()> {
830        let mut statement = self
831            .conn
832            .prepare("SELECT session_key, cwd FROM sessions WHERE repo = ''")?;
833        let rows = statement.query_map([], |row| {
834            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
835        })?;
836
837        for row in rows {
838            let (session_key, cwd) = row?;
839            self.conn.execute(
840                "UPDATE sessions SET repo = ? WHERE session_key = ?",
841                params![repo_slug(&cwd), session_key],
842            )?;
843        }
844        Ok(())
845    }
846
847    fn table_exists(&self, table_name: &str) -> Result<bool> {
848        let count = self.conn.query_row(
849            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
850            params![table_name],
851            |row| row.get::<_, i64>(0),
852        )?;
853        Ok(count > 0)
854    }
855
856    fn table_has_column(&self, table_name: &str, column_name: &str) -> Result<bool> {
857        let mut statement = self
858            .conn
859            .prepare(&format!("PRAGMA table_info({table_name})"))?;
860        let columns = statement
861            .query_map([], |row| row.get::<_, String>(1))?
862            .collect::<std::result::Result<Vec<_>, _>>()?;
863        Ok(columns.iter().any(|column| column == column_name))
864    }
865
866    fn ensure_ingestion_state_session_key_column(&self) -> Result<()> {
867        if self.table_has_column("ingestion_state", "session_key")? {
868            return Ok(());
869        }
870
871        match self.conn.execute(
872            "ALTER TABLE ingestion_state ADD COLUMN session_key TEXT",
873            [],
874        ) {
875            Ok(_) => Ok(()),
876            Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
877            Err(error) => Err(error.into()),
878        }
879    }
880
881    fn ensure_ingestion_state_content_version_column(&self) -> Result<()> {
882        if self.table_has_column("ingestion_state", "content_version")? {
883            return Ok(());
884        }
885
886        match self.conn.execute(
887            "ALTER TABLE ingestion_state ADD COLUMN content_version INTEGER NOT NULL DEFAULT 0",
888            [],
889        ) {
890            Ok(_) => Ok(()),
891            Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
892            Err(error) => Err(error.into()),
893        }
894    }
895
896    fn backfill_ingestion_session_keys(&self) -> Result<()> {
897        if !self.table_exists("ingestion_state")? {
898            return Ok(());
899        }
900
901        self.conn.execute(
902            r#"
903            UPDATE ingestion_state
904            SET session_key = (
905                SELECT sessions.session_key
906                FROM sessions
907                WHERE sessions.source_file_path = ingestion_state.source_file_path
908                LIMIT 1
909            )
910            WHERE session_key IS NULL
911              AND session_id IS NOT NULL
912            "#,
913            [],
914        )?;
915        Ok(())
916    }
917
918    fn backfill_session_repo_memberships(&self) -> Result<()> {
919        self.conn.execute(
920            r#"
921            INSERT OR IGNORE INTO session_repos (session_key, repo)
922            SELECT session_key, repo
923            FROM sessions
924            WHERE repo != ''
925            "#,
926            [],
927        )?;
928
929        let mut statement = self.conn.prepare(
930            r#"
931            SELECT DISTINCT session_key, cwd
932            FROM events
933            WHERE cwd IS NOT NULL
934              AND cwd != ''
935            "#,
936        )?;
937        let rows = statement.query_map([], |row| {
938            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
939        })?;
940
941        for row in rows {
942            let (session_key, cwd) = row?;
943            let repo = repo_slug(&cwd);
944            if repo.is_empty() {
945                continue;
946            }
947            self.conn.execute(
948                "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
949                params![session_key, repo],
950            )?;
951        }
952
953        Ok(())
954    }
955
956    fn migrate_to_session_key_schema(&self) -> Result<()> {
957        let old_sessions = self.load_old_sessions()?;
958        let old_events = self.load_old_events()?;
959        let mut keys_by_session_id = HashMap::new();
960        for session in &old_sessions {
961            keys_by_session_id.insert(
962                session.session_id.clone(),
963                session_key(&session.session_id, Path::new(&session.source_file_path)),
964            );
965        }
966
967        self.conn.execute("BEGIN IMMEDIATE", [])?;
968        let result = (|| -> Result<()> {
969            self.conn.execute_batch(
970                r#"
971                DROP TABLE IF EXISTS events_fts;
972                DROP TABLE IF EXISTS events;
973                DROP TABLE IF EXISTS sessions;
974                "#,
975            )?;
976            self.create_schema_objects()?;
977
978            for session in &old_sessions {
979                let session_key = keys_by_session_id
980                    .get(&session.session_id)
981                    .expect("session key exists");
982                let repo = if session.repo.is_empty() {
983                    repo_slug(&session.cwd)
984                } else {
985                    session.repo.clone()
986                };
987                self.conn.execute(
988                    r#"
989                    INSERT INTO sessions (
990                        session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path
991                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
992                    "#,
993                    params![
994                        session_key,
995                        session.session_id,
996                        session.session_timestamp,
997                        session.cwd,
998                        repo,
999                        session.cli_version,
1000                        session.source_file_path,
1001                    ],
1002                )?;
1003                self.conn.execute(
1004                    "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1005                    params![session_key, repo],
1006                )?;
1007            }
1008
1009            for event in &old_events {
1010                let Some(session_key) = keys_by_session_id.get(&event.session_id) else {
1011                    continue;
1012                };
1013                self.conn.execute(
1014                    r#"
1015                    INSERT INTO events (
1016                        session_key, session_id, kind, role, text, command, cwd, exit_code,
1017                        source_timestamp, source_file_path, source_line_number
1018                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1019                    "#,
1020                    params![
1021                        session_key,
1022                        event.session_id,
1023                        event.kind,
1024                        event.role,
1025                        event.text,
1026                        event.command,
1027                        event.cwd,
1028                        event.exit_code,
1029                        event.source_timestamp,
1030                        event.source_file_path,
1031                        event.source_line_number,
1032                    ],
1033                )?;
1034                let event_id = self.conn.last_insert_rowid();
1035                self.conn.execute(
1036                    "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
1037                    params![
1038                        event_id,
1039                        session_key,
1040                        event.session_id,
1041                        event.kind,
1042                        event.text
1043                    ],
1044                )?;
1045            }
1046
1047            Ok(())
1048        })();
1049
1050        match result {
1051            Ok(()) => {
1052                self.conn.execute("COMMIT", [])?;
1053                Ok(())
1054            }
1055            Err(error) => {
1056                let _ = self.conn.execute("ROLLBACK", []);
1057                Err(error)
1058            }
1059        }
1060    }
1061
1062    fn load_old_sessions(&self) -> Result<Vec<OldSessionRow>> {
1063        let has_repo = self.table_has_column("sessions", "repo")?;
1064        let sql = if has_repo {
1065            "SELECT session_id, session_timestamp, cwd, repo, cli_version, source_file_path FROM sessions"
1066        } else {
1067            "SELECT session_id, session_timestamp, cwd, '' AS repo, cli_version, source_file_path FROM sessions"
1068        };
1069        let mut statement = self.conn.prepare(sql)?;
1070        let rows = statement.query_map([], |row| {
1071            Ok(OldSessionRow {
1072                session_id: row.get(0)?,
1073                session_timestamp: row.get(1)?,
1074                cwd: row.get(2)?,
1075                repo: row.get(3)?,
1076                cli_version: row.get(4)?,
1077                source_file_path: row.get(5)?,
1078            })
1079        })?;
1080
1081        rows.collect::<std::result::Result<Vec<_>, _>>()
1082            .map_err(Into::into)
1083    }
1084
1085    fn load_old_events(&self) -> Result<Vec<OldEventRow>> {
1086        if !self.table_exists("events")? {
1087            return Ok(Vec::new());
1088        }
1089
1090        let mut statement = self.conn.prepare(
1091            r#"
1092            SELECT
1093                session_id, kind, role, text, command, cwd, exit_code,
1094                source_timestamp, source_file_path, source_line_number
1095            FROM events
1096            ORDER BY id ASC
1097            "#,
1098        )?;
1099        let rows = statement.query_map([], |row| {
1100            Ok(OldEventRow {
1101                session_id: row.get(0)?,
1102                kind: row.get(1)?,
1103                role: row.get(2)?,
1104                text: row.get(3)?,
1105                command: row.get(4)?,
1106                cwd: row.get(5)?,
1107                exit_code: row.get(6)?,
1108                source_timestamp: row.get(7)?,
1109                source_file_path: row.get(8)?,
1110                source_line_number: row.get(9)?,
1111            })
1112        })?;
1113
1114        rows.collect::<std::result::Result<Vec<_>, _>>()
1115            .map_err(Into::into)
1116    }
1117}
1118
1119fn configure_write_connection(conn: &Connection) -> Result<()> {
1120    conn.busy_timeout(Duration::from_secs(30))?;
1121    conn.execute_batch(
1122        r#"
1123        PRAGMA journal_mode = WAL;
1124        PRAGMA synchronous = NORMAL;
1125        PRAGMA temp_store = MEMORY;
1126        "#,
1127    )?;
1128    Ok(())
1129}
1130
1131fn configure_read_connection(conn: &Connection) -> Result<()> {
1132    conn.busy_timeout(Duration::from_secs(30))?;
1133    conn.execute_batch(
1134        r#"
1135        PRAGMA query_only = ON;
1136        PRAGMA temp_store = MEMORY;
1137        "#,
1138    )?;
1139    Ok(())
1140}
1141
/// A parsed date bound, as produced by `parse_date_filter`.
///
/// Despite the name, it is used for the `--since`, `--from`, and `--until` flags.
enum SinceFilter {
    /// A value that starts with `YYYY-MM-DD`; passed through to SQLite's `datetime()`.
    Absolute(String),
    /// A relative bound of this many days before now (never zero; `0d` parses as `Today`).
    LastDays(u32),
    /// The start of the current local day.
    Today,
    /// The start of the previous local day.
    Yesterday,
}
1148
1149fn parse_date_filter(value: &str, flag_name: &str) -> Result<SinceFilter> {
1150    let trimmed = value.trim();
1151    let lower = trimmed.to_ascii_lowercase();
1152    if lower == "today" {
1153        return Ok(SinceFilter::Today);
1154    }
1155    if lower == "yesterday" {
1156        return Ok(SinceFilter::Yesterday);
1157    }
1158    if let Some(days) = lower.strip_suffix('d') {
1159        let days = days
1160            .parse::<u32>()
1161            .with_context(|| format!("parse {flag_name} relative day value `{value}`"))?;
1162        if days == 0 {
1163            return Ok(SinceFilter::Today);
1164        }
1165        return Ok(SinceFilter::LastDays(days));
1166    }
1167    if looks_like_absolute_date(trimmed) {
1168        return Ok(SinceFilter::Absolute(trimmed.to_owned()));
1169    }
1170
1171    anyhow::bail!(
1172        "unsupported {flag_name} value `{value}`; use YYYY-MM-DD, today, yesterday, or Nd like 7d"
1173    )
1174}
1175
/// Reports whether `value` begins with a `DDDD-DD-DD` shape of ASCII digits.
///
/// Only the first ten bytes are inspected; anything after them (for example a
/// time component) is accepted unchecked, and the digits are not range-checked.
fn looks_like_absolute_date(value: &str) -> bool {
    let Some(prefix) = value.as_bytes().get(..10) else {
        return false;
    };
    prefix.iter().enumerate().all(|(index, byte)| match index {
        4 | 7 => *byte == b'-',
        _ => byte.is_ascii_digit(),
    })
}
1185
/// Appends the lower-bound timestamp filter for the `--since` flag.
///
/// Delegates to `append_lower_bound_clause`, passing `"--since"` as the flag
/// name that appears in parse error messages.
///
/// # Errors
/// Propagates the error returned when `value` cannot be parsed.
fn append_since_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    value: &str,
) -> Result<()> {
    append_lower_bound_clause(sql, query_params, value, "--since")
}
1193
1194fn append_lower_bound_clause(
1195    sql: &mut String,
1196    query_params: &mut Vec<String>,
1197    value: &str,
1198    flag_name: &str,
1199) -> Result<()> {
1200    sql.push_str(
1201        " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) >= ",
1202    );
1203    match parse_date_filter(value, flag_name)? {
1204        SinceFilter::Absolute(value) => {
1205            sql.push_str("datetime(?)");
1206            query_params.push(value);
1207        }
1208        SinceFilter::LastDays(days) => {
1209            sql.push_str("datetime('now', ?)");
1210            query_params.push(format!("-{days} days"));
1211        }
1212        SinceFilter::Today => {
1213            sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
1214        }
1215        SinceFilter::Yesterday => {
1216            sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
1217        }
1218    }
1219    Ok(())
1220}
1221
1222fn append_until_clause(
1223    sql: &mut String,
1224    query_params: &mut Vec<String>,
1225    value: &str,
1226) -> Result<()> {
1227    sql.push_str(
1228        " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) < ",
1229    );
1230    match parse_date_filter(value, "--until")? {
1231        SinceFilter::Absolute(value) => {
1232            sql.push_str("datetime(?)");
1233            query_params.push(value);
1234        }
1235        SinceFilter::LastDays(days) => {
1236            sql.push_str("datetime('now', ?)");
1237            query_params.push(format!("-{days} days"));
1238        }
1239        SinceFilter::Today => {
1240            sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
1241        }
1242        SinceFilter::Yesterday => {
1243            sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
1244        }
1245    }
1246    Ok(())
1247}
1248
1249fn append_from_until_clauses(
1250    sql: &mut String,
1251    query_params: &mut Vec<String>,
1252    since: Option<&String>,
1253    from: Option<&String>,
1254    until: Option<&String>,
1255) -> Result<()> {
1256    if since.is_some() && from.is_some() {
1257        bail!("use either --since or --from, not both");
1258    }
1259    if let Some(since) = since {
1260        append_since_clause(sql, query_params, since)?;
1261    } else if let Some(from) = from {
1262        append_lower_bound_clause(sql, query_params, from, "--from")?;
1263    }
1264    if let Some(until) = until {
1265        append_until_clause(sql, query_params, until)?;
1266    }
1267    Ok(())
1268}
1269
/// Appends one exclusion per entry of `excluded_sessions`.
///
/// Each entry is compared against both `sessions.session_id` and
/// `sessions.session_key`, so it is pushed onto `query_params` twice.
fn append_excluded_sessions_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    excluded_sessions: &[String],
) {
    const EXCLUSION: &str = " AND sessions.session_id != ? AND sessions.session_key != ?";

    excluded_sessions.iter().for_each(|identifier| {
        sql.push_str(EXCLUSION);
        query_params.extend([identifier.clone(), identifier.clone()]);
    });
}
1281
1282fn append_event_kind_clause(
1283    sql: &mut String,
1284    query_params: &mut Vec<String>,
1285    column_name: &str,
1286    kinds: &[EventKind],
1287) {
1288    if kinds.is_empty() {
1289        return;
1290    }
1291    sql.push_str(" AND ");
1292    sql.push_str(column_name);
1293    sql.push_str(" IN (");
1294    sql.push_str(&placeholders(kinds.len()));
1295    sql.push(')');
1296    append_kind_params(query_params, kinds);
1297}
1298
1299fn append_recent_event_kind_clause(
1300    sql: &mut String,
1301    query_params: &mut Vec<String>,
1302    kinds: &[EventKind],
1303) {
1304    if kinds.is_empty() {
1305        return;
1306    }
1307    sql.push_str(
1308        r#"
1309        AND EXISTS (
1310            SELECT 1 FROM events kind_events
1311            WHERE kind_events.session_key = sessions.session_key
1312              AND kind_events.kind IN (
1313        "#,
1314    );
1315    sql.push_str(&placeholders(kinds.len()));
1316    sql.push_str("))");
1317    append_kind_params(query_params, kinds);
1318}
1319
/// Builds `count` SQL `?` placeholders separated by `", "` (empty for zero).
fn placeholders(count: usize) -> String {
    vec!["?"; count].join(", ")
}
1325
1326fn append_kind_params(query_params: &mut Vec<String>, kinds: &[EventKind]) {
1327    query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
1328}
1329
/// All search hits that share one `session_key`, plus the aggregates used to
/// rank the group in `rank_search_results`.
struct SessionGroup {
    /// Key shared by every result in the group.
    session_key: String,
    /// Session id taken from the first result seen; used when collapsing duplicates.
    session_id: String,
    /// Source file of the first result seen; feeds `source_priority`.
    source_file_path: PathBuf,
    /// Copied from the first result seen; matching groups sort first.
    repo_matches_current: bool,
    /// Number of results in the group.
    hit_count: usize,
    /// Smallest score among the group's results (lower sorts first).
    best_score: f64,
    /// Smallest `event_kind_weight` among the group's results.
    best_kind_weight: u8,
    /// Session timestamp taken from the first result seen.
    session_timestamp: String,
    /// The grouped results themselves.
    results: Vec<SearchResult>,
}
1341
1342fn rank_search_results(
1343    results: Vec<SearchResult>,
1344    _current_repo: Option<&str>,
1345    limit: usize,
1346    include_duplicates: bool,
1347) -> Vec<SearchResult> {
1348    let mut groups = Vec::<SessionGroup>::new();
1349
1350    for result in results {
1351        let kind_weight = event_kind_weight(result.kind);
1352        if let Some(group) = groups
1353            .iter_mut()
1354            .find(|group| group.session_key == result.session_key)
1355        {
1356            group.hit_count += 1;
1357            group.best_score = group.best_score.min(result.score);
1358            group.best_kind_weight = group.best_kind_weight.min(kind_weight);
1359            group.results.push(result);
1360        } else {
1361            groups.push(SessionGroup {
1362                session_key: result.session_key.clone(),
1363                session_id: result.session_id.clone(),
1364                source_file_path: result.source_file_path.clone(),
1365                repo_matches_current: result.repo_matches_current,
1366                hit_count: 1,
1367                best_score: result.score,
1368                best_kind_weight: kind_weight,
1369                session_timestamp: result.session_timestamp.clone(),
1370                results: vec![result],
1371            });
1372        }
1373    }
1374
1375    if !include_duplicates {
1376        groups = dedupe_session_groups(groups);
1377    }
1378
1379    groups.sort_by(|left, right| {
1380        right
1381            .repo_matches_current
1382            .cmp(&left.repo_matches_current)
1383            .then_with(|| right.hit_count.cmp(&left.hit_count))
1384            .then_with(|| left.best_kind_weight.cmp(&right.best_kind_weight))
1385            .then_with(|| {
1386                left.best_score
1387                    .partial_cmp(&right.best_score)
1388                    .unwrap_or(std::cmp::Ordering::Equal)
1389            })
1390            .then_with(|| right.session_timestamp.cmp(&left.session_timestamp))
1391            .then_with(|| left.session_key.cmp(&right.session_key))
1392    });
1393
1394    let mut ranked = Vec::new();
1395    for mut group in groups {
1396        group.results.sort_by(|left, right| {
1397            left.score
1398                .partial_cmp(&right.score)
1399                .unwrap_or(std::cmp::Ordering::Equal)
1400                .then_with(|| event_kind_weight(left.kind).cmp(&event_kind_weight(right.kind)))
1401                .then_with(|| left.source_line_number.cmp(&right.source_line_number))
1402        });
1403        ranked.extend(group.results);
1404        if ranked.len() >= limit {
1405            ranked.truncate(limit);
1406            break;
1407        }
1408    }
1409
1410    ranked
1411}
1412
1413fn dedupe_session_groups(groups: Vec<SessionGroup>) -> Vec<SessionGroup> {
1414    let mut selected = HashMap::<String, SessionGroup>::new();
1415    for group in groups {
1416        match selected.entry(group.session_id.clone()) {
1417            Entry::Occupied(mut entry) => {
1418                if is_preferred_group(&group, entry.get()) {
1419                    entry.insert(group);
1420                }
1421            }
1422            Entry::Vacant(entry) => {
1423                entry.insert(group);
1424            }
1425        }
1426    }
1427    selected.into_values().collect()
1428}
1429
1430fn is_preferred_group(candidate: &SessionGroup, current: &SessionGroup) -> bool {
1431    let candidate_priority = source_priority(&candidate.source_file_path);
1432    let current_priority = source_priority(&current.source_file_path);
1433    candidate_priority < current_priority
1434        || (candidate_priority == current_priority
1435            && candidate.repo_matches_current
1436            && !current.repo_matches_current)
1437        || (candidate_priority == current_priority
1438            && candidate.repo_matches_current == current.repo_matches_current
1439            && candidate.session_timestamp > current.session_timestamp)
1440        || (candidate_priority == current_priority
1441            && candidate.repo_matches_current == current.repo_matches_current
1442            && candidate.session_timestamp == current.session_timestamp
1443            && candidate.session_key < current.session_key)
1444}
1445
1446fn dedupe_recent_sessions(sessions: Vec<RecentSession>) -> Vec<RecentSession> {
1447    let mut selected = HashMap::<String, RecentSession>::new();
1448    for session in sessions {
1449        match selected.entry(session.session_id.clone()) {
1450            Entry::Occupied(mut entry) => {
1451                if is_preferred_recent_session(&session, entry.get()) {
1452                    entry.insert(session);
1453                }
1454            }
1455            Entry::Vacant(entry) => {
1456                entry.insert(session);
1457            }
1458        }
1459    }
1460
1461    let mut sessions = selected.into_values().collect::<Vec<_>>();
1462    sessions.sort_by(|left, right| {
1463        right
1464            .session_timestamp
1465            .cmp(&left.session_timestamp)
1466            .then_with(|| {
1467                source_priority(&left.source_file_path)
1468                    .cmp(&source_priority(&right.source_file_path))
1469            })
1470            .then_with(|| left.session_key.cmp(&right.session_key))
1471    });
1472    sessions
1473}
1474
1475fn is_preferred_recent_session(candidate: &RecentSession, current: &RecentSession) -> bool {
1476    let candidate_priority = source_priority(&candidate.source_file_path);
1477    let current_priority = source_priority(&current.source_file_path);
1478    candidate_priority < current_priority
1479        || (candidate_priority == current_priority
1480            && candidate.session_timestamp > current.session_timestamp)
1481        || (candidate_priority == current_priority
1482            && candidate.session_timestamp == current.session_timestamp
1483            && candidate.session_key < current.session_key)
1484}
1485
/// Ranks a source file by the directory names in its path (lower is preferred).
///
/// Returns 2 when any component is `archived_sessions`, otherwise 0 when any
/// component is `sessions`, otherwise 1.
fn source_priority(path: &Path) -> u8 {
    let mut under_sessions = false;
    for component in path.components() {
        let name = component.as_os_str();
        // An archived component outranks everything else, wherever it appears.
        if name == "archived_sessions" {
            return 2;
        }
        under_sessions |= name == "sessions";
    }
    if under_sessions {
        0
    } else {
        1
    }
}
1501
1502fn event_kind_weight(kind: EventKind) -> u8 {
1503    match kind {
1504        EventKind::UserMessage => 0,
1505        EventKind::AssistantMessage => 1,
1506        EventKind::Command => 2,
1507    }
1508}
1509
1510fn session_repos(parsed: &ParsedSession) -> BTreeSet<String> {
1511    let mut repos = BTreeSet::new();
1512    let session_repo = repo_slug(&parsed.session.cwd);
1513    if !session_repo.is_empty() {
1514        repos.insert(session_repo);
1515    }
1516
1517    for event in &parsed.events {
1518        let Some(cwd) = &event.cwd else {
1519            continue;
1520        };
1521        let repo = repo_slug(cwd);
1522        if !repo.is_empty() {
1523            repos.insert(repo);
1524        }
1525    }
1526
1527    repos
1528}
1529
/// Splits `query` into search terms.
///
/// A term is a maximal run of alphanumeric characters or underscores; every
/// other character acts as a separator and is discarded.
fn fts_terms(query: &str) -> Vec<String> {
    query
        .split(|ch: char| !(ch.is_alphanumeric() || ch == '_'))
        .filter(|piece| !piece.is_empty())
        .map(str::to_owned)
        .collect()
}
1548
/// Wraps `term` in double quotes, doubling any double quote it contains.
fn quote_fts_term(term: &str) -> String {
    let mut quoted = String::with_capacity(term.len() + 2);
    quoted.push('"');
    for ch in term.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
1552
1553fn and_fts_query(terms: &[String]) -> String {
1554    terms
1555        .iter()
1556        .map(|term| quote_fts_term(term))
1557        .collect::<Vec<_>>()
1558        .join(" AND ")
1559}
1560
1561fn or_fts_query(terms: &[String]) -> String {
1562    terms
1563        .iter()
1564        .map(|term| quote_fts_term(term))
1565        .collect::<Vec<_>>()
1566        .join(" OR ")
1567}
1568
/// Builds the key for a session stored at `source_file_path`.
///
/// Public wrapper around the private `session_key` helper; the result has the
/// form `<session_id>:<16 hex digits>`, where the hex digits are a hash of the
/// path.
pub fn build_session_key(session_id: &str, source_file_path: &Path) -> String {
    session_key(session_id, source_file_path)
}
1572
1573fn session_key(session_id: &str, source_file_path: &Path) -> String {
1574    format!(
1575        "{}:{:016x}",
1576        session_id,
1577        fnv1a64(source_file_path.display().to_string().as_bytes())
1578    )
1579}
1580
/// Computes the 64-bit FNV-1a hash of `bytes`.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
1589
/// Derives a repo name from a working directory: its final path component.
///
/// Falls back to `cwd` unchanged when the path has no final component (for
/// example an empty string or a root path) or that component is not valid UTF-8.
fn repo_slug(cwd: &str) -> String {
    match Path::new(cwd).file_name().and_then(|name| name.to_str()) {
        Some(leaf) => leaf.to_owned(),
        None => cwd.to_owned(),
    }
}
1597
1598#[cfg(test)]
1599mod tests;