1use crate::parser::{EventKind, ParsedSession};
2use anyhow::{bail, Context, Result};
3use rusqlite::{params, params_from_iter, Connection, OpenFlags};
4use std::collections::{hash_map::Entry, BTreeSet, HashMap};
5use std::path::{Path, PathBuf};
6use std::time::Duration;
7
/// Version stamp written to `ingestion_state.content_version` by
/// `Store::mark_source_indexed` and required by `Store::is_source_current`
/// before a source file is treated as up to date.
const CONTENT_VERSION: i64 = 2;
9
/// Handle to the SQLite database holding indexed sessions, their events, the
/// FTS5 index over event text, and per-source-file ingestion state.
pub struct Store {
    /// Underlying SQLite connection (read-write via `open`, read-only via `open_readonly`).
    conn: Connection,
}
13
/// Row counts reported by `Store::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stats {
    /// Number of rows in the `sessions` table.
    pub session_count: u64,
    /// Number of rows in the `events` table.
    pub event_count: u64,
    /// Number of rows in the `ingestion_state` table (one per source file path).
    pub source_file_count: u64,
    /// `source_file_count` minus the number of distinct non-NULL
    /// `COALESCE(session_key, session_id)` values in `ingestion_state`, saturating at zero.
    pub duplicate_source_file_count: u64,
}
21
/// One event returned by a full-text search, joined with its session.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// `events.session_key` of the matching event.
    pub session_key: String,
    /// `events.session_id` of the matching event.
    pub session_id: String,
    /// `sessions.repo` of the owning session.
    pub repo: String,
    /// Event kind, parsed from the text stored in `events.kind`.
    pub kind: EventKind,
    /// Full event text.
    pub text: String,
    /// Excerpt produced by the FTS5 `snippet()` function over the indexed text column.
    pub snippet: String,
    /// Value of `events_fts.rank`; the query orders by it ascending.
    pub score: f64,
    /// `sessions.session_timestamp` of the owning session.
    pub session_timestamp: String,
    /// `sessions.cwd` of the owning session.
    pub cwd: String,
    /// Source file the event was read from.
    pub source_file_path: PathBuf,
    /// Line number of the event within `source_file_path`.
    pub source_line_number: usize,
    /// Per-event timestamp, when one was stored.
    pub source_timestamp: Option<String>,
    /// True when the session has a `session_repos` entry equal (case-insensitively)
    /// to `SearchOptions::current_repo`; always false when no current repo was given.
    repo_matches_current: bool,
}
38
/// Query text plus optional filters for `Store::search_with_options`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOptions {
    /// Free-text query; it is split into FTS terms and an empty term list yields no results.
    pub query: String,
    /// Maximum number of results; clamped to `1..=100` by the search.
    pub limit: usize,
    /// Keep only sessions with a `session_repos` entry equal to this value (case-insensitive).
    pub repo: Option<String>,
    /// Keep only sessions whose cwd, or any of whose events' cwd, matches this value
    /// via `LIKE '%' || value || '%'`.
    pub cwd: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub since: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub from: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub until: Option<String>,
    /// Forwarded to `rank_search_results`; presumably disables de-duplication — confirm there.
    pub include_duplicates: bool,
    /// Session references forwarded to `append_excluded_sessions_clause`.
    pub exclude_sessions: Vec<String>,
    /// Event kinds forwarded to `append_event_kind_clause` against `events.kind`.
    pub kinds: Vec<EventKind>,
    /// Repo used to compute `SearchResult::repo_matches_current`; also passed to the ranker.
    pub current_repo: Option<String>,
}
53
54impl SearchOptions {
55 pub fn new(query: impl Into<String>, limit: usize) -> Self {
56 Self {
57 query: query.into(),
58 limit,
59 repo: None,
60 cwd: None,
61 since: None,
62 from: None,
63 until: None,
64 include_duplicates: false,
65 exclude_sessions: Vec::new(),
66 kinds: Vec::new(),
67 current_repo: None,
68 }
69 }
70}
71
/// One event of a session, as returned by `Store::session_events_with_kinds`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionEvent {
    /// `events.session_key` of the event.
    pub session_key: String,
    /// `events.session_id` of the event.
    pub session_id: String,
    /// Event kind, parsed from the text stored in `events.kind`.
    pub kind: EventKind,
    /// Full event text.
    pub text: String,
    /// Working directory of the owning session (`sessions.cwd`, not the event's own cwd).
    pub cwd: String,
    /// Source file the event was read from.
    pub source_file_path: PathBuf,
    /// Line number of the event within `source_file_path`.
    pub source_line_number: usize,
    /// Per-event timestamp, when one was stored.
    pub source_timestamp: Option<String>,
}
83
/// A session row matched by `Store::resolve_session_reference`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMatch {
    /// Primary key of the session row.
    pub session_key: String,
    /// Session id stored on the row.
    pub session_id: String,
    /// Working directory stored on the session row.
    pub cwd: String,
    /// Repo stored on the session row.
    pub repo: String,
    /// Source file the session was indexed from.
    pub source_file_path: PathBuf,
}
92
/// A session row returned by `Store::recent_sessions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentSession {
    /// Primary key of the session row.
    pub session_key: String,
    /// Session id stored on the row.
    pub session_id: String,
    /// Repo stored on the session row.
    pub repo: String,
    /// Working directory stored on the session row.
    pub cwd: String,
    /// Session timestamp text; `recent_sessions` orders by it, newest first.
    pub session_timestamp: String,
    /// Source file the session was indexed from.
    pub source_file_path: PathBuf,
}
102
/// Filters for `Store::recent_sessions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentOptions {
    /// Maximum number of sessions; clamped to `1..=100` by `recent_sessions`.
    pub limit: usize,
    /// Keep only sessions with a `session_repos` entry equal to this value (case-insensitive).
    pub repo: Option<String>,
    /// Keep only sessions whose cwd, or any of whose events' cwd, matches this value
    /// via `LIKE '%' || value || '%'`.
    pub cwd: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub since: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub from: Option<String>,
    /// Date filter forwarded to `append_from_until_clauses` (defined outside this view).
    pub until: Option<String>,
    /// When false, results pass through `dedupe_recent_sessions` before truncation.
    pub include_duplicates: bool,
    /// Session references forwarded to `append_excluded_sessions_clause`.
    pub exclude_sessions: Vec<String>,
    /// Event kinds forwarded to `append_recent_event_kind_clause`.
    pub kinds: Vec<EventKind>,
}
115
116impl Default for RecentOptions {
117 fn default() -> Self {
118 Self {
119 limit: 20,
120 repo: None,
121 cwd: None,
122 since: None,
123 from: None,
124 until: None,
125 include_duplicates: false,
126 exclude_sessions: Vec::new(),
127 kinds: Vec::new(),
128 }
129 }
130}
131
/// A `sessions` row as read by `load_old_sessions` from a database whose
/// `sessions` table predates the `session_key` column.
struct OldSessionRow {
    session_id: String,
    session_timestamp: String,
    cwd: String,
    /// Empty when the old table had no `repo` column (or stored an empty value).
    repo: String,
    cli_version: Option<String>,
    /// Used together with `session_id` to derive the new session key during migration.
    source_file_path: String,
}
140
/// An `events` row as read by `load_old_events` before the table is rebuilt
/// with a `session_key` column.
struct OldEventRow {
    /// Used during migration to look up the session key of the owning session.
    session_id: String,
    /// Event kind kept as raw text; the migration copies it without parsing.
    kind: String,
    role: Option<String>,
    text: String,
    command: Option<String>,
    cwd: Option<String>,
    exit_code: Option<i64>,
    source_timestamp: Option<String>,
    source_file_path: String,
    source_line_number: i64,
}
153
154impl Store {
155 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
156 let path = path.as_ref();
157 if let Some(parent) = path.parent() {
158 std::fs::create_dir_all(parent)
159 .with_context(|| format!("create db directory {}", parent.display()))?;
160 }
161
162 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
163 configure_write_connection(&conn)?;
164 let store = Self { conn };
165 store.init_schema()?;
166 Ok(store)
167 }
168
169 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
170 let path = path.as_ref();
171 let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
172 .with_context(|| format!("open db read-only {}", path.display()))?;
173 configure_read_connection(&conn)?;
174 Ok(Self { conn })
175 }
176
177 pub fn index_session(&self, parsed: &ParsedSession) -> Result<()> {
178 self.conn.execute("BEGIN IMMEDIATE", [])?;
179 let result = self.index_session_inner(parsed);
180 match result {
181 Ok(()) => {
182 self.conn.execute("COMMIT", [])?;
183 Ok(())
184 }
185 Err(error) => {
186 let _ = self.conn.execute("ROLLBACK", []);
187 Err(error)
188 }
189 }
190 }
191
192 pub fn stats(&self) -> Result<Stats> {
193 let session_count = self
194 .conn
195 .query_row("SELECT COUNT(*) FROM sessions", [], |row| {
196 row.get::<_, u64>(0)
197 })?;
198 let event_count = self
199 .conn
200 .query_row("SELECT COUNT(*) FROM events", [], |row| {
201 row.get::<_, u64>(0)
202 })?;
203 let source_file_count =
204 self.conn
205 .query_row("SELECT COUNT(*) FROM ingestion_state", [], |row| {
206 row.get::<_, u64>(0)
207 })?;
208 let unique_ingested_sessions = self.conn.query_row(
209 "SELECT COUNT(DISTINCT COALESCE(session_key, session_id)) FROM ingestion_state WHERE COALESCE(session_key, session_id) IS NOT NULL",
210 [],
211 |row| row.get::<_, u64>(0),
212 )?;
213
214 Ok(Stats {
215 session_count,
216 event_count,
217 source_file_count,
218 duplicate_source_file_count: source_file_count.saturating_sub(unique_ingested_sessions),
219 })
220 }
221
222 pub fn quick_check(&self) -> Result<String> {
223 self.conn
224 .query_row("PRAGMA quick_check", [], |row| row.get::<_, String>(0))
225 .map_err(Into::into)
226 }
227
228 pub fn fts_integrity_check(&self) -> Result<()> {
229 self.conn.execute(
230 "INSERT INTO events_fts(events_fts) VALUES('integrity-check')",
231 [],
232 )?;
233 Ok(())
234 }
235
236 pub fn fts_read_check(&self) -> Result<()> {
237 self.conn
238 .query_row("SELECT COUNT(*) FROM events_fts", [], |row| {
239 row.get::<_, i64>(0)
240 })
241 .map(|_| ())
242 .map_err(Into::into)
243 }
244
245 pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
246 self.search_with_options(SearchOptions::new(query, limit))
247 }
248
249 pub fn search_with_options(&self, options: SearchOptions) -> Result<Vec<SearchResult>> {
250 let terms = fts_terms(&options.query);
251 if terms.is_empty() {
252 return Ok(Vec::new());
253 }
254
255 let limit = options.limit.clamp(1, 100);
256 let fetch_limit = limit.saturating_mul(50).clamp(200, 1_000);
257 let results = self.search_with_fts_query(&options, &and_fts_query(&terms), fetch_limit)?;
258 if !results.is_empty() || terms.len() == 1 {
259 return Ok(rank_search_results(
260 results,
261 options.current_repo.as_deref(),
262 limit,
263 options.include_duplicates,
264 ));
265 }
266
267 let results = self.search_with_fts_query(&options, &or_fts_query(&terms), fetch_limit)?;
268 Ok(rank_search_results(
269 results,
270 options.current_repo.as_deref(),
271 limit,
272 options.include_duplicates,
273 ))
274 }
275
276 fn search_with_fts_query(
277 &self,
278 options: &SearchOptions,
279 fts_query: &str,
280 limit: usize,
281 ) -> Result<Vec<SearchResult>> {
282 let mut query_params = Vec::<String>::new();
283 let current_repo_expr = if let Some(current_repo) = &options.current_repo {
284 query_params.push(current_repo.clone());
285 "EXISTS (
286 SELECT 1 FROM session_repos current_repos
287 WHERE current_repos.session_key = sessions.session_key
288 AND lower(current_repos.repo) = lower(?)
289 )"
290 } else {
291 "0"
292 };
293 query_params.push(fts_query.to_owned());
294
295 let mut sql = format!(
296 r#"
297 SELECT
298 events.session_key,
299 events.session_id,
300 sessions.repo,
301 events.kind,
302 events.text,
303 snippet(events_fts, 4, '', '', ' ... ', 16) AS snippet,
304 events_fts.rank AS score,
305 sessions.session_timestamp,
306 sessions.cwd,
307 events.source_file_path,
308 events.source_line_number,
309 events.source_timestamp,
310 {current_repo_expr} AS current_repo_match
311 FROM events_fts
312 JOIN events ON events.id = events_fts.event_id
313 JOIN sessions ON sessions.session_key = events.session_key
314 WHERE events_fts MATCH ?
315 "#,
316 );
317
318 if let Some(repo) = &options.repo {
319 sql.push_str(
320 r#"
321 AND EXISTS (
322 SELECT 1 FROM session_repos filter_repos
323 WHERE filter_repos.session_key = sessions.session_key
324 AND lower(filter_repos.repo) = lower(?)
325 )
326 "#,
327 );
328 query_params.push(repo.clone());
329 }
330 if let Some(cwd) = &options.cwd {
331 sql.push_str(
332 r#"
333 AND (
334 sessions.cwd LIKE '%' || ? || '%'
335 OR EXISTS (
336 SELECT 1 FROM events cwd_events
337 WHERE cwd_events.session_key = sessions.session_key
338 AND cwd_events.cwd LIKE '%' || ? || '%'
339 )
340 )
341 "#,
342 );
343 query_params.push(cwd.clone());
344 query_params.push(cwd.clone());
345 }
346 append_from_until_clauses(
347 &mut sql,
348 &mut query_params,
349 options.since.as_ref(),
350 options.from.as_ref(),
351 options.until.as_ref(),
352 )?;
353 append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
354 append_event_kind_clause(&mut sql, &mut query_params, "events.kind", &options.kinds);
355
356 sql.push_str(" ORDER BY events_fts.rank ASC, events.source_line_number ASC LIMIT ");
357 sql.push_str(&limit.to_string());
358
359 let mut statement = self.conn.prepare(&sql)?;
360
361 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
362 let kind_text: String = row.get(3)?;
363 let kind = kind_text.parse::<EventKind>().map_err(|_| {
364 rusqlite::Error::InvalidColumnType(
365 3,
366 "kind".to_owned(),
367 rusqlite::types::Type::Text,
368 )
369 })?;
370 let source_file_path: String = row.get(9)?;
371 let source_line_number: i64 = row.get(10)?;
372
373 Ok(SearchResult {
374 session_key: row.get(0)?,
375 session_id: row.get(1)?,
376 repo: row.get(2)?,
377 kind,
378 text: row.get(4)?,
379 snippet: row.get(5)?,
380 score: row.get(6)?,
381 session_timestamp: row.get(7)?,
382 cwd: row.get(8)?,
383 source_file_path: PathBuf::from(source_file_path),
384 source_line_number: source_line_number as usize,
385 source_timestamp: row.get(11)?,
386 repo_matches_current: row.get::<_, i64>(12)? != 0,
387 })
388 })?;
389
390 rows.collect::<std::result::Result<Vec<_>, _>>()
391 .map_err(Into::into)
392 }
393
394 pub fn resolve_session_reference(&self, reference: &str) -> Result<Vec<SessionMatch>> {
395 let mut statement = self.conn.prepare(
396 r#"
397 SELECT session_key, session_id, cwd, repo, source_file_path
398 FROM sessions
399 WHERE session_key = ? OR session_id = ?
400 ORDER BY session_timestamp DESC, source_file_path ASC
401 "#,
402 )?;
403 let rows = statement.query_map(params![reference, reference], |row| {
404 let source_file_path: String = row.get(4)?;
405 Ok(SessionMatch {
406 session_key: row.get(0)?,
407 session_id: row.get(1)?,
408 cwd: row.get(2)?,
409 repo: row.get(3)?,
410 source_file_path: PathBuf::from(source_file_path),
411 })
412 })?;
413
414 rows.collect::<std::result::Result<Vec<_>, _>>()
415 .map_err(Into::into)
416 }
417
418 pub fn recent_sessions(&self, options: RecentOptions) -> Result<Vec<RecentSession>> {
419 let limit = options.limit.clamp(1, 100);
420 let fetch_limit = if options.include_duplicates {
421 limit
422 } else {
423 limit.saturating_mul(5).clamp(limit, 500)
424 };
425 let mut query_params = Vec::<String>::new();
426 let mut sql = r#"
427 SELECT
428 sessions.session_key,
429 sessions.session_id,
430 sessions.repo,
431 sessions.cwd,
432 sessions.session_timestamp,
433 sessions.source_file_path
434 FROM sessions
435 WHERE 1 = 1
436 "#
437 .to_owned();
438
439 if let Some(repo) = &options.repo {
440 sql.push_str(
441 r#"
442 AND EXISTS (
443 SELECT 1 FROM session_repos filter_repos
444 WHERE filter_repos.session_key = sessions.session_key
445 AND lower(filter_repos.repo) = lower(?)
446 )
447 "#,
448 );
449 query_params.push(repo.clone());
450 }
451 if let Some(cwd) = &options.cwd {
452 sql.push_str(
453 r#"
454 AND (
455 sessions.cwd LIKE '%' || ? || '%'
456 OR EXISTS (
457 SELECT 1 FROM events cwd_events
458 WHERE cwd_events.session_key = sessions.session_key
459 AND cwd_events.cwd LIKE '%' || ? || '%'
460 )
461 )
462 "#,
463 );
464 query_params.push(cwd.clone());
465 query_params.push(cwd.clone());
466 }
467 append_from_until_clauses(
468 &mut sql,
469 &mut query_params,
470 options.since.as_ref(),
471 options.from.as_ref(),
472 options.until.as_ref(),
473 )?;
474 append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
475 append_recent_event_kind_clause(&mut sql, &mut query_params, &options.kinds);
476
477 sql.push_str(
478 r#"
479 ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
480 sessions.source_file_path ASC
481 LIMIT ?
482 "#,
483 );
484 query_params.push(fetch_limit.to_string());
485
486 let mut statement = self.conn.prepare(&sql)?;
487 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
488 let source_file_path: String = row.get(5)?;
489 Ok(RecentSession {
490 session_key: row.get(0)?,
491 session_id: row.get(1)?,
492 repo: row.get(2)?,
493 cwd: row.get(3)?,
494 session_timestamp: row.get(4)?,
495 source_file_path: PathBuf::from(source_file_path),
496 })
497 })?;
498
499 let mut sessions = rows.collect::<std::result::Result<Vec<_>, _>>()?;
500 if !options.include_duplicates {
501 sessions = dedupe_recent_sessions(sessions);
502 }
503 sessions.truncate(limit);
504 Ok(sessions)
505 }
506
507 pub fn session_events(&self, session_key: &str, limit: usize) -> Result<Vec<SessionEvent>> {
508 self.session_events_with_kinds(session_key, limit, &[])
509 }
510
511 pub fn session_events_with_kinds(
512 &self,
513 session_key: &str,
514 limit: usize,
515 kinds: &[EventKind],
516 ) -> Result<Vec<SessionEvent>> {
517 let limit = limit.clamp(1, 500);
518 let mut query_params = vec![session_key.to_owned()];
519 let mut sql = r#"
520 SELECT
521 events.session_key,
522 events.session_id,
523 events.kind,
524 events.text,
525 sessions.cwd,
526 events.source_file_path,
527 events.source_line_number,
528 events.source_timestamp
529 FROM events
530 JOIN sessions ON sessions.session_key = events.session_key
531 WHERE events.session_key = ?
532 "#
533 .to_owned();
534 append_event_kind_clause(&mut sql, &mut query_params, "events.kind", kinds);
535 sql.push_str(
536 r#"
537 ORDER BY events.source_line_number ASC
538 LIMIT ?
539 "#,
540 );
541 query_params.push(limit.to_string());
542
543 let mut statement = self.conn.prepare(&sql)?;
544
545 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
546 let kind_text: String = row.get(2)?;
547 let kind = kind_text.parse::<EventKind>().map_err(|_| {
548 rusqlite::Error::InvalidColumnType(
549 2,
550 "kind".to_owned(),
551 rusqlite::types::Type::Text,
552 )
553 })?;
554 let source_file_path: String = row.get(5)?;
555 let source_line_number: i64 = row.get(6)?;
556
557 Ok(SessionEvent {
558 session_key: row.get(0)?,
559 session_id: row.get(1)?,
560 kind,
561 text: row.get(3)?,
562 cwd: row.get(4)?,
563 source_file_path: PathBuf::from(source_file_path),
564 source_line_number: source_line_number as usize,
565 source_timestamp: row.get(7)?,
566 })
567 })?;
568
569 rows.collect::<std::result::Result<Vec<_>, _>>()
570 .map_err(Into::into)
571 }
572
573 pub fn session_repos(&self, session_key: &str) -> Result<Vec<String>> {
574 let mut statement = self.conn.prepare(
575 r#"
576 SELECT repo
577 FROM session_repos
578 WHERE session_key = ?
579 ORDER BY lower(repo) ASC
580 "#,
581 )?;
582 let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
583
584 rows.collect::<std::result::Result<Vec<_>, _>>()
585 .map_err(Into::into)
586 }
587
588 pub fn is_source_current(
589 &self,
590 source_file_path: &Path,
591 source_file_mtime_ns: i64,
592 source_file_size: i64,
593 ) -> Result<bool> {
594 let count = self.conn.query_row(
595 r#"
596 SELECT COUNT(*)
597 FROM ingestion_state
598 WHERE source_file_path = ?
599 AND source_file_mtime_ns = ?
600 AND source_file_size = ?
601 AND content_version = ?
602 "#,
603 params![
604 source_file_path.display().to_string(),
605 source_file_mtime_ns,
606 source_file_size,
607 CONTENT_VERSION,
608 ],
609 |row| row.get::<_, i64>(0),
610 )?;
611 Ok(count > 0)
612 }
613
614 pub fn mark_source_indexed(
615 &self,
616 source_file_path: &Path,
617 source_file_mtime_ns: i64,
618 source_file_size: i64,
619 session_id: Option<&str>,
620 session_key: Option<&str>,
621 ) -> Result<()> {
622 self.conn.execute(
623 r#"
624 INSERT INTO ingestion_state (
625 source_file_path, source_file_mtime_ns, source_file_size, session_id, session_key, content_version, indexed_at
626 ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
627 ON CONFLICT(source_file_path) DO UPDATE SET
628 source_file_mtime_ns = excluded.source_file_mtime_ns,
629 source_file_size = excluded.source_file_size,
630 session_id = excluded.session_id,
631 session_key = excluded.session_key,
632 content_version = excluded.content_version,
633 indexed_at = excluded.indexed_at
634 "#,
635 params![
636 source_file_path.display().to_string(),
637 source_file_mtime_ns,
638 source_file_size,
639 session_id,
640 session_key,
641 CONTENT_VERSION,
642 ],
643 )?;
644 Ok(())
645 }
646
647 pub fn last_indexed_at(&self) -> Result<Option<String>> {
648 self.conn
649 .query_row("SELECT MAX(indexed_at) FROM ingestion_state", [], |row| {
650 row.get::<_, Option<String>>(0)
651 })
652 .map_err(Into::into)
653 }
654
655 fn init_schema(&self) -> Result<()> {
656 self.conn.execute_batch(
657 r#"
658 PRAGMA journal_mode = WAL;
659 PRAGMA synchronous = NORMAL;
660 "#,
661 )?;
662
663 if self.table_exists("sessions")? && !self.table_has_column("sessions", "session_key")? {
664 self.migrate_to_session_key_schema()?;
665 }
666
667 self.create_schema_objects()?;
668 self.ensure_ingestion_state_session_key_column()?;
669 self.ensure_ingestion_state_content_version_column()?;
670 self.backfill_session_repos()?;
671 self.backfill_session_repo_memberships()?;
672 self.backfill_ingestion_session_keys()?;
673 Ok(())
674 }
675
676 fn create_schema_objects(&self) -> Result<()> {
677 self.conn.execute_batch(
678 r#"
679 CREATE TABLE IF NOT EXISTS sessions (
680 session_key TEXT PRIMARY KEY,
681 session_id TEXT NOT NULL,
682 session_timestamp TEXT NOT NULL,
683 cwd TEXT NOT NULL,
684 repo TEXT NOT NULL DEFAULT '',
685 cli_version TEXT,
686 source_file_path TEXT NOT NULL
687 );
688
689 CREATE INDEX IF NOT EXISTS sessions_session_id_idx ON sessions(session_id);
690 CREATE INDEX IF NOT EXISTS sessions_repo_idx ON sessions(repo);
691
692 CREATE TABLE IF NOT EXISTS session_repos (
693 session_key TEXT NOT NULL,
694 repo TEXT NOT NULL,
695 PRIMARY KEY(session_key, repo),
696 FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
697 );
698
699 CREATE INDEX IF NOT EXISTS session_repos_repo_idx ON session_repos(repo);
700
701 CREATE TABLE IF NOT EXISTS events (
702 id INTEGER PRIMARY KEY AUTOINCREMENT,
703 session_key TEXT NOT NULL,
704 session_id TEXT NOT NULL,
705 kind TEXT NOT NULL,
706 role TEXT,
707 text TEXT NOT NULL,
708 command TEXT,
709 cwd TEXT,
710 exit_code INTEGER,
711 source_timestamp TEXT,
712 source_file_path TEXT NOT NULL,
713 source_line_number INTEGER NOT NULL,
714 FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
715 );
716
717 CREATE INDEX IF NOT EXISTS events_session_key_idx ON events(session_key);
718 CREATE INDEX IF NOT EXISTS events_session_id_idx ON events(session_id);
719 CREATE INDEX IF NOT EXISTS events_source_idx ON events(source_file_path, source_line_number);
720
721 CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
722 event_id UNINDEXED,
723 session_key UNINDEXED,
724 session_id UNINDEXED,
725 kind UNINDEXED,
726 text,
727 tokenize = 'porter unicode61'
728 );
729
730 CREATE TABLE IF NOT EXISTS ingestion_state (
731 source_file_path TEXT PRIMARY KEY,
732 source_file_mtime_ns INTEGER NOT NULL,
733 source_file_size INTEGER NOT NULL,
734 session_id TEXT,
735 session_key TEXT,
736 content_version INTEGER NOT NULL DEFAULT 2,
737 indexed_at TEXT NOT NULL
738 );
739 "#,
740 )?;
741 Ok(())
742 }
743
744 fn index_session_inner(&self, parsed: &ParsedSession) -> Result<()> {
745 let session_key = session_key(&parsed.session.id, &parsed.session.source_file_path);
746 let repo = repo_slug(&parsed.session.cwd);
747 self.conn.execute(
748 r#"
749 INSERT INTO sessions (
750 session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path
751 ) VALUES (?, ?, ?, ?, ?, ?, ?)
752 ON CONFLICT(session_key) DO UPDATE SET
753 session_id = excluded.session_id,
754 session_timestamp = excluded.session_timestamp,
755 cwd = excluded.cwd,
756 repo = excluded.repo,
757 cli_version = excluded.cli_version,
758 source_file_path = excluded.source_file_path
759 "#,
760 params![
761 session_key.as_str(),
762 parsed.session.id,
763 parsed.session.timestamp,
764 parsed.session.cwd,
765 repo,
766 parsed.session.cli_version,
767 parsed.session.source_file_path.display().to_string(),
768 ],
769 )?;
770
771 self.conn.execute(
772 "DELETE FROM events_fts WHERE session_key = ?",
773 params![session_key.as_str()],
774 )?;
775 self.conn.execute(
776 "DELETE FROM events WHERE session_key = ?",
777 params![session_key.as_str()],
778 )?;
779 self.conn.execute(
780 "DELETE FROM session_repos WHERE session_key = ?",
781 params![session_key.as_str()],
782 )?;
783
784 for repo in session_repos(parsed) {
785 self.conn.execute(
786 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
787 params![session_key.as_str(), repo],
788 )?;
789 }
790
791 for event in &parsed.events {
792 self.conn.execute(
793 r#"
794 INSERT INTO events (
795 session_key, session_id, kind, role, text, command, cwd, exit_code,
796 source_timestamp, source_file_path, source_line_number
797 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
798 "#,
799 params![
800 session_key.as_str(),
801 parsed.session.id,
802 event.kind.as_str(),
803 event.role,
804 event.text,
805 event.command,
806 event.cwd,
807 event.exit_code,
808 event.source_timestamp,
809 event.source_file_path.display().to_string(),
810 event.source_line_number as i64,
811 ],
812 )?;
813 let event_id = self.conn.last_insert_rowid();
814 self.conn.execute(
815 "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
816 params![
817 event_id,
818 session_key.as_str(),
819 parsed.session.id,
820 event.kind.as_str(),
821 event.text
822 ],
823 )?;
824 }
825
826 Ok(())
827 }
828
829 fn backfill_session_repos(&self) -> Result<()> {
830 let mut statement = self
831 .conn
832 .prepare("SELECT session_key, cwd FROM sessions WHERE repo = ''")?;
833 let rows = statement.query_map([], |row| {
834 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
835 })?;
836
837 for row in rows {
838 let (session_key, cwd) = row?;
839 self.conn.execute(
840 "UPDATE sessions SET repo = ? WHERE session_key = ?",
841 params![repo_slug(&cwd), session_key],
842 )?;
843 }
844 Ok(())
845 }
846
847 fn table_exists(&self, table_name: &str) -> Result<bool> {
848 let count = self.conn.query_row(
849 "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
850 params![table_name],
851 |row| row.get::<_, i64>(0),
852 )?;
853 Ok(count > 0)
854 }
855
856 fn table_has_column(&self, table_name: &str, column_name: &str) -> Result<bool> {
857 let mut statement = self
858 .conn
859 .prepare(&format!("PRAGMA table_info({table_name})"))?;
860 let columns = statement
861 .query_map([], |row| row.get::<_, String>(1))?
862 .collect::<std::result::Result<Vec<_>, _>>()?;
863 Ok(columns.iter().any(|column| column == column_name))
864 }
865
866 fn ensure_ingestion_state_session_key_column(&self) -> Result<()> {
867 if self.table_has_column("ingestion_state", "session_key")? {
868 return Ok(());
869 }
870
871 match self.conn.execute(
872 "ALTER TABLE ingestion_state ADD COLUMN session_key TEXT",
873 [],
874 ) {
875 Ok(_) => Ok(()),
876 Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
877 Err(error) => Err(error.into()),
878 }
879 }
880
881 fn ensure_ingestion_state_content_version_column(&self) -> Result<()> {
882 if self.table_has_column("ingestion_state", "content_version")? {
883 return Ok(());
884 }
885
886 match self.conn.execute(
887 "ALTER TABLE ingestion_state ADD COLUMN content_version INTEGER NOT NULL DEFAULT 0",
888 [],
889 ) {
890 Ok(_) => Ok(()),
891 Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
892 Err(error) => Err(error.into()),
893 }
894 }
895
896 fn backfill_ingestion_session_keys(&self) -> Result<()> {
897 if !self.table_exists("ingestion_state")? {
898 return Ok(());
899 }
900
901 self.conn.execute(
902 r#"
903 UPDATE ingestion_state
904 SET session_key = (
905 SELECT sessions.session_key
906 FROM sessions
907 WHERE sessions.source_file_path = ingestion_state.source_file_path
908 LIMIT 1
909 )
910 WHERE session_key IS NULL
911 AND session_id IS NOT NULL
912 "#,
913 [],
914 )?;
915 Ok(())
916 }
917
918 fn backfill_session_repo_memberships(&self) -> Result<()> {
919 self.conn.execute(
920 r#"
921 INSERT OR IGNORE INTO session_repos (session_key, repo)
922 SELECT session_key, repo
923 FROM sessions
924 WHERE repo != ''
925 "#,
926 [],
927 )?;
928
929 let mut statement = self.conn.prepare(
930 r#"
931 SELECT DISTINCT session_key, cwd
932 FROM events
933 WHERE cwd IS NOT NULL
934 AND cwd != ''
935 "#,
936 )?;
937 let rows = statement.query_map([], |row| {
938 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
939 })?;
940
941 for row in rows {
942 let (session_key, cwd) = row?;
943 let repo = repo_slug(&cwd);
944 if repo.is_empty() {
945 continue;
946 }
947 self.conn.execute(
948 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
949 params![session_key, repo],
950 )?;
951 }
952
953 Ok(())
954 }
955
956 fn migrate_to_session_key_schema(&self) -> Result<()> {
957 let old_sessions = self.load_old_sessions()?;
958 let old_events = self.load_old_events()?;
959 let mut keys_by_session_id = HashMap::new();
960 for session in &old_sessions {
961 keys_by_session_id.insert(
962 session.session_id.clone(),
963 session_key(&session.session_id, Path::new(&session.source_file_path)),
964 );
965 }
966
967 self.conn.execute("BEGIN IMMEDIATE", [])?;
968 let result = (|| -> Result<()> {
969 self.conn.execute_batch(
970 r#"
971 DROP TABLE IF EXISTS events_fts;
972 DROP TABLE IF EXISTS events;
973 DROP TABLE IF EXISTS sessions;
974 "#,
975 )?;
976 self.create_schema_objects()?;
977
978 for session in &old_sessions {
979 let session_key = keys_by_session_id
980 .get(&session.session_id)
981 .expect("session key exists");
982 let repo = if session.repo.is_empty() {
983 repo_slug(&session.cwd)
984 } else {
985 session.repo.clone()
986 };
987 self.conn.execute(
988 r#"
989 INSERT INTO sessions (
990 session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path
991 ) VALUES (?, ?, ?, ?, ?, ?, ?)
992 "#,
993 params![
994 session_key,
995 session.session_id,
996 session.session_timestamp,
997 session.cwd,
998 repo,
999 session.cli_version,
1000 session.source_file_path,
1001 ],
1002 )?;
1003 self.conn.execute(
1004 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1005 params![session_key, repo],
1006 )?;
1007 }
1008
1009 for event in &old_events {
1010 let Some(session_key) = keys_by_session_id.get(&event.session_id) else {
1011 continue;
1012 };
1013 self.conn.execute(
1014 r#"
1015 INSERT INTO events (
1016 session_key, session_id, kind, role, text, command, cwd, exit_code,
1017 source_timestamp, source_file_path, source_line_number
1018 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1019 "#,
1020 params![
1021 session_key,
1022 event.session_id,
1023 event.kind,
1024 event.role,
1025 event.text,
1026 event.command,
1027 event.cwd,
1028 event.exit_code,
1029 event.source_timestamp,
1030 event.source_file_path,
1031 event.source_line_number,
1032 ],
1033 )?;
1034 let event_id = self.conn.last_insert_rowid();
1035 self.conn.execute(
1036 "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
1037 params![
1038 event_id,
1039 session_key,
1040 event.session_id,
1041 event.kind,
1042 event.text
1043 ],
1044 )?;
1045 }
1046
1047 Ok(())
1048 })();
1049
1050 match result {
1051 Ok(()) => {
1052 self.conn.execute("COMMIT", [])?;
1053 Ok(())
1054 }
1055 Err(error) => {
1056 let _ = self.conn.execute("ROLLBACK", []);
1057 Err(error)
1058 }
1059 }
1060 }
1061
1062 fn load_old_sessions(&self) -> Result<Vec<OldSessionRow>> {
1063 let has_repo = self.table_has_column("sessions", "repo")?;
1064 let sql = if has_repo {
1065 "SELECT session_id, session_timestamp, cwd, repo, cli_version, source_file_path FROM sessions"
1066 } else {
1067 "SELECT session_id, session_timestamp, cwd, '' AS repo, cli_version, source_file_path FROM sessions"
1068 };
1069 let mut statement = self.conn.prepare(sql)?;
1070 let rows = statement.query_map([], |row| {
1071 Ok(OldSessionRow {
1072 session_id: row.get(0)?,
1073 session_timestamp: row.get(1)?,
1074 cwd: row.get(2)?,
1075 repo: row.get(3)?,
1076 cli_version: row.get(4)?,
1077 source_file_path: row.get(5)?,
1078 })
1079 })?;
1080
1081 rows.collect::<std::result::Result<Vec<_>, _>>()
1082 .map_err(Into::into)
1083 }
1084
1085 fn load_old_events(&self) -> Result<Vec<OldEventRow>> {
1086 if !self.table_exists("events")? {
1087 return Ok(Vec::new());
1088 }
1089
1090 let mut statement = self.conn.prepare(
1091 r#"
1092 SELECT
1093 session_id, kind, role, text, command, cwd, exit_code,
1094 source_timestamp, source_file_path, source_line_number
1095 FROM events
1096 ORDER BY id ASC
1097 "#,
1098 )?;
1099 let rows = statement.query_map([], |row| {
1100 Ok(OldEventRow {
1101 session_id: row.get(0)?,
1102 kind: row.get(1)?,
1103 role: row.get(2)?,
1104 text: row.get(3)?,
1105 command: row.get(4)?,
1106 cwd: row.get(5)?,
1107 exit_code: row.get(6)?,
1108 source_timestamp: row.get(7)?,
1109 source_file_path: row.get(8)?,
1110 source_line_number: row.get(9)?,
1111 })
1112 })?;
1113
1114 rows.collect::<std::result::Result<Vec<_>, _>>()
1115 .map_err(Into::into)
1116 }
1117}
1118
1119fn configure_write_connection(conn: &Connection) -> Result<()> {
1120 conn.busy_timeout(Duration::from_secs(30))?;
1121 conn.execute_batch(
1122 r#"
1123 PRAGMA journal_mode = WAL;
1124 PRAGMA synchronous = NORMAL;
1125 PRAGMA temp_store = MEMORY;
1126 "#,
1127 )?;
1128 Ok(())
1129}
1130
1131fn configure_read_connection(conn: &Connection) -> Result<()> {
1132 conn.busy_timeout(Duration::from_secs(30))?;
1133 conn.execute_batch(
1134 r#"
1135 PRAGMA query_only = ON;
1136 PRAGMA temp_store = MEMORY;
1137 "#,
1138 )?;
1139 Ok(())
1140}
1141
/// Parsed form of a date-filter argument, as produced by `parse_date_filter`.
enum SinceFilter {
    /// A value beginning with `YYYY-MM-DD`; bound unchanged into SQLite's `datetime(?)`.
    Absolute(String),
    /// `Nd` with `N > 0`; applied as `datetime('now', '-N days')`.
    LastDays(u32),
    /// `today` (or `0d`): start of the current local day.
    Today,
    /// `yesterday`: start of the previous local day.
    Yesterday,
}
1148
1149fn parse_date_filter(value: &str, flag_name: &str) -> Result<SinceFilter> {
1150 let trimmed = value.trim();
1151 let lower = trimmed.to_ascii_lowercase();
1152 if lower == "today" {
1153 return Ok(SinceFilter::Today);
1154 }
1155 if lower == "yesterday" {
1156 return Ok(SinceFilter::Yesterday);
1157 }
1158 if let Some(days) = lower.strip_suffix('d') {
1159 let days = days
1160 .parse::<u32>()
1161 .with_context(|| format!("parse {flag_name} relative day value `{value}`"))?;
1162 if days == 0 {
1163 return Ok(SinceFilter::Today);
1164 }
1165 return Ok(SinceFilter::LastDays(days));
1166 }
1167 if looks_like_absolute_date(trimmed) {
1168 return Ok(SinceFilter::Absolute(trimmed.to_owned()));
1169 }
1170
1171 anyhow::bail!(
1172 "unsupported {flag_name} value `{value}`; use YYYY-MM-DD, today, yesterday, or Nd like 7d"
1173 )
1174}
1175
/// Reports whether `value` begins with the shape `DDDD-DD-DD` (ASCII digits
/// and hyphens).
///
/// Only the first ten bytes are inspected: anything may follow, and the
/// digits are not checked for being a real calendar date.
fn looks_like_absolute_date(value: &str) -> bool {
    let Some(prefix) = value.as_bytes().get(..10) else {
        return false;
    };
    prefix.iter().enumerate().all(|(index, byte)| match index {
        4 | 7 => *byte == b'-',
        _ => byte.is_ascii_digit(),
    })
}
1185
/// Appends a lower-bound timestamp clause for a `--since` value.
///
/// Delegates to `append_lower_bound_clause`, passing `"--since"` as the flag
/// name, and returns its result unchanged.
fn append_since_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    value: &str,
) -> Result<()> {
    append_lower_bound_clause(sql, query_params, value, "--since")
}
1193
1194fn append_lower_bound_clause(
1195 sql: &mut String,
1196 query_params: &mut Vec<String>,
1197 value: &str,
1198 flag_name: &str,
1199) -> Result<()> {
1200 sql.push_str(
1201 " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) >= ",
1202 );
1203 match parse_date_filter(value, flag_name)? {
1204 SinceFilter::Absolute(value) => {
1205 sql.push_str("datetime(?)");
1206 query_params.push(value);
1207 }
1208 SinceFilter::LastDays(days) => {
1209 sql.push_str("datetime('now', ?)");
1210 query_params.push(format!("-{days} days"));
1211 }
1212 SinceFilter::Today => {
1213 sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
1214 }
1215 SinceFilter::Yesterday => {
1216 sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
1217 }
1218 }
1219 Ok(())
1220}
1221
1222fn append_until_clause(
1223 sql: &mut String,
1224 query_params: &mut Vec<String>,
1225 value: &str,
1226) -> Result<()> {
1227 sql.push_str(
1228 " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) < ",
1229 );
1230 match parse_date_filter(value, "--until")? {
1231 SinceFilter::Absolute(value) => {
1232 sql.push_str("datetime(?)");
1233 query_params.push(value);
1234 }
1235 SinceFilter::LastDays(days) => {
1236 sql.push_str("datetime('now', ?)");
1237 query_params.push(format!("-{days} days"));
1238 }
1239 SinceFilter::Today => {
1240 sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
1241 }
1242 SinceFilter::Yesterday => {
1243 sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
1244 }
1245 }
1246 Ok(())
1247}
1248
1249fn append_from_until_clauses(
1250 sql: &mut String,
1251 query_params: &mut Vec<String>,
1252 since: Option<&String>,
1253 from: Option<&String>,
1254 until: Option<&String>,
1255) -> Result<()> {
1256 if since.is_some() && from.is_some() {
1257 bail!("use either --since or --from, not both");
1258 }
1259 if let Some(since) = since {
1260 append_since_clause(sql, query_params, since)?;
1261 } else if let Some(from) = from {
1262 append_lower_bound_clause(sql, query_params, from, "--from")?;
1263 }
1264 if let Some(until) = until {
1265 append_until_clause(sql, query_params, until)?;
1266 }
1267 Ok(())
1268}
1269
/// Appends one exclusion per entry of `excluded_sessions`.
///
/// Each entry is compared against both `sessions.session_id` and
/// `sessions.session_key`, so it is pushed onto `query_params` twice, once
/// per placeholder.
fn append_excluded_sessions_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    excluded_sessions: &[String],
) {
    const EXCLUSION: &str = " AND sessions.session_id != ? AND sessions.session_key != ?";

    excluded_sessions.iter().for_each(|identifier| {
        sql.push_str(EXCLUSION);
        query_params.extend([identifier.clone(), identifier.clone()]);
    });
}
1281
1282fn append_event_kind_clause(
1283 sql: &mut String,
1284 query_params: &mut Vec<String>,
1285 column_name: &str,
1286 kinds: &[EventKind],
1287) {
1288 if kinds.is_empty() {
1289 return;
1290 }
1291 sql.push_str(" AND ");
1292 sql.push_str(column_name);
1293 sql.push_str(" IN (");
1294 sql.push_str(&placeholders(kinds.len()));
1295 sql.push(')');
1296 append_kind_params(query_params, kinds);
1297}
1298
1299fn append_recent_event_kind_clause(
1300 sql: &mut String,
1301 query_params: &mut Vec<String>,
1302 kinds: &[EventKind],
1303) {
1304 if kinds.is_empty() {
1305 return;
1306 }
1307 sql.push_str(
1308 r#"
1309 AND EXISTS (
1310 SELECT 1 FROM events kind_events
1311 WHERE kind_events.session_key = sessions.session_key
1312 AND kind_events.kind IN (
1313 "#,
1314 );
1315 sql.push_str(&placeholders(kinds.len()));
1316 sql.push_str("))");
1317 append_kind_params(query_params, kinds);
1318}
1319
/// Builds a comma-separated list of `count` SQL `?` placeholders.
///
/// Returns an empty string when `count` is zero.
fn placeholders(count: usize) -> String {
    vec!["?"; count].join(", ")
}
1325
1326fn append_kind_params(query_params: &mut Vec<String>, kinds: &[EventKind]) {
1327 query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
1328}
1329
/// All search hits that share one `session_key`, plus the aggregates that
/// `rank_search_results` sorts groups by.
struct SessionGroup {
    /// Key the hits were grouped on.
    session_key: String,
    /// Session id taken from the first hit of the group; used for de-duplication.
    session_id: String,
    /// Source file of the first hit of the group.
    source_file_path: PathBuf,
    /// `repo_matches_current` flag of the first hit of the group.
    repo_matches_current: bool,
    /// Number of hits folded into this group.
    hit_count: usize,
    /// Smallest `score` among the group's hits.
    best_score: f64,
    /// Smallest `event_kind_weight` among the group's hits.
    best_kind_weight: u8,
    /// Session timestamp of the first hit of the group.
    session_timestamp: String,
    /// The hits themselves, in arrival order until they are sorted for output.
    results: Vec<SearchResult>,
}
1341
1342fn rank_search_results(
1343 results: Vec<SearchResult>,
1344 _current_repo: Option<&str>,
1345 limit: usize,
1346 include_duplicates: bool,
1347) -> Vec<SearchResult> {
1348 let mut groups = Vec::<SessionGroup>::new();
1349
1350 for result in results {
1351 let kind_weight = event_kind_weight(result.kind);
1352 if let Some(group) = groups
1353 .iter_mut()
1354 .find(|group| group.session_key == result.session_key)
1355 {
1356 group.hit_count += 1;
1357 group.best_score = group.best_score.min(result.score);
1358 group.best_kind_weight = group.best_kind_weight.min(kind_weight);
1359 group.results.push(result);
1360 } else {
1361 groups.push(SessionGroup {
1362 session_key: result.session_key.clone(),
1363 session_id: result.session_id.clone(),
1364 source_file_path: result.source_file_path.clone(),
1365 repo_matches_current: result.repo_matches_current,
1366 hit_count: 1,
1367 best_score: result.score,
1368 best_kind_weight: kind_weight,
1369 session_timestamp: result.session_timestamp.clone(),
1370 results: vec![result],
1371 });
1372 }
1373 }
1374
1375 if !include_duplicates {
1376 groups = dedupe_session_groups(groups);
1377 }
1378
1379 groups.sort_by(|left, right| {
1380 right
1381 .repo_matches_current
1382 .cmp(&left.repo_matches_current)
1383 .then_with(|| right.hit_count.cmp(&left.hit_count))
1384 .then_with(|| left.best_kind_weight.cmp(&right.best_kind_weight))
1385 .then_with(|| {
1386 left.best_score
1387 .partial_cmp(&right.best_score)
1388 .unwrap_or(std::cmp::Ordering::Equal)
1389 })
1390 .then_with(|| right.session_timestamp.cmp(&left.session_timestamp))
1391 .then_with(|| left.session_key.cmp(&right.session_key))
1392 });
1393
1394 let mut ranked = Vec::new();
1395 for mut group in groups {
1396 group.results.sort_by(|left, right| {
1397 left.score
1398 .partial_cmp(&right.score)
1399 .unwrap_or(std::cmp::Ordering::Equal)
1400 .then_with(|| event_kind_weight(left.kind).cmp(&event_kind_weight(right.kind)))
1401 .then_with(|| left.source_line_number.cmp(&right.source_line_number))
1402 });
1403 ranked.extend(group.results);
1404 if ranked.len() >= limit {
1405 ranked.truncate(limit);
1406 break;
1407 }
1408 }
1409
1410 ranked
1411}
1412
1413fn dedupe_session_groups(groups: Vec<SessionGroup>) -> Vec<SessionGroup> {
1414 let mut selected = HashMap::<String, SessionGroup>::new();
1415 for group in groups {
1416 match selected.entry(group.session_id.clone()) {
1417 Entry::Occupied(mut entry) => {
1418 if is_preferred_group(&group, entry.get()) {
1419 entry.insert(group);
1420 }
1421 }
1422 Entry::Vacant(entry) => {
1423 entry.insert(group);
1424 }
1425 }
1426 }
1427 selected.into_values().collect()
1428}
1429
1430fn is_preferred_group(candidate: &SessionGroup, current: &SessionGroup) -> bool {
1431 let candidate_priority = source_priority(&candidate.source_file_path);
1432 let current_priority = source_priority(¤t.source_file_path);
1433 candidate_priority < current_priority
1434 || (candidate_priority == current_priority
1435 && candidate.repo_matches_current
1436 && !current.repo_matches_current)
1437 || (candidate_priority == current_priority
1438 && candidate.repo_matches_current == current.repo_matches_current
1439 && candidate.session_timestamp > current.session_timestamp)
1440 || (candidate_priority == current_priority
1441 && candidate.repo_matches_current == current.repo_matches_current
1442 && candidate.session_timestamp == current.session_timestamp
1443 && candidate.session_key < current.session_key)
1444}
1445
1446fn dedupe_recent_sessions(sessions: Vec<RecentSession>) -> Vec<RecentSession> {
1447 let mut selected = HashMap::<String, RecentSession>::new();
1448 for session in sessions {
1449 match selected.entry(session.session_id.clone()) {
1450 Entry::Occupied(mut entry) => {
1451 if is_preferred_recent_session(&session, entry.get()) {
1452 entry.insert(session);
1453 }
1454 }
1455 Entry::Vacant(entry) => {
1456 entry.insert(session);
1457 }
1458 }
1459 }
1460
1461 let mut sessions = selected.into_values().collect::<Vec<_>>();
1462 sessions.sort_by(|left, right| {
1463 right
1464 .session_timestamp
1465 .cmp(&left.session_timestamp)
1466 .then_with(|| {
1467 source_priority(&left.source_file_path)
1468 .cmp(&source_priority(&right.source_file_path))
1469 })
1470 .then_with(|| left.session_key.cmp(&right.session_key))
1471 });
1472 sessions
1473}
1474
1475fn is_preferred_recent_session(candidate: &RecentSession, current: &RecentSession) -> bool {
1476 let candidate_priority = source_priority(&candidate.source_file_path);
1477 let current_priority = source_priority(¤t.source_file_path);
1478 candidate_priority < current_priority
1479 || (candidate_priority == current_priority
1480 && candidate.session_timestamp > current.session_timestamp)
1481 || (candidate_priority == current_priority
1482 && candidate.session_timestamp == current.session_timestamp
1483 && candidate.session_key < current.session_key)
1484}
1485
/// Ranks a source file by the directory names in its path; lower is better.
///
/// * `2`: some component is exactly `archived_sessions`
/// * `0`: otherwise, some component is exactly `sessions`
/// * `1`: neither component is present
fn source_priority(path: &Path) -> u8 {
    let mut under_sessions = false;
    for component in path.components() {
        let name = component.as_os_str();
        // An archive marker anywhere in the path outranks everything else.
        if name == "archived_sessions" {
            return 2;
        }
        under_sessions |= name == "sessions";
    }

    if under_sessions {
        0
    } else {
        1
    }
}
1501
1502fn event_kind_weight(kind: EventKind) -> u8 {
1503 match kind {
1504 EventKind::UserMessage => 0,
1505 EventKind::AssistantMessage => 1,
1506 EventKind::Command => 2,
1507 }
1508}
1509
1510fn session_repos(parsed: &ParsedSession) -> BTreeSet<String> {
1511 let mut repos = BTreeSet::new();
1512 let session_repo = repo_slug(&parsed.session.cwd);
1513 if !session_repo.is_empty() {
1514 repos.insert(session_repo);
1515 }
1516
1517 for event in &parsed.events {
1518 let Some(cwd) = &event.cwd else {
1519 continue;
1520 };
1521 let repo = repo_slug(cwd);
1522 if !repo.is_empty() {
1523 repos.insert(repo);
1524 }
1525 }
1526
1527 repos
1528}
1529
/// Splits a free-form query into search terms.
///
/// A term is a maximal run of alphanumeric characters and underscores; every
/// other character acts as a separator and is discarded. Terms are returned in
/// the order they appear.
fn fts_terms(query: &str) -> Vec<String> {
    query
        .split(|ch: char| !(ch.is_alphanumeric() || ch == '_'))
        .filter(|piece| !piece.is_empty())
        .map(str::to_owned)
        .collect()
}
1548
/// Wraps `term` in double quotes, doubling any double quote inside it.
fn quote_fts_term(term: &str) -> String {
    let mut quoted = String::with_capacity(term.len() + 2);
    quoted.push('"');
    for ch in term.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
1552
1553fn and_fts_query(terms: &[String]) -> String {
1554 terms
1555 .iter()
1556 .map(|term| quote_fts_term(term))
1557 .collect::<Vec<_>>()
1558 .join(" AND ")
1559}
1560
1561fn or_fts_query(terms: &[String]) -> String {
1562 terms
1563 .iter()
1564 .map(|term| quote_fts_term(term))
1565 .collect::<Vec<_>>()
1566 .join(" OR ")
1567}
1568
/// Builds the key identifying one session as read from one source file.
///
/// Public entry point that forwards to the private `session_key` and returns
/// its result unchanged.
pub fn build_session_key(session_id: &str, source_file_path: &Path) -> String {
    session_key(session_id, source_file_path)
}
1572
1573fn session_key(session_id: &str, source_file_path: &Path) -> String {
1574 format!(
1575 "{}:{:016x}",
1576 session_id,
1577 fnv1a64(source_file_path.display().to_string().as_bytes())
1578 )
1579}
1580
/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Each byte is XOR-ed into the running hash, which is then multiplied by the
/// FNV prime with wrapping arithmetic. An empty slice yields the offset basis.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
1589
/// Returns the final path component of `cwd` as the repo slug.
///
/// Falls back to `cwd` itself when the path has no final component that
/// `Path::file_name` can return (for example an empty string, `/`, or a path
/// ending in `..`).
fn repo_slug(cwd: &str) -> String {
    let last_component = Path::new(cwd).file_name().and_then(|name| name.to_str());
    match last_component {
        Some(name) => name.to_owned(),
        None => cwd.to_owned(),
    }
}
1597
1598#[cfg(test)]
1599mod tests;