1use crate::memory::{extract_memories, ExtractedMemory, MemoryKind};
2use crate::parser::{EventKind, ParsedSession};
3use anyhow::{bail, Context, Result};
4use rusqlite::{params, params_from_iter, Connection, OpenFlags};
5use std::collections::{hash_map::Entry, BTreeSet, HashMap};
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8
/// Version number of the indexed content format.
///
/// NOTE(review): the code that reads this constant is outside this view;
/// presumably a bump invalidates previously indexed data — confirm before changing it.
const CONTENT_VERSION: i64 = 3;
10
11pub struct Store {
12 conn: Connection,
13}
14
/// Row counts describing what is currently stored in the index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stats {
    /// Number of rows in the `sessions` table.
    pub session_count: u64,
    /// Number of rows in the `events` table.
    pub event_count: u64,
    /// Number of rows in the `ingestion_state` table (one per ingested source file).
    pub source_file_count: u64,
    /// Source files in excess of the number of distinct sessions they map to.
    pub duplicate_source_file_count: u64,
}
22
23#[derive(Debug, Clone, PartialEq)]
24pub struct SearchResult {
25 pub session_key: String,
26 pub session_id: String,
27 pub repo: String,
28 pub kind: EventKind,
29 pub text: String,
30 pub snippet: String,
31 pub score: f64,
32 pub session_timestamp: String,
33 pub cwd: String,
34 pub source_file_path: PathBuf,
35 pub source_line_number: usize,
36 pub source_timestamp: Option<String>,
37 pub session_hit_count: usize,
38 pub best_kind_weight: u8,
39 pub repo_matches_current: bool,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct SearchOptions {
44 pub query: String,
45 pub limit: usize,
46 pub repo: Option<String>,
47 pub cwd: Option<String>,
48 pub since: Option<String>,
49 pub from: Option<String>,
50 pub until: Option<String>,
51 pub include_duplicates: bool,
52 pub exclude_sessions: Vec<String>,
53 pub kinds: Vec<EventKind>,
54 pub current_repo: Option<String>,
55}
56
/// How the query terms were combined for the search that produced the results.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchStrategy {
    /// Every term had to match.
    AllTerms,
    /// Any single term was enough; used only after the all-terms query found nothing.
    AnyTermFallback,
}

impl MatchStrategy {
    /// Returns the stable machine-readable label for this strategy.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::AllTerms => "all_terms",
            Self::AnyTermFallback => "any_terms_fallback",
        }
    }
}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct SearchTrace {
74 pub match_strategy: MatchStrategy,
75 pub query_terms: Vec<String>,
76 pub fts_query: String,
77 pub fetch_limit: usize,
78 pub current_repo: Option<String>,
79 pub include_duplicates: bool,
80}
81
82#[derive(Debug, Clone, PartialEq, Eq)]
83pub struct MemoryObject {
84 pub id: String,
85 pub kind: MemoryKind,
86 pub summary: String,
87 pub normalized_text: String,
88 pub first_seen_at: String,
89 pub last_seen_at: String,
90 pub created_at: String,
91 pub updated_at: String,
92 pub evidence_count: usize,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct MemoryEvidence {
97 pub memory_id: String,
98 pub session_key: String,
99 pub session_id: String,
100 pub repo: String,
101 pub cwd: String,
102 pub session_timestamp: String,
103 pub source_file_path: PathBuf,
104 pub source_line_number: usize,
105 pub source_timestamp: Option<String>,
106 pub event_kind: EventKind,
107 pub evidence_text: String,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct MemorySearchOptions {
112 pub query: Option<String>,
113 pub limit: usize,
114 pub repo: Option<String>,
115 pub cwd: Option<String>,
116 pub since: Option<String>,
117 pub from: Option<String>,
118 pub until: Option<String>,
119 pub kinds: Vec<MemoryKind>,
120}
121
122impl Default for MemorySearchOptions {
123 fn default() -> Self {
124 Self {
125 query: None,
126 limit: 20,
127 repo: None,
128 cwd: None,
129 since: None,
130 from: None,
131 until: None,
132 kinds: Vec::new(),
133 }
134 }
135}
136
137#[derive(Debug, Clone, PartialEq, Eq)]
138pub struct MemoryResult {
139 pub object: MemoryObject,
140 pub repos: Vec<String>,
141 pub session_keys: Vec<String>,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub enum DeltaItem {
146 Session {
147 change_id: i64,
148 action: String,
149 session_key: String,
150 session_id: String,
151 repo: String,
152 cwd: String,
153 session_timestamp: String,
154 updated_at: String,
155 },
156 Memory {
157 change_id: i64,
158 action: String,
159 object: MemoryObject,
160 repos: Vec<String>,
161 session_keys: Vec<String>,
162 },
163 Deleted {
164 change_id: i64,
165 object_type: String,
166 object_id: String,
167 action: String,
168 updated_at: String,
169 },
170}
171
172impl DeltaItem {
173 pub fn updated_at(&self) -> &str {
174 match self {
175 DeltaItem::Session { updated_at, .. } => updated_at,
176 DeltaItem::Memory { object, .. } => &object.updated_at,
177 DeltaItem::Deleted { updated_at, .. } => updated_at,
178 }
179 }
180
181 pub fn change_id(&self) -> i64 {
182 match self {
183 DeltaItem::Session { change_id, .. } => *change_id,
184 DeltaItem::Memory { change_id, .. } => *change_id,
185 DeltaItem::Deleted { change_id, .. } => *change_id,
186 }
187 }
188
189 pub fn change_kind(&self) -> &str {
190 match self {
191 DeltaItem::Session { .. } => "session",
192 DeltaItem::Memory { .. } => "memory",
193 DeltaItem::Deleted { object_type, .. } => object_type,
194 }
195 }
196
197 pub fn action(&self) -> &str {
198 match self {
199 DeltaItem::Session { action, .. } => action,
200 DeltaItem::Memory { action, .. } => action,
201 DeltaItem::Deleted { action, .. } => action,
202 }
203 }
204}
205
/// Descriptor of a session or memory exposed as an addressable resource.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceRecord {
    /// Resource URI, e.g. `codex-recall://session/<key>` or `codex-recall://memory/<id>`.
    pub uri: String,
    /// Short display name.
    pub name: String,
    /// Longer description of the resource.
    pub description: String,
    /// MIME type of the resource payload.
    pub mime_type: String,
    /// `"session"` or `"memory"`.
    pub object_type: String,
    /// Timestamp associated with the resource.
    pub updated_at: String,
}
215
216impl SearchOptions {
217 pub fn new(query: impl Into<String>, limit: usize) -> Self {
218 Self {
219 query: query.into(),
220 limit,
221 repo: None,
222 cwd: None,
223 since: None,
224 from: None,
225 until: None,
226 include_duplicates: false,
227 exclude_sessions: Vec::new(),
228 kinds: Vec::new(),
229 current_repo: None,
230 }
231 }
232}
233
234#[derive(Debug, Clone, PartialEq, Eq)]
235pub struct SessionEvent {
236 pub session_key: String,
237 pub session_id: String,
238 pub kind: EventKind,
239 pub text: String,
240 pub cwd: String,
241 pub source_file_path: PathBuf,
242 pub source_line_number: usize,
243 pub source_timestamp: Option<String>,
244}
245
/// A session that matched a user-supplied session reference.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMatch {
    /// Key of the matching session.
    pub session_key: String,
    /// Identifier of the matching session.
    pub session_id: String,
    /// Working directory recorded on the session row.
    pub cwd: String,
    /// Repository recorded on the session row.
    pub repo: String,
    /// File the session was ingested from.
    pub source_file_path: PathBuf,
}
254
/// Summary row describing one session in a session listing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecentSession {
    /// Key of the session.
    pub session_key: String,
    /// Identifier of the session.
    pub session_id: String,
    /// Repository recorded on the session row.
    pub repo: String,
    /// Working directory recorded on the session row.
    pub cwd: String,
    /// Timestamp recorded on the session row; listings sort on it, newest first.
    pub session_timestamp: String,
    /// File the session was ingested from.
    pub source_file_path: PathBuf,
}
264
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub struct RecentOptions {
267 pub limit: usize,
268 pub repo: Option<String>,
269 pub cwd: Option<String>,
270 pub since: Option<String>,
271 pub from: Option<String>,
272 pub until: Option<String>,
273 pub include_duplicates: bool,
274 pub exclude_sessions: Vec<String>,
275 pub kinds: Vec<EventKind>,
276}
277
278impl Default for RecentOptions {
279 fn default() -> Self {
280 Self {
281 limit: 20,
282 repo: None,
283 cwd: None,
284 since: None,
285 from: None,
286 until: None,
287 include_duplicates: false,
288 exclude_sessions: Vec::new(),
289 kinds: Vec::new(),
290 }
291 }
292}
293
/// Plain carrier for the columns of a session row.
///
/// NOTE(review): its users are outside this view; the name suggests it holds
/// rows read from an earlier schema during migration — confirm.
struct OldSessionRow {
    /// Session identifier.
    session_id: String,
    /// Session timestamp text.
    session_timestamp: String,
    /// Working directory of the session.
    cwd: String,
    /// Repository of the session.
    repo: String,
    /// CLI version, when one was recorded.
    cli_version: Option<String>,
    /// Path of the file the session came from, kept as text.
    source_file_path: String,
}
302
/// Plain carrier for the columns of an event row.
///
/// NOTE(review): its users are outside this view; the name suggests it holds
/// rows read from an earlier schema during migration — confirm.
struct OldEventRow {
    /// Identifier of the owning session.
    session_id: String,
    /// Event kind, kept as text.
    kind: String,
    /// Role, when one was recorded.
    role: Option<String>,
    /// Event text.
    text: String,
    /// Command, when one was recorded.
    command: Option<String>,
    /// Working directory, when one was recorded.
    cwd: Option<String>,
    /// Exit code, when one was recorded.
    exit_code: Option<i64>,
    /// Event timestamp, when one was recorded.
    source_timestamp: Option<String>,
    /// Path of the file the event came from, kept as text.
    source_file_path: String,
    /// Line number inside `source_file_path`, in the database's integer type.
    source_line_number: i64,
}
315
316impl Store {
317 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
318 let path = path.as_ref();
319 if let Some(parent) = path.parent() {
320 std::fs::create_dir_all(parent)
321 .with_context(|| format!("create db directory {}", parent.display()))?;
322 }
323
324 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
325 configure_write_connection(&conn)?;
326 let store = Self { conn };
327 store.init_schema()?;
328 Ok(store)
329 }
330
331 pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
332 let path = path.as_ref();
333 let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
334 .with_context(|| format!("open db read-only {}", path.display()))?;
335 configure_read_connection(&conn)?;
336 Ok(Self { conn })
337 }
338
339 pub fn index_session(&self, parsed: &ParsedSession) -> Result<()> {
340 self.conn.execute("BEGIN IMMEDIATE", [])?;
341 let result = self.index_session_inner(parsed);
342 match result {
343 Ok(()) => {
344 self.conn.execute("COMMIT", [])?;
345 Ok(())
346 }
347 Err(error) => {
348 let _ = self.conn.execute("ROLLBACK", []);
349 Err(error)
350 }
351 }
352 }
353
354 pub fn stats(&self) -> Result<Stats> {
355 let session_count = self
356 .conn
357 .query_row("SELECT COUNT(*) FROM sessions", [], |row| {
358 row.get::<_, u64>(0)
359 })?;
360 let event_count = self
361 .conn
362 .query_row("SELECT COUNT(*) FROM events", [], |row| {
363 row.get::<_, u64>(0)
364 })?;
365 let source_file_count =
366 self.conn
367 .query_row("SELECT COUNT(*) FROM ingestion_state", [], |row| {
368 row.get::<_, u64>(0)
369 })?;
370 let unique_ingested_sessions = self.conn.query_row(
371 "SELECT COUNT(DISTINCT COALESCE(session_key, session_id)) FROM ingestion_state WHERE COALESCE(session_key, session_id) IS NOT NULL",
372 [],
373 |row| row.get::<_, u64>(0),
374 )?;
375
376 Ok(Stats {
377 session_count,
378 event_count,
379 source_file_count,
380 duplicate_source_file_count: source_file_count.saturating_sub(unique_ingested_sessions),
381 })
382 }
383
384 pub fn quick_check(&self) -> Result<String> {
385 self.conn
386 .query_row("PRAGMA quick_check", [], |row| row.get::<_, String>(0))
387 .map_err(Into::into)
388 }
389
390 pub fn fts_integrity_check(&self) -> Result<()> {
391 self.conn.execute(
392 "INSERT INTO events_fts(events_fts) VALUES('integrity-check')",
393 [],
394 )?;
395 Ok(())
396 }
397
398 pub fn fts_read_check(&self) -> Result<()> {
399 self.conn
400 .query_row("SELECT COUNT(*) FROM events_fts", [], |row| {
401 row.get::<_, i64>(0)
402 })
403 .map(|_| ())
404 .map_err(Into::into)
405 }
406
407 pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
408 self.search_with_options(SearchOptions::new(query, limit))
409 }
410
411 pub fn search_with_options(&self, options: SearchOptions) -> Result<Vec<SearchResult>> {
412 self.search_with_trace(options).map(|(_, results)| results)
413 }
414
415 pub fn search_with_trace(
416 &self,
417 options: SearchOptions,
418 ) -> Result<(SearchTrace, Vec<SearchResult>)> {
419 let terms = fts_terms(&options.query);
420 if terms.is_empty() {
421 return Ok((
422 SearchTrace {
423 match_strategy: MatchStrategy::AllTerms,
424 query_terms: Vec::new(),
425 fts_query: String::new(),
426 fetch_limit: 0,
427 current_repo: options.current_repo.clone(),
428 include_duplicates: options.include_duplicates,
429 },
430 Vec::new(),
431 ));
432 }
433
434 let limit = options.limit.clamp(1, 100);
435 let fetch_limit = limit.saturating_mul(50).clamp(200, 1_000);
436 let all_terms_query = and_fts_query(&terms);
437 let results = self.search_with_fts_query(&options, &all_terms_query, fetch_limit)?;
438 if !results.is_empty() || terms.len() == 1 {
439 return Ok((
440 SearchTrace {
441 match_strategy: MatchStrategy::AllTerms,
442 query_terms: terms,
443 fts_query: all_terms_query,
444 fetch_limit,
445 current_repo: options.current_repo.clone(),
446 include_duplicates: options.include_duplicates,
447 },
448 rank_search_results(
449 results,
450 options.current_repo.as_deref(),
451 limit,
452 options.include_duplicates,
453 ),
454 ));
455 }
456
457 let any_terms_query = or_fts_query(&terms);
458 let results = self.search_with_fts_query(&options, &any_terms_query, fetch_limit)?;
459 Ok((
460 SearchTrace {
461 match_strategy: MatchStrategy::AnyTermFallback,
462 query_terms: terms,
463 fts_query: any_terms_query,
464 fetch_limit,
465 current_repo: options.current_repo.clone(),
466 include_duplicates: options.include_duplicates,
467 },
468 rank_search_results(
469 results,
470 options.current_repo.as_deref(),
471 limit,
472 options.include_duplicates,
473 ),
474 ))
475 }
476
477 fn search_with_fts_query(
478 &self,
479 options: &SearchOptions,
480 fts_query: &str,
481 limit: usize,
482 ) -> Result<Vec<SearchResult>> {
483 let mut query_params = Vec::<String>::new();
484 let current_repo_expr = if let Some(current_repo) = &options.current_repo {
485 query_params.push(current_repo.clone());
486 "EXISTS (
487 SELECT 1 FROM session_repos current_repos
488 WHERE current_repos.session_key = sessions.session_key
489 AND lower(current_repos.repo) = lower(?)
490 )"
491 } else {
492 "0"
493 };
494 query_params.push(fts_query.to_owned());
495
496 let mut sql = format!(
497 r#"
498 SELECT
499 events.session_key,
500 events.session_id,
501 sessions.repo,
502 events.kind,
503 events.text,
504 snippet(events_fts, 4, '', '', ' ... ', 16) AS snippet,
505 events_fts.rank AS score,
506 sessions.session_timestamp,
507 sessions.cwd,
508 events.source_file_path,
509 events.source_line_number,
510 events.source_timestamp,
511 {current_repo_expr} AS current_repo_match
512 FROM events_fts
513 JOIN events ON events.id = events_fts.event_id
514 JOIN sessions ON sessions.session_key = events.session_key
515 WHERE events_fts MATCH ?
516 "#,
517 );
518
519 if let Some(repo) = &options.repo {
520 sql.push_str(
521 r#"
522 AND EXISTS (
523 SELECT 1 FROM session_repos filter_repos
524 WHERE filter_repos.session_key = sessions.session_key
525 AND lower(filter_repos.repo) = lower(?)
526 )
527 "#,
528 );
529 query_params.push(repo.clone());
530 }
531 if let Some(cwd) = &options.cwd {
532 sql.push_str(
533 r#"
534 AND (
535 sessions.cwd LIKE '%' || ? || '%'
536 OR EXISTS (
537 SELECT 1 FROM events cwd_events
538 WHERE cwd_events.session_key = sessions.session_key
539 AND cwd_events.cwd LIKE '%' || ? || '%'
540 )
541 )
542 "#,
543 );
544 query_params.push(cwd.clone());
545 query_params.push(cwd.clone());
546 }
547 append_from_until_clauses(
548 &mut sql,
549 &mut query_params,
550 options.since.as_ref(),
551 options.from.as_ref(),
552 options.until.as_ref(),
553 )?;
554 append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
555 append_event_kind_clause(&mut sql, &mut query_params, "events.kind", &options.kinds);
556
557 sql.push_str(" ORDER BY events_fts.rank ASC, events.source_line_number ASC LIMIT ");
558 sql.push_str(&limit.to_string());
559
560 let mut statement = self.conn.prepare(&sql)?;
561
562 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
563 let kind_text: String = row.get(3)?;
564 let kind = kind_text.parse::<EventKind>().map_err(|_| {
565 rusqlite::Error::InvalidColumnType(
566 3,
567 "kind".to_owned(),
568 rusqlite::types::Type::Text,
569 )
570 })?;
571 let source_file_path: String = row.get(9)?;
572 let source_line_number: i64 = row.get(10)?;
573
574 Ok(SearchResult {
575 session_key: row.get(0)?,
576 session_id: row.get(1)?,
577 repo: row.get(2)?,
578 kind,
579 text: row.get(4)?,
580 snippet: row.get(5)?,
581 score: row.get(6)?,
582 session_timestamp: row.get(7)?,
583 cwd: row.get(8)?,
584 source_file_path: PathBuf::from(source_file_path),
585 source_line_number: source_line_number as usize,
586 source_timestamp: row.get(11)?,
587 session_hit_count: 0,
588 best_kind_weight: 0,
589 repo_matches_current: row.get::<_, i64>(12)? != 0,
590 })
591 })?;
592
593 rows.collect::<std::result::Result<Vec<_>, _>>()
594 .map_err(Into::into)
595 }
596
597 pub fn resolve_session_reference(&self, reference: &str) -> Result<Vec<SessionMatch>> {
598 let mut statement = self.conn.prepare(
599 r#"
600 SELECT session_key, session_id, cwd, repo, source_file_path
601 FROM sessions
602 WHERE session_key = ? OR session_id = ?
603 ORDER BY session_timestamp DESC, source_file_path ASC
604 "#,
605 )?;
606 let rows = statement.query_map(params![reference, reference], |row| {
607 let source_file_path: String = row.get(4)?;
608 Ok(SessionMatch {
609 session_key: row.get(0)?,
610 session_id: row.get(1)?,
611 cwd: row.get(2)?,
612 repo: row.get(3)?,
613 source_file_path: PathBuf::from(source_file_path),
614 })
615 })?;
616
617 rows.collect::<std::result::Result<Vec<_>, _>>()
618 .map_err(Into::into)
619 }
620
621 pub fn recent_sessions(&self, options: RecentOptions) -> Result<Vec<RecentSession>> {
622 let limit = options.limit.clamp(1, 100);
623 let fetch_limit = if options.include_duplicates {
624 limit
625 } else {
626 limit.saturating_mul(5).clamp(limit, 500)
627 };
628 let mut query_params = Vec::<String>::new();
629 let mut sql = r#"
630 SELECT
631 sessions.session_key,
632 sessions.session_id,
633 sessions.repo,
634 sessions.cwd,
635 sessions.session_timestamp,
636 sessions.source_file_path
637 FROM sessions
638 WHERE 1 = 1
639 "#
640 .to_owned();
641
642 if let Some(repo) = &options.repo {
643 sql.push_str(
644 r#"
645 AND EXISTS (
646 SELECT 1 FROM session_repos filter_repos
647 WHERE filter_repos.session_key = sessions.session_key
648 AND lower(filter_repos.repo) = lower(?)
649 )
650 "#,
651 );
652 query_params.push(repo.clone());
653 }
654 if let Some(cwd) = &options.cwd {
655 sql.push_str(
656 r#"
657 AND (
658 sessions.cwd LIKE '%' || ? || '%'
659 OR EXISTS (
660 SELECT 1 FROM events cwd_events
661 WHERE cwd_events.session_key = sessions.session_key
662 AND cwd_events.cwd LIKE '%' || ? || '%'
663 )
664 )
665 "#,
666 );
667 query_params.push(cwd.clone());
668 query_params.push(cwd.clone());
669 }
670 append_from_until_clauses(
671 &mut sql,
672 &mut query_params,
673 options.since.as_ref(),
674 options.from.as_ref(),
675 options.until.as_ref(),
676 )?;
677 append_excluded_sessions_clause(&mut sql, &mut query_params, &options.exclude_sessions);
678 append_recent_event_kind_clause(&mut sql, &mut query_params, &options.kinds);
679
680 sql.push_str(
681 r#"
682 ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
683 sessions.source_file_path ASC
684 LIMIT ?
685 "#,
686 );
687 query_params.push(fetch_limit.to_string());
688
689 let mut statement = self.conn.prepare(&sql)?;
690 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
691 let source_file_path: String = row.get(5)?;
692 Ok(RecentSession {
693 session_key: row.get(0)?,
694 session_id: row.get(1)?,
695 repo: row.get(2)?,
696 cwd: row.get(3)?,
697 session_timestamp: row.get(4)?,
698 source_file_path: PathBuf::from(source_file_path),
699 })
700 })?;
701
702 let mut sessions = rows.collect::<std::result::Result<Vec<_>, _>>()?;
703 if !options.include_duplicates {
704 sessions = dedupe_recent_sessions(sessions);
705 }
706 sessions.truncate(limit);
707 Ok(sessions)
708 }
709
710 pub fn session_events(&self, session_key: &str, limit: usize) -> Result<Vec<SessionEvent>> {
711 self.session_events_with_kinds(session_key, limit, &[])
712 }
713
714 pub fn session_events_with_kinds(
715 &self,
716 session_key: &str,
717 limit: usize,
718 kinds: &[EventKind],
719 ) -> Result<Vec<SessionEvent>> {
720 let limit = limit.clamp(1, 500);
721 let mut query_params = vec![session_key.to_owned()];
722 let mut sql = r#"
723 SELECT
724 events.session_key,
725 events.session_id,
726 events.kind,
727 events.text,
728 sessions.cwd,
729 events.source_file_path,
730 events.source_line_number,
731 events.source_timestamp
732 FROM events
733 JOIN sessions ON sessions.session_key = events.session_key
734 WHERE events.session_key = ?
735 "#
736 .to_owned();
737 append_event_kind_clause(&mut sql, &mut query_params, "events.kind", kinds);
738 sql.push_str(
739 r#"
740 ORDER BY events.source_line_number ASC
741 LIMIT ?
742 "#,
743 );
744 query_params.push(limit.to_string());
745
746 let mut statement = self.conn.prepare(&sql)?;
747
748 let rows = statement.query_map(params_from_iter(query_params.iter()), |row| {
749 let kind_text: String = row.get(2)?;
750 let kind = kind_text.parse::<EventKind>().map_err(|_| {
751 rusqlite::Error::InvalidColumnType(
752 2,
753 "kind".to_owned(),
754 rusqlite::types::Type::Text,
755 )
756 })?;
757 let source_file_path: String = row.get(5)?;
758 let source_line_number: i64 = row.get(6)?;
759
760 Ok(SessionEvent {
761 session_key: row.get(0)?,
762 session_id: row.get(1)?,
763 kind,
764 text: row.get(3)?,
765 cwd: row.get(4)?,
766 source_file_path: PathBuf::from(source_file_path),
767 source_line_number: source_line_number as usize,
768 source_timestamp: row.get(7)?,
769 })
770 })?;
771
772 rows.collect::<std::result::Result<Vec<_>, _>>()
773 .map_err(Into::into)
774 }
775
776 pub fn session_repos(&self, session_key: &str) -> Result<Vec<String>> {
777 let mut statement = self.conn.prepare(
778 r#"
779 SELECT repo
780 FROM session_repos
781 WHERE session_key = ?
782 ORDER BY lower(repo) ASC
783 "#,
784 )?;
785 let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
786
787 rows.collect::<std::result::Result<Vec<_>, _>>()
788 .map_err(Into::into)
789 }
790
791 pub fn memory_results_with_trace(
792 &self,
793 options: MemorySearchOptions,
794 ) -> Result<(MatchStrategy, Vec<MemoryResult>)> {
795 let limit = options.limit.clamp(1, 100);
796 let Some(query) = options
797 .query
798 .as_ref()
799 .map(|value| value.trim())
800 .filter(|value| !value.is_empty())
801 else {
802 let ids = self.list_memory_ids(&options, limit)?;
803 return Ok((MatchStrategy::AllTerms, self.load_memory_results(&ids)?));
804 };
805
806 let terms = fts_terms(query);
807 if terms.is_empty() {
808 return Ok((MatchStrategy::AllTerms, Vec::new()));
809 }
810
811 let ids = self.search_memory_ids_with_fts(&options, &and_fts_query(&terms), limit)?;
812 if !ids.is_empty() || terms.len() == 1 {
813 return Ok((MatchStrategy::AllTerms, self.load_memory_results(&ids)?));
814 }
815
816 let ids = self.search_memory_ids_with_fts(&options, &or_fts_query(&terms), limit)?;
817 Ok((
818 MatchStrategy::AnyTermFallback,
819 self.load_memory_results(&ids)?,
820 ))
821 }
822
823 pub fn memory_results(&self, options: MemorySearchOptions) -> Result<Vec<MemoryResult>> {
824 self.memory_results_with_trace(options)
825 .map(|(_, results)| results)
826 }
827
828 pub fn memory_by_id(&self, memory_id: &str) -> Result<Option<MemoryObject>> {
829 let mut statement = self.conn.prepare(
830 r#"
831 SELECT id, kind, summary, normalized_text, first_seen_at, last_seen_at, created_at, updated_at
832 FROM memory_objects
833 WHERE id = ?
834 "#,
835 )?;
836 let mut rows = statement.query(params![memory_id])?;
837 let Some(row) = rows.next()? else {
838 return Ok(None);
839 };
840 let kind_text: String = row.get(1)?;
841 let kind = MemoryKind::parse(&kind_text)
842 .ok_or_else(|| anyhow::anyhow!("unknown memory kind `{kind_text}`"))?;
843 let evidence_count: i64 = self.conn.query_row(
844 "SELECT COUNT(*) FROM memory_evidence WHERE memory_id = ?",
845 params![memory_id],
846 |count_row| count_row.get(0),
847 )?;
848
849 Ok(Some(MemoryObject {
850 id: row.get(0)?,
851 kind,
852 summary: row.get(2)?,
853 normalized_text: row.get(3)?,
854 first_seen_at: row.get(4)?,
855 last_seen_at: row.get(5)?,
856 created_at: row.get(6)?,
857 updated_at: row.get(7)?,
858 evidence_count: evidence_count as usize,
859 }))
860 }
861
862 pub fn memory_evidence(&self, memory_id: &str, limit: usize) -> Result<Vec<MemoryEvidence>> {
863 let limit = limit.clamp(1, 200);
864 let mut statement = self.conn.prepare(
865 r#"
866 SELECT
867 memory_evidence.memory_id,
868 memory_evidence.session_key,
869 memory_evidence.session_id,
870 sessions.repo,
871 sessions.cwd,
872 sessions.session_timestamp,
873 memory_evidence.source_file_path,
874 memory_evidence.source_line_number,
875 memory_evidence.source_timestamp,
876 memory_evidence.event_kind,
877 memory_evidence.evidence_text
878 FROM memory_evidence
879 JOIN sessions ON sessions.session_key = memory_evidence.session_key
880 WHERE memory_evidence.memory_id = ?
881 ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
882 memory_evidence.source_line_number ASC
883 LIMIT ?
884 "#,
885 )?;
886 let rows = statement.query_map(params![memory_id, limit.to_string()], |row| {
887 let event_kind_text: String = row.get(9)?;
888 let event_kind = event_kind_text.parse::<EventKind>().map_err(|_| {
889 rusqlite::Error::InvalidColumnType(
890 9,
891 "event_kind".to_owned(),
892 rusqlite::types::Type::Text,
893 )
894 })?;
895 let source_file_path: String = row.get(6)?;
896 let source_line_number: i64 = row.get(7)?;
897 Ok(MemoryEvidence {
898 memory_id: row.get(0)?,
899 session_key: row.get(1)?,
900 session_id: row.get(2)?,
901 repo: row.get(3)?,
902 cwd: row.get(4)?,
903 session_timestamp: row.get(5)?,
904 source_file_path: PathBuf::from(source_file_path),
905 source_line_number: source_line_number as usize,
906 source_timestamp: row.get(8)?,
907 event_kind,
908 evidence_text: row.get(10)?,
909 })
910 })?;
911
912 rows.collect::<std::result::Result<Vec<_>, _>>()
913 .map_err(Into::into)
914 }
915
916 pub fn delta_items(
917 &self,
918 cursor: Option<&str>,
919 limit: usize,
920 repo: Option<&str>,
921 ) -> Result<Vec<DeltaItem>> {
922 if !self.table_exists("change_log")? {
923 return Ok(Vec::new());
924 }
925
926 let limit = limit.clamp(1, 200);
927 let mut next_change_id = cursor
928 .and_then(parse_delta_cursor)
929 .map_or(0, |item| item.change_id);
930 let mut items = Vec::new();
931 let batch_size = limit.saturating_mul(5).clamp(100, 500);
932
933 while items.len() < limit {
934 let rows = self.next_change_rows(next_change_id, batch_size)?;
935 if rows.is_empty() {
936 break;
937 }
938 next_change_id = rows.last().map(|row| row.seq).unwrap_or(next_change_id);
939 for row in rows {
940 if let Some(item) = self.resolve_delta_item(&row, repo)? {
941 items.push(item);
942 if items.len() == limit {
943 break;
944 }
945 }
946 }
947 }
948
949 Ok(items)
950 }
951
952 pub fn related_memories_for_session(
953 &self,
954 session_key: &str,
955 limit: usize,
956 ) -> Result<Vec<MemoryResult>> {
957 let mut statement = self.conn.prepare(
958 r#"
959 SELECT memory_id
960 FROM memory_evidence
961 WHERE session_key = ?
962 GROUP BY memory_id
963 ORDER BY COUNT(*) DESC, memory_id ASC
964 LIMIT ?
965 "#,
966 )?;
967 let rows = statement.query_map(params![session_key, limit.to_string()], |row| {
968 row.get::<_, String>(0)
969 })?;
970 let ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
971 self.load_memory_results(&ids)
972 }
973
974 pub fn related_sessions_for_session(
975 &self,
976 session_key: &str,
977 limit: usize,
978 ) -> Result<Vec<RecentSession>> {
979 let mut statement = self.conn.prepare(
980 r#"
981 SELECT
982 sessions.session_key,
983 sessions.session_id,
984 sessions.repo,
985 sessions.cwd,
986 sessions.session_timestamp,
987 sessions.source_file_path
988 FROM memory_evidence seed
989 JOIN memory_evidence related ON related.memory_id = seed.memory_id
990 JOIN sessions ON sessions.session_key = related.session_key
991 WHERE seed.session_key = ?
992 AND related.session_key != ?
993 GROUP BY sessions.session_key
994 ORDER BY COUNT(DISTINCT related.memory_id) DESC,
995 datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
996 sessions.session_key ASC
997 LIMIT ?
998 "#,
999 )?;
1000 let rows = statement.query_map(
1001 params![session_key, session_key, limit.to_string()],
1002 |row| {
1003 let source_file_path: String = row.get(5)?;
1004 Ok(RecentSession {
1005 session_key: row.get(0)?,
1006 session_id: row.get(1)?,
1007 repo: row.get(2)?,
1008 cwd: row.get(3)?,
1009 session_timestamp: row.get(4)?,
1010 source_file_path: PathBuf::from(source_file_path),
1011 })
1012 },
1013 )?;
1014 rows.collect::<std::result::Result<Vec<_>, _>>()
1015 .map_err(Into::into)
1016 }
1017
1018 pub fn related_sessions_for_memory(
1019 &self,
1020 memory_id: &str,
1021 limit: usize,
1022 ) -> Result<Vec<RecentSession>> {
1023 let mut statement = self.conn.prepare(
1024 r#"
1025 SELECT
1026 sessions.session_key,
1027 sessions.session_id,
1028 sessions.repo,
1029 sessions.cwd,
1030 sessions.session_timestamp,
1031 sessions.source_file_path
1032 FROM memory_evidence
1033 JOIN sessions ON sessions.session_key = memory_evidence.session_key
1034 WHERE memory_evidence.memory_id = ?
1035 GROUP BY sessions.session_key
1036 ORDER BY datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) DESC,
1037 sessions.session_key ASC
1038 LIMIT ?
1039 "#,
1040 )?;
1041 let rows = statement.query_map(params![memory_id, limit.to_string()], |row| {
1042 let source_file_path: String = row.get(5)?;
1043 Ok(RecentSession {
1044 session_key: row.get(0)?,
1045 session_id: row.get(1)?,
1046 repo: row.get(2)?,
1047 cwd: row.get(3)?,
1048 session_timestamp: row.get(4)?,
1049 source_file_path: PathBuf::from(source_file_path),
1050 })
1051 })?;
1052 rows.collect::<std::result::Result<Vec<_>, _>>()
1053 .map_err(Into::into)
1054 }
1055
1056 pub fn cooccurring_memories(&self, memory_id: &str, limit: usize) -> Result<Vec<MemoryResult>> {
1057 let mut statement = self.conn.prepare(
1058 r#"
1059 SELECT related.memory_id
1060 FROM memory_evidence seed
1061 JOIN memory_evidence related ON related.session_key = seed.session_key
1062 WHERE seed.memory_id = ?
1063 AND related.memory_id != ?
1064 GROUP BY related.memory_id
1065 ORDER BY COUNT(DISTINCT related.session_key) DESC, related.memory_id ASC
1066 LIMIT ?
1067 "#,
1068 )?;
1069 let rows = statement
1070 .query_map(params![memory_id, memory_id, limit.to_string()], |row| {
1071 row.get::<_, String>(0)
1072 })?;
1073 let ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1074 self.load_memory_results(&ids)
1075 }
1076
1077 pub fn session_resources(&self, limit: usize) -> Result<Vec<ResourceRecord>> {
1078 let sessions = self.recent_sessions(RecentOptions {
1079 limit,
1080 ..RecentOptions::default()
1081 })?;
1082 Ok(sessions
1083 .into_iter()
1084 .map(|session| ResourceRecord {
1085 uri: format!("codex-recall://session/{}", session.session_key),
1086 name: format!("session {}", session.session_id),
1087 description: format!("{} {}", session.repo, session.cwd),
1088 mime_type: "application/json".to_owned(),
1089 object_type: "session".to_owned(),
1090 updated_at: session.session_timestamp,
1091 })
1092 .collect())
1093 }
1094
1095 pub fn memory_resources(&self, limit: usize) -> Result<Vec<ResourceRecord>> {
1096 let memories = self.memory_results(MemorySearchOptions {
1097 limit,
1098 ..MemorySearchOptions::default()
1099 })?;
1100 Ok(memories
1101 .into_iter()
1102 .map(|memory| ResourceRecord {
1103 uri: format!("codex-recall://memory/{}", memory.object.id),
1104 name: format!("{} {}", memory.object.kind.as_str(), memory.object.id),
1105 description: memory.object.summary,
1106 mime_type: "application/json".to_owned(),
1107 object_type: "memory".to_owned(),
1108 updated_at: memory.object.updated_at,
1109 })
1110 .collect())
1111 }
1112
1113 fn list_memory_ids(&self, options: &MemorySearchOptions, limit: usize) -> Result<Vec<String>> {
1114 let mut sql = "SELECT memory_objects.id FROM memory_objects WHERE 1 = 1".to_owned();
1115 let mut params = Vec::<String>::new();
1116 append_memory_filter_clauses(&mut sql, &mut params, options)?;
1117 sql.push_str(
1118 r#"
1119 ORDER BY datetime(replace(replace(memory_objects.last_seen_at, 'T', ' '), 'Z', '')) DESC,
1120 memory_objects.id ASC
1121 LIMIT ?
1122 "#,
1123 );
1124 params.push(limit.to_string());
1125 let mut statement = self.conn.prepare(&sql)?;
1126 let rows = statement.query_map(params_from_iter(params.iter()), |row| {
1127 row.get::<_, String>(0)
1128 })?;
1129 rows.collect::<std::result::Result<Vec<_>, _>>()
1130 .map_err(Into::into)
1131 }
1132
1133 fn search_memory_ids_with_fts(
1134 &self,
1135 options: &MemorySearchOptions,
1136 fts_query: &str,
1137 limit: usize,
1138 ) -> Result<Vec<String>> {
1139 let fetch_limit = limit.saturating_mul(10).clamp(limit, 500);
1140 let mut sql = r#"
1141 SELECT memory_objects.id
1142 FROM memory_fts
1143 JOIN memory_objects ON memory_objects.id = memory_fts.memory_id
1144 WHERE memory_fts MATCH ?
1145 "#
1146 .to_owned();
1147 let mut params = vec![fts_query.to_owned()];
1148 append_memory_filter_clauses(&mut sql, &mut params, options)?;
1149 sql.push_str(
1150 r#"
1151 ORDER BY memory_fts.rank ASC,
1152 datetime(replace(replace(memory_objects.last_seen_at, 'T', ' '), 'Z', '')) DESC,
1153 memory_objects.id ASC
1154 LIMIT ?
1155 "#,
1156 );
1157 params.push(fetch_limit.to_string());
1158 let mut statement = self.conn.prepare(&sql)?;
1159 let rows = statement.query_map(params_from_iter(params.iter()), |row| {
1160 row.get::<_, String>(0)
1161 })?;
1162 let mut ids = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1163 ids.dedup();
1164 if ids.len() > limit {
1165 ids.truncate(limit);
1166 }
1167 Ok(ids)
1168 }
1169
1170 fn load_memory_results(&self, ids: &[String]) -> Result<Vec<MemoryResult>> {
1171 let mut results = Vec::new();
1172 for id in ids {
1173 let Some(object) = self.memory_by_id(id)? else {
1174 continue;
1175 };
1176 let repos = self.memory_repos(id)?;
1177 let session_keys = self.memory_session_keys(id)?;
1178 results.push(MemoryResult {
1179 object,
1180 repos,
1181 session_keys,
1182 });
1183 }
1184 Ok(results)
1185 }
1186
1187 fn memory_repos(&self, memory_id: &str) -> Result<Vec<String>> {
1188 let mut statement = self.conn.prepare(
1189 r#"
1190 SELECT DISTINCT sessions.repo
1191 FROM memory_evidence
1192 JOIN sessions ON sessions.session_key = memory_evidence.session_key
1193 WHERE memory_evidence.memory_id = ?
1194 AND sessions.repo != ''
1195 ORDER BY lower(sessions.repo) ASC
1196 "#,
1197 )?;
1198 let rows = statement.query_map(params![memory_id], |row| row.get::<_, String>(0))?;
1199 rows.collect::<std::result::Result<Vec<_>, _>>()
1200 .map_err(Into::into)
1201 }
1202
1203 fn memory_session_keys(&self, memory_id: &str) -> Result<Vec<String>> {
1204 let mut statement = self.conn.prepare(
1205 r#"
1206 SELECT DISTINCT session_key
1207 FROM memory_evidence
1208 WHERE memory_id = ?
1209 ORDER BY session_key ASC
1210 "#,
1211 )?;
1212 let rows = statement.query_map(params![memory_id], |row| row.get::<_, String>(0))?;
1213 rows.collect::<std::result::Result<Vec<_>, _>>()
1214 .map_err(Into::into)
1215 }
1216
1217 fn memory_ids_for_session(&self, session_key: &str) -> Result<Vec<String>> {
1218 let mut statement = self.conn.prepare(
1219 r#"
1220 SELECT DISTINCT memory_id
1221 FROM memory_evidence
1222 WHERE session_key = ?
1223 ORDER BY memory_id ASC
1224 "#,
1225 )?;
1226 let rows = statement.query_map(params![session_key], |row| row.get::<_, String>(0))?;
1227 rows.collect::<std::result::Result<Vec<_>, _>>()
1228 .map_err(Into::into)
1229 }
1230
1231 fn upsert_memory_object(
1232 &self,
1233 memory: &ExtractedMemory,
1234 session_timestamp: &str,
1235 ) -> Result<()> {
1236 self.conn.execute(
1237 r#"
1238 INSERT INTO memory_objects (
1239 id, kind, summary, normalized_text, first_seen_at, last_seen_at, created_at, updated_at
1240 ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'), strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1241 ON CONFLICT(id) DO UPDATE SET
1242 kind = excluded.kind,
1243 summary = CASE
1244 WHEN length(excluded.summary) < length(memory_objects.summary) THEN excluded.summary
1245 ELSE memory_objects.summary
1246 END,
1247 normalized_text = excluded.normalized_text,
1248 first_seen_at = MIN(memory_objects.first_seen_at, excluded.first_seen_at),
1249 last_seen_at = MAX(memory_objects.last_seen_at, excluded.last_seen_at),
1250 updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
1251 "#,
1252 params![
1253 memory.id,
1254 memory.kind.as_str(),
1255 memory.summary,
1256 memory.normalized_text,
1257 session_timestamp,
1258 session_timestamp,
1259 ],
1260 )?;
1261 Ok(())
1262 }
1263
1264 fn refresh_memory_objects(&self, memory_ids: &[String]) -> Result<()> {
1265 for memory_id in memory_ids {
1266 let evidence_count: i64 = self.conn.query_row(
1267 "SELECT COUNT(*) FROM memory_evidence WHERE memory_id = ?",
1268 params![memory_id],
1269 |row| row.get(0),
1270 )?;
1271
1272 if evidence_count == 0 {
1273 self.conn.execute(
1274 "DELETE FROM memory_fts WHERE memory_id = ?",
1275 params![memory_id],
1276 )?;
1277 self.conn.execute(
1278 "DELETE FROM memory_objects WHERE id = ?",
1279 params![memory_id],
1280 )?;
1281 self.record_change("memory", memory_id, "delete")?;
1282 continue;
1283 }
1284
1285 let (first_seen_at, last_seen_at, kind, summary, normalized_text): (
1286 String,
1287 String,
1288 String,
1289 String,
1290 String,
1291 ) = self.conn.query_row(
1292 r#"
1293 SELECT
1294 MIN(sessions.session_timestamp),
1295 MAX(sessions.session_timestamp),
1296 memory_objects.kind,
1297 memory_objects.summary,
1298 memory_objects.normalized_text
1299 FROM memory_objects
1300 JOIN memory_evidence ON memory_evidence.memory_id = memory_objects.id
1301 JOIN sessions ON sessions.session_key = memory_evidence.session_key
1302 WHERE memory_objects.id = ?
1303 GROUP BY memory_objects.id
1304 "#,
1305 params![memory_id],
1306 |row| {
1307 Ok((
1308 row.get(0)?,
1309 row.get(1)?,
1310 row.get(2)?,
1311 row.get(3)?,
1312 row.get(4)?,
1313 ))
1314 },
1315 )?;
1316
1317 self.conn.execute(
1318 r#"
1319 UPDATE memory_objects
1320 SET first_seen_at = ?, last_seen_at = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
1321 WHERE id = ?
1322 "#,
1323 params![first_seen_at, last_seen_at, memory_id],
1324 )?;
1325 self.conn.execute(
1326 "DELETE FROM memory_fts WHERE memory_id = ?",
1327 params![memory_id],
1328 )?;
1329 self.conn.execute(
1330 r#"
1331 INSERT INTO memory_fts (memory_id, kind, summary, normalized_text)
1332 VALUES (?, ?, ?, ?)
1333 "#,
1334 params![memory_id, kind, summary, normalized_text],
1335 )?;
1336 self.record_change("memory", memory_id, "upsert")?;
1337 }
1338
1339 Ok(())
1340 }
1341
1342 fn record_change(&self, object_type: &str, object_id: &str, action: &str) -> Result<i64> {
1343 self.conn.execute(
1344 r#"
1345 INSERT INTO change_log (object_type, object_id, action)
1346 VALUES (?, ?, ?)
1347 "#,
1348 params![object_type, object_id, action],
1349 )?;
1350 Ok(self.conn.last_insert_rowid())
1351 }
1352
1353 fn next_change_rows(&self, after_change_id: i64, limit: usize) -> Result<Vec<ChangeLogRow>> {
1354 let mut statement = self.conn.prepare(
1355 r#"
1356 SELECT seq, object_type, object_id, action, recorded_at
1357 FROM change_log
1358 WHERE seq > ?
1359 ORDER BY seq ASC
1360 LIMIT ?
1361 "#,
1362 )?;
1363 let rows = statement.query_map(params![after_change_id, limit as i64], |row| {
1364 Ok(ChangeLogRow {
1365 seq: row.get(0)?,
1366 object_type: row.get(1)?,
1367 object_id: row.get(2)?,
1368 action: row.get(3)?,
1369 recorded_at: row.get(4)?,
1370 })
1371 })?;
1372
1373 rows.collect::<std::result::Result<Vec<_>, _>>()
1374 .map_err(Into::into)
1375 }
1376
1377 fn session_has_repo(&self, session_key: &str, repo: &str) -> Result<bool> {
1378 let count = self.conn.query_row(
1379 r#"
1380 SELECT COUNT(*)
1381 FROM session_repos
1382 WHERE session_key = ?
1383 AND lower(repo) = lower(?)
1384 "#,
1385 params![session_key, repo],
1386 |row| row.get::<_, i64>(0),
1387 )?;
1388 Ok(count > 0)
1389 }
1390
1391 fn memory_result_by_id(&self, memory_id: &str) -> Result<Option<MemoryResult>> {
1392 self.load_memory_results(&[memory_id.to_owned()])
1393 .map(|results| results.into_iter().next())
1394 }
1395
1396 fn resolve_delta_item(
1397 &self,
1398 row: &ChangeLogRow,
1399 repo: Option<&str>,
1400 ) -> Result<Option<DeltaItem>> {
1401 match row.object_type.as_str() {
1402 "session" => {
1403 let mut statement = self.conn.prepare(
1404 r#"
1405 SELECT session_key, session_id, repo, cwd, session_timestamp, indexed_at
1406 FROM sessions
1407 WHERE session_key = ?
1408 "#,
1409 )?;
1410 let mut rows = statement.query(params![row.object_id.as_str()])?;
1411 let Some(session_row) = rows.next()? else {
1412 return Ok(None);
1413 };
1414 let session_key: String = session_row.get(0)?;
1415 if let Some(repo_filter) = repo {
1416 if !self.session_has_repo(&session_key, repo_filter)? {
1417 return Ok(None);
1418 }
1419 }
1420
1421 Ok(Some(DeltaItem::Session {
1422 change_id: row.seq,
1423 action: row.action.clone(),
1424 session_key,
1425 session_id: session_row.get(1)?,
1426 repo: session_row.get(2)?,
1427 cwd: session_row.get(3)?,
1428 session_timestamp: session_row.get(4)?,
1429 updated_at: session_row.get(5)?,
1430 }))
1431 }
1432 "memory" => {
1433 if let Some(result) = self.memory_result_by_id(&row.object_id)? {
1434 if let Some(repo_filter) = repo {
1435 if !result
1436 .repos
1437 .iter()
1438 .any(|item| item.eq_ignore_ascii_case(repo_filter))
1439 {
1440 return Ok(None);
1441 }
1442 }
1443
1444 return Ok(Some(DeltaItem::Memory {
1445 change_id: row.seq,
1446 action: row.action.clone(),
1447 object: result.object,
1448 repos: result.repos,
1449 session_keys: result.session_keys,
1450 }));
1451 }
1452
1453 if repo.is_some() {
1454 return Ok(None);
1455 }
1456
1457 Ok(Some(DeltaItem::Deleted {
1458 change_id: row.seq,
1459 object_type: row.object_type.clone(),
1460 object_id: row.object_id.clone(),
1461 action: row.action.clone(),
1462 updated_at: row.recorded_at.clone(),
1463 }))
1464 }
1465 _ => Ok(None),
1466 }
1467 }
1468
1469 pub fn is_source_current(
1470 &self,
1471 source_file_path: &Path,
1472 source_file_mtime_ns: i64,
1473 source_file_size: i64,
1474 ) -> Result<bool> {
1475 let count = self.conn.query_row(
1476 r#"
1477 SELECT COUNT(*)
1478 FROM ingestion_state
1479 WHERE source_file_path = ?
1480 AND source_file_mtime_ns = ?
1481 AND source_file_size = ?
1482 AND content_version = ?
1483 "#,
1484 params![
1485 source_file_path.display().to_string(),
1486 source_file_mtime_ns,
1487 source_file_size,
1488 CONTENT_VERSION,
1489 ],
1490 |row| row.get::<_, i64>(0),
1491 )?;
1492 Ok(count > 0)
1493 }
1494
1495 pub fn mark_source_indexed(
1496 &self,
1497 source_file_path: &Path,
1498 source_file_mtime_ns: i64,
1499 source_file_size: i64,
1500 session_id: Option<&str>,
1501 session_key: Option<&str>,
1502 ) -> Result<()> {
1503 self.conn.execute(
1504 r#"
1505 INSERT INTO ingestion_state (
1506 source_file_path, source_file_mtime_ns, source_file_size, session_id, session_key, content_version, indexed_at
1507 ) VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1508 ON CONFLICT(source_file_path) DO UPDATE SET
1509 source_file_mtime_ns = excluded.source_file_mtime_ns,
1510 source_file_size = excluded.source_file_size,
1511 session_id = excluded.session_id,
1512 session_key = excluded.session_key,
1513 content_version = excluded.content_version,
1514 indexed_at = excluded.indexed_at
1515 "#,
1516 params![
1517 source_file_path.display().to_string(),
1518 source_file_mtime_ns,
1519 source_file_size,
1520 session_id,
1521 session_key,
1522 CONTENT_VERSION,
1523 ],
1524 )?;
1525 Ok(())
1526 }
1527
1528 pub fn last_indexed_at(&self) -> Result<Option<String>> {
1529 self.conn
1530 .query_row("SELECT MAX(indexed_at) FROM ingestion_state", [], |row| {
1531 row.get::<_, Option<String>>(0)
1532 })
1533 .map_err(Into::into)
1534 }
1535
1536 fn init_schema(&self) -> Result<()> {
1537 self.conn.execute_batch(
1538 r#"
1539 PRAGMA journal_mode = WAL;
1540 PRAGMA synchronous = NORMAL;
1541 "#,
1542 )?;
1543
1544 if self.table_exists("sessions")? && !self.table_has_column("sessions", "session_key")? {
1545 self.migrate_to_session_key_schema()?;
1546 }
1547
1548 self.create_schema_objects()?;
1549 self.ensure_sessions_indexed_at_column()?;
1550 self.ensure_ingestion_state_session_key_column()?;
1551 self.ensure_ingestion_state_content_version_column()?;
1552 self.backfill_session_repos()?;
1553 self.backfill_session_repo_memberships()?;
1554 self.backfill_ingestion_session_keys()?;
1555 Ok(())
1556 }
1557
1558 fn create_schema_objects(&self) -> Result<()> {
1559 self.conn.execute_batch(
1560 r#"
1561 CREATE TABLE IF NOT EXISTS sessions (
1562 session_key TEXT PRIMARY KEY,
1563 session_id TEXT NOT NULL,
1564 session_timestamp TEXT NOT NULL,
1565 cwd TEXT NOT NULL,
1566 repo TEXT NOT NULL DEFAULT '',
1567 cli_version TEXT,
1568 source_file_path TEXT NOT NULL,
1569 indexed_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1570 );
1571
1572 CREATE INDEX IF NOT EXISTS sessions_session_id_idx ON sessions(session_id);
1573 CREATE INDEX IF NOT EXISTS sessions_repo_idx ON sessions(repo);
1574
1575 CREATE TABLE IF NOT EXISTS session_repos (
1576 session_key TEXT NOT NULL,
1577 repo TEXT NOT NULL,
1578 PRIMARY KEY(session_key, repo),
1579 FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1580 );
1581
1582 CREATE INDEX IF NOT EXISTS session_repos_repo_idx ON session_repos(repo);
1583
1584 CREATE TABLE IF NOT EXISTS events (
1585 id INTEGER PRIMARY KEY AUTOINCREMENT,
1586 session_key TEXT NOT NULL,
1587 session_id TEXT NOT NULL,
1588 kind TEXT NOT NULL,
1589 role TEXT,
1590 text TEXT NOT NULL,
1591 command TEXT,
1592 cwd TEXT,
1593 exit_code INTEGER,
1594 source_timestamp TEXT,
1595 source_file_path TEXT NOT NULL,
1596 source_line_number INTEGER NOT NULL,
1597 FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1598 );
1599
1600 CREATE INDEX IF NOT EXISTS events_session_key_idx ON events(session_key);
1601 CREATE INDEX IF NOT EXISTS events_session_id_idx ON events(session_id);
1602 CREATE INDEX IF NOT EXISTS events_source_idx ON events(source_file_path, source_line_number);
1603
1604 CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
1605 event_id UNINDEXED,
1606 session_key UNINDEXED,
1607 session_id UNINDEXED,
1608 kind UNINDEXED,
1609 text,
1610 tokenize = 'porter unicode61'
1611 );
1612
1613 CREATE TABLE IF NOT EXISTS ingestion_state (
1614 source_file_path TEXT PRIMARY KEY,
1615 source_file_mtime_ns INTEGER NOT NULL,
1616 source_file_size INTEGER NOT NULL,
1617 session_id TEXT,
1618 session_key TEXT,
1619 content_version INTEGER NOT NULL DEFAULT 3,
1620 indexed_at TEXT NOT NULL
1621 );
1622
1623 CREATE TABLE IF NOT EXISTS memory_objects (
1624 id TEXT PRIMARY KEY,
1625 kind TEXT NOT NULL,
1626 summary TEXT NOT NULL,
1627 normalized_text TEXT NOT NULL,
1628 first_seen_at TEXT NOT NULL,
1629 last_seen_at TEXT NOT NULL,
1630 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
1631 updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1632 );
1633
1634 CREATE UNIQUE INDEX IF NOT EXISTS memory_objects_kind_normalized_idx
1635 ON memory_objects(kind, normalized_text);
1636 CREATE INDEX IF NOT EXISTS memory_objects_updated_idx
1637 ON memory_objects(updated_at);
1638
1639 CREATE TABLE IF NOT EXISTS memory_evidence (
1640 memory_id TEXT NOT NULL,
1641 session_key TEXT NOT NULL,
1642 session_id TEXT NOT NULL,
1643 source_file_path TEXT NOT NULL,
1644 source_line_number INTEGER NOT NULL,
1645 source_timestamp TEXT,
1646 event_kind TEXT NOT NULL,
1647 evidence_text TEXT NOT NULL,
1648 PRIMARY KEY(memory_id, source_file_path, source_line_number),
1649 FOREIGN KEY(memory_id) REFERENCES memory_objects(id) ON DELETE CASCADE,
1650 FOREIGN KEY(session_key) REFERENCES sessions(session_key) ON DELETE CASCADE
1651 );
1652
1653 CREATE INDEX IF NOT EXISTS memory_evidence_memory_idx ON memory_evidence(memory_id);
1654 CREATE INDEX IF NOT EXISTS memory_evidence_session_idx ON memory_evidence(session_key);
1655
1656 CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
1657 memory_id UNINDEXED,
1658 kind UNINDEXED,
1659 summary,
1660 normalized_text,
1661 tokenize = 'porter unicode61'
1662 );
1663
1664 CREATE TABLE IF NOT EXISTS change_log (
1665 seq INTEGER PRIMARY KEY AUTOINCREMENT,
1666 object_type TEXT NOT NULL,
1667 object_id TEXT NOT NULL,
1668 action TEXT NOT NULL,
1669 recorded_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1670 );
1671
1672 CREATE INDEX IF NOT EXISTS change_log_object_idx
1673 ON change_log(object_type, object_id, seq);
1674 "#,
1675 )?;
1676 Ok(())
1677 }
1678
1679 fn index_session_inner(&self, parsed: &ParsedSession) -> Result<()> {
1680 let session_key = session_key(&parsed.session.id, &parsed.session.source_file_path);
1681 let repo = repo_slug(&parsed.session.cwd);
1682 self.conn.execute(
1683 r#"
1684 INSERT INTO sessions (
1685 session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path, indexed_at
1686 ) VALUES (?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1687 ON CONFLICT(session_key) DO UPDATE SET
1688 session_id = excluded.session_id,
1689 session_timestamp = excluded.session_timestamp,
1690 cwd = excluded.cwd,
1691 repo = excluded.repo,
1692 cli_version = excluded.cli_version,
1693 source_file_path = excluded.source_file_path,
1694 indexed_at = excluded.indexed_at
1695 "#,
1696 params![
1697 session_key.as_str(),
1698 parsed.session.id,
1699 parsed.session.timestamp,
1700 parsed.session.cwd,
1701 repo,
1702 parsed.session.cli_version,
1703 parsed.session.source_file_path.display().to_string(),
1704 ],
1705 )?;
1706 self.record_change("session", &session_key, "upsert")?;
1707
1708 let mut affected_memory_ids = self.memory_ids_for_session(&session_key)?;
1709
1710 self.conn.execute(
1711 "DELETE FROM events_fts WHERE session_key = ?",
1712 params![session_key.as_str()],
1713 )?;
1714 self.conn.execute(
1715 "DELETE FROM events WHERE session_key = ?",
1716 params![session_key.as_str()],
1717 )?;
1718 self.conn.execute(
1719 "DELETE FROM session_repos WHERE session_key = ?",
1720 params![session_key.as_str()],
1721 )?;
1722 self.conn.execute(
1723 "DELETE FROM memory_evidence WHERE session_key = ?",
1724 params![session_key.as_str()],
1725 )?;
1726
1727 for repo in session_repos(parsed) {
1728 self.conn.execute(
1729 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1730 params![session_key.as_str(), repo],
1731 )?;
1732 }
1733
1734 for event in &parsed.events {
1735 self.conn.execute(
1736 r#"
1737 INSERT INTO events (
1738 session_key, session_id, kind, role, text, command, cwd, exit_code,
1739 source_timestamp, source_file_path, source_line_number
1740 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1741 "#,
1742 params![
1743 session_key.as_str(),
1744 parsed.session.id,
1745 event.kind.as_str(),
1746 event.role,
1747 event.text,
1748 event.command,
1749 event.cwd,
1750 event.exit_code,
1751 event.source_timestamp,
1752 event.source_file_path.display().to_string(),
1753 event.source_line_number as i64,
1754 ],
1755 )?;
1756 let event_id = self.conn.last_insert_rowid();
1757 self.conn.execute(
1758 "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
1759 params![
1760 event_id,
1761 session_key.as_str(),
1762 parsed.session.id,
1763 event.kind.as_str(),
1764 event.text
1765 ],
1766 )?;
1767 }
1768
1769 for memory in extract_memories(parsed) {
1770 self.upsert_memory_object(&memory, &parsed.session.timestamp)?;
1771 self.conn.execute(
1772 r#"
1773 INSERT OR REPLACE INTO memory_evidence (
1774 memory_id, session_key, session_id, source_file_path, source_line_number,
1775 source_timestamp, event_kind, evidence_text
1776 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1777 "#,
1778 params![
1779 memory.id,
1780 session_key.as_str(),
1781 parsed.session.id,
1782 parsed.session.source_file_path.display().to_string(),
1783 memory.source_line_number as i64,
1784 memory.source_timestamp,
1785 memory.event_kind.as_str(),
1786 memory.evidence_text,
1787 ],
1788 )?;
1789 affected_memory_ids.push(memory.id);
1790 }
1791
1792 affected_memory_ids.sort();
1793 affected_memory_ids.dedup();
1794 self.refresh_memory_objects(&affected_memory_ids)?;
1795
1796 Ok(())
1797 }
1798
1799 fn backfill_session_repos(&self) -> Result<()> {
1800 let mut statement = self
1801 .conn
1802 .prepare("SELECT session_key, cwd FROM sessions WHERE repo = ''")?;
1803 let rows = statement.query_map([], |row| {
1804 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1805 })?;
1806
1807 for row in rows {
1808 let (session_key, cwd) = row?;
1809 self.conn.execute(
1810 "UPDATE sessions SET repo = ? WHERE session_key = ?",
1811 params![repo_slug(&cwd), session_key],
1812 )?;
1813 }
1814 Ok(())
1815 }
1816
1817 fn table_exists(&self, table_name: &str) -> Result<bool> {
1818 let count = self.conn.query_row(
1819 "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
1820 params![table_name],
1821 |row| row.get::<_, i64>(0),
1822 )?;
1823 Ok(count > 0)
1824 }
1825
1826 fn table_has_column(&self, table_name: &str, column_name: &str) -> Result<bool> {
1827 let mut statement = self
1828 .conn
1829 .prepare(&format!("PRAGMA table_info({table_name})"))?;
1830 let columns = statement
1831 .query_map([], |row| row.get::<_, String>(1))?
1832 .collect::<std::result::Result<Vec<_>, _>>()?;
1833 Ok(columns.iter().any(|column| column == column_name))
1834 }
1835
1836 fn ensure_ingestion_state_session_key_column(&self) -> Result<()> {
1837 if self.table_has_column("ingestion_state", "session_key")? {
1838 return Ok(());
1839 }
1840
1841 match self.conn.execute(
1842 "ALTER TABLE ingestion_state ADD COLUMN session_key TEXT",
1843 [],
1844 ) {
1845 Ok(_) => Ok(()),
1846 Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1847 Err(error) => Err(error.into()),
1848 }
1849 }
1850
1851 fn ensure_sessions_indexed_at_column(&self) -> Result<()> {
1852 if self.table_has_column("sessions", "indexed_at")? {
1853 return Ok(());
1854 }
1855
1856 match self.conn.execute(
1857 "ALTER TABLE sessions ADD COLUMN indexed_at TEXT NOT NULL DEFAULT '1970-01-01T00:00:00.000Z'",
1858 [],
1859 ) {
1860 Ok(_) => {
1861 self.conn.execute(
1862 "UPDATE sessions SET indexed_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE indexed_at = '1970-01-01T00:00:00.000Z'",
1863 [],
1864 )?;
1865 Ok(())
1866 }
1867 Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1868 Err(error) => Err(error.into()),
1869 }
1870 }
1871
1872 fn ensure_ingestion_state_content_version_column(&self) -> Result<()> {
1873 if self.table_has_column("ingestion_state", "content_version")? {
1874 return Ok(());
1875 }
1876
1877 match self.conn.execute(
1878 "ALTER TABLE ingestion_state ADD COLUMN content_version INTEGER NOT NULL DEFAULT 0",
1879 [],
1880 ) {
1881 Ok(_) => Ok(()),
1882 Err(error) if error.to_string().contains("duplicate column name") => Ok(()),
1883 Err(error) => Err(error.into()),
1884 }
1885 }
1886
1887 fn backfill_ingestion_session_keys(&self) -> Result<()> {
1888 if !self.table_exists("ingestion_state")? {
1889 return Ok(());
1890 }
1891
1892 self.conn.execute(
1893 r#"
1894 UPDATE ingestion_state
1895 SET session_key = (
1896 SELECT sessions.session_key
1897 FROM sessions
1898 WHERE sessions.source_file_path = ingestion_state.source_file_path
1899 LIMIT 1
1900 )
1901 WHERE session_key IS NULL
1902 AND session_id IS NOT NULL
1903 "#,
1904 [],
1905 )?;
1906 Ok(())
1907 }
1908
1909 fn backfill_session_repo_memberships(&self) -> Result<()> {
1910 self.conn.execute(
1911 r#"
1912 INSERT OR IGNORE INTO session_repos (session_key, repo)
1913 SELECT session_key, repo
1914 FROM sessions
1915 WHERE repo != ''
1916 "#,
1917 [],
1918 )?;
1919
1920 let mut statement = self.conn.prepare(
1921 r#"
1922 SELECT DISTINCT session_key, cwd
1923 FROM events
1924 WHERE cwd IS NOT NULL
1925 AND cwd != ''
1926 "#,
1927 )?;
1928 let rows = statement.query_map([], |row| {
1929 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1930 })?;
1931
1932 for row in rows {
1933 let (session_key, cwd) = row?;
1934 let repo = repo_slug(&cwd);
1935 if repo.is_empty() {
1936 continue;
1937 }
1938 self.conn.execute(
1939 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1940 params![session_key, repo],
1941 )?;
1942 }
1943
1944 Ok(())
1945 }
1946
1947 fn migrate_to_session_key_schema(&self) -> Result<()> {
1948 let old_sessions = self.load_old_sessions()?;
1949 let old_events = self.load_old_events()?;
1950 let mut keys_by_session_id = HashMap::new();
1951 for session in &old_sessions {
1952 keys_by_session_id.insert(
1953 session.session_id.clone(),
1954 session_key(&session.session_id, Path::new(&session.source_file_path)),
1955 );
1956 }
1957
1958 self.conn.execute("BEGIN IMMEDIATE", [])?;
1959 let result = (|| -> Result<()> {
1960 self.conn.execute_batch(
1961 r#"
1962 DROP TABLE IF EXISTS events_fts;
1963 DROP TABLE IF EXISTS events;
1964 DROP TABLE IF EXISTS sessions;
1965 "#,
1966 )?;
1967 self.create_schema_objects()?;
1968
1969 for session in &old_sessions {
1970 let session_key = keys_by_session_id
1971 .get(&session.session_id)
1972 .expect("session key exists");
1973 let repo = if session.repo.is_empty() {
1974 repo_slug(&session.cwd)
1975 } else {
1976 session.repo.clone()
1977 };
1978 self.conn.execute(
1979 r#"
1980 INSERT INTO sessions (
1981 session_key, session_id, session_timestamp, cwd, repo, cli_version, source_file_path, indexed_at
1982 ) VALUES (?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
1983 "#,
1984 params![
1985 session_key,
1986 session.session_id,
1987 session.session_timestamp,
1988 session.cwd,
1989 repo,
1990 session.cli_version,
1991 session.source_file_path,
1992 ],
1993 )?;
1994 self.conn.execute(
1995 "INSERT OR IGNORE INTO session_repos (session_key, repo) VALUES (?, ?)",
1996 params![session_key, repo],
1997 )?;
1998 }
1999
2000 for event in &old_events {
2001 let Some(session_key) = keys_by_session_id.get(&event.session_id) else {
2002 continue;
2003 };
2004 self.conn.execute(
2005 r#"
2006 INSERT INTO events (
2007 session_key, session_id, kind, role, text, command, cwd, exit_code,
2008 source_timestamp, source_file_path, source_line_number
2009 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2010 "#,
2011 params![
2012 session_key,
2013 event.session_id,
2014 event.kind,
2015 event.role,
2016 event.text,
2017 event.command,
2018 event.cwd,
2019 event.exit_code,
2020 event.source_timestamp,
2021 event.source_file_path,
2022 event.source_line_number,
2023 ],
2024 )?;
2025 let event_id = self.conn.last_insert_rowid();
2026 self.conn.execute(
2027 "INSERT INTO events_fts (event_id, session_key, session_id, kind, text) VALUES (?, ?, ?, ?, ?)",
2028 params![
2029 event_id,
2030 session_key,
2031 event.session_id,
2032 event.kind,
2033 event.text
2034 ],
2035 )?;
2036 }
2037
2038 Ok(())
2039 })();
2040
2041 match result {
2042 Ok(()) => {
2043 self.conn.execute("COMMIT", [])?;
2044 Ok(())
2045 }
2046 Err(error) => {
2047 let _ = self.conn.execute("ROLLBACK", []);
2048 Err(error)
2049 }
2050 }
2051 }
2052
2053 fn load_old_sessions(&self) -> Result<Vec<OldSessionRow>> {
2054 let has_repo = self.table_has_column("sessions", "repo")?;
2055 let sql = if has_repo {
2056 "SELECT session_id, session_timestamp, cwd, repo, cli_version, source_file_path FROM sessions"
2057 } else {
2058 "SELECT session_id, session_timestamp, cwd, '' AS repo, cli_version, source_file_path FROM sessions"
2059 };
2060 let mut statement = self.conn.prepare(sql)?;
2061 let rows = statement.query_map([], |row| {
2062 Ok(OldSessionRow {
2063 session_id: row.get(0)?,
2064 session_timestamp: row.get(1)?,
2065 cwd: row.get(2)?,
2066 repo: row.get(3)?,
2067 cli_version: row.get(4)?,
2068 source_file_path: row.get(5)?,
2069 })
2070 })?;
2071
2072 rows.collect::<std::result::Result<Vec<_>, _>>()
2073 .map_err(Into::into)
2074 }
2075
2076 fn load_old_events(&self) -> Result<Vec<OldEventRow>> {
2077 if !self.table_exists("events")? {
2078 return Ok(Vec::new());
2079 }
2080
2081 let mut statement = self.conn.prepare(
2082 r#"
2083 SELECT
2084 session_id, kind, role, text, command, cwd, exit_code,
2085 source_timestamp, source_file_path, source_line_number
2086 FROM events
2087 ORDER BY id ASC
2088 "#,
2089 )?;
2090 let rows = statement.query_map([], |row| {
2091 Ok(OldEventRow {
2092 session_id: row.get(0)?,
2093 kind: row.get(1)?,
2094 role: row.get(2)?,
2095 text: row.get(3)?,
2096 command: row.get(4)?,
2097 cwd: row.get(5)?,
2098 exit_code: row.get(6)?,
2099 source_timestamp: row.get(7)?,
2100 source_file_path: row.get(8)?,
2101 source_line_number: row.get(9)?,
2102 })
2103 })?;
2104
2105 rows.collect::<std::result::Result<Vec<_>, _>>()
2106 .map_err(Into::into)
2107 }
2108}
2109
2110fn configure_write_connection(conn: &Connection) -> Result<()> {
2111 conn.busy_timeout(Duration::from_secs(30))?;
2112 conn.execute_batch(
2113 r#"
2114 PRAGMA journal_mode = WAL;
2115 PRAGMA synchronous = NORMAL;
2116 PRAGMA temp_store = MEMORY;
2117 "#,
2118 )?;
2119 Ok(())
2120}
2121
2122fn configure_read_connection(conn: &Connection) -> Result<()> {
2123 conn.busy_timeout(Duration::from_secs(30))?;
2124 conn.execute_batch(
2125 r#"
2126 PRAGMA query_only = ON;
2127 PRAGMA temp_store = MEMORY;
2128 "#,
2129 )?;
2130 Ok(())
2131}
2132
/// A parsed date bound, as produced by `parse_date_filter`.
///
/// Despite the name, the same type is used for lower and upper bounds.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SinceFilter {
    /// A date-like string beginning with `YYYY-MM-DD`, passed through to SQLite as-is.
    Absolute(String),
    /// A relative bound of this many days before the current time.
    LastDays(u32),
    /// The start of the current local day.
    Today,
    /// The start of the previous local day.
    Yesterday,
}
2139
2140fn parse_date_filter(value: &str, flag_name: &str) -> Result<SinceFilter> {
2141 let trimmed = value.trim();
2142 let lower = trimmed.to_ascii_lowercase();
2143 if lower == "today" {
2144 return Ok(SinceFilter::Today);
2145 }
2146 if lower == "yesterday" {
2147 return Ok(SinceFilter::Yesterday);
2148 }
2149 if let Some(days) = lower.strip_suffix('d') {
2150 let days = days
2151 .parse::<u32>()
2152 .with_context(|| format!("parse {flag_name} relative day value `{value}`"))?;
2153 if days == 0 {
2154 return Ok(SinceFilter::Today);
2155 }
2156 return Ok(SinceFilter::LastDays(days));
2157 }
2158 if looks_like_absolute_date(trimmed) {
2159 return Ok(SinceFilter::Absolute(trimmed.to_owned()));
2160 }
2161
2162 anyhow::bail!(
2163 "unsupported {flag_name} value `{value}`; use YYYY-MM-DD, today, yesterday, or Nd like 7d"
2164 )
2165}
2166
/// Reports whether `value` starts with ten bytes shaped like `DDDD-DD-DD`, where `D`
/// is an ASCII digit.
///
/// Only the shape is checked: the digits are not validated as a real calendar date,
/// and anything after the tenth byte is ignored.
fn looks_like_absolute_date(value: &str) -> bool {
    let Some(prefix) = value.as_bytes().get(..10) else {
        return false;
    };
    prefix.iter().enumerate().all(|(index, byte)| match index {
        // Positions of the two separators in `YYYY-MM-DD`.
        4 | 7 => *byte == b'-',
        _ => byte.is_ascii_digit(),
    })
}
2176
2177fn append_since_clause(
2178 sql: &mut String,
2179 query_params: &mut Vec<String>,
2180 value: &str,
2181) -> Result<()> {
2182 append_lower_bound_clause(sql, query_params, value, "--since")
2183}
2184
2185fn append_lower_bound_clause(
2186 sql: &mut String,
2187 query_params: &mut Vec<String>,
2188 value: &str,
2189 flag_name: &str,
2190) -> Result<()> {
2191 sql.push_str(
2192 " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) >= ",
2193 );
2194 match parse_date_filter(value, flag_name)? {
2195 SinceFilter::Absolute(value) => {
2196 sql.push_str("datetime(?)");
2197 query_params.push(value);
2198 }
2199 SinceFilter::LastDays(days) => {
2200 sql.push_str("datetime('now', ?)");
2201 query_params.push(format!("-{days} days"));
2202 }
2203 SinceFilter::Today => {
2204 sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
2205 }
2206 SinceFilter::Yesterday => {
2207 sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
2208 }
2209 }
2210 Ok(())
2211}
2212
2213fn append_until_clause(
2214 sql: &mut String,
2215 query_params: &mut Vec<String>,
2216 value: &str,
2217) -> Result<()> {
2218 sql.push_str(
2219 " AND datetime(replace(replace(sessions.session_timestamp, 'T', ' '), 'Z', '')) < ",
2220 );
2221 match parse_date_filter(value, "--until")? {
2222 SinceFilter::Absolute(value) => {
2223 sql.push_str("datetime(?)");
2224 query_params.push(value);
2225 }
2226 SinceFilter::LastDays(days) => {
2227 sql.push_str("datetime('now', ?)");
2228 query_params.push(format!("-{days} days"));
2229 }
2230 SinceFilter::Today => {
2231 sql.push_str("datetime('now', 'localtime', 'start of day', 'utc')");
2232 }
2233 SinceFilter::Yesterday => {
2234 sql.push_str("datetime('now', 'localtime', 'start of day', '-1 day', 'utc')");
2235 }
2236 }
2237 Ok(())
2238}
2239
2240fn append_from_until_clauses(
2241 sql: &mut String,
2242 query_params: &mut Vec<String>,
2243 since: Option<&String>,
2244 from: Option<&String>,
2245 until: Option<&String>,
2246) -> Result<()> {
2247 if since.is_some() && from.is_some() {
2248 bail!("use either --since or --from, not both");
2249 }
2250 if let Some(since) = since {
2251 append_since_clause(sql, query_params, since)?;
2252 } else if let Some(from) = from {
2253 append_lower_bound_clause(sql, query_params, from, "--from")?;
2254 }
2255 if let Some(until) = until {
2256 append_until_clause(sql, query_params, until)?;
2257 }
2258 Ok(())
2259}
2260
/// Appends one exclusion predicate per entry of `excluded_sessions`.
///
/// Each entry is compared against both `sessions.session_id` and
/// `sessions.session_key`, so it is pushed onto `query_params` twice.
/// An empty slice leaves both buffers unchanged.
fn append_excluded_sessions_clause(
    sql: &mut String,
    query_params: &mut Vec<String>,
    excluded_sessions: &[String],
) {
    const EXCLUSION: &str = " AND sessions.session_id != ? AND sessions.session_key != ?";

    sql.push_str(&EXCLUSION.repeat(excluded_sessions.len()));
    query_params.extend(
        excluded_sessions
            .iter()
            .flat_map(|session| [session, session])
            .cloned(),
    );
}
2272
2273fn append_event_kind_clause(
2274 sql: &mut String,
2275 query_params: &mut Vec<String>,
2276 column_name: &str,
2277 kinds: &[EventKind],
2278) {
2279 if kinds.is_empty() {
2280 return;
2281 }
2282 sql.push_str(" AND ");
2283 sql.push_str(column_name);
2284 sql.push_str(" IN (");
2285 sql.push_str(&placeholders(kinds.len()));
2286 sql.push(')');
2287 append_kind_params(query_params, kinds);
2288}
2289
2290fn append_recent_event_kind_clause(
2291 sql: &mut String,
2292 query_params: &mut Vec<String>,
2293 kinds: &[EventKind],
2294) {
2295 if kinds.is_empty() {
2296 return;
2297 }
2298 sql.push_str(
2299 r#"
2300 AND EXISTS (
2301 SELECT 1 FROM events kind_events
2302 WHERE kind_events.session_key = sessions.session_key
2303 AND kind_events.kind IN (
2304 "#,
2305 );
2306 sql.push_str(&placeholders(kinds.len()));
2307 sql.push_str("))");
2308 append_kind_params(query_params, kinds);
2309}
2310
2311fn append_memory_filter_clauses(
2312 sql: &mut String,
2313 query_params: &mut Vec<String>,
2314 options: &MemorySearchOptions,
2315) -> Result<()> {
2316 append_memory_kind_clause(sql, query_params, &options.kinds);
2317 if let Some(repo) = &options.repo {
2318 sql.push_str(
2319 r#"
2320 AND EXISTS (
2321 SELECT 1
2322 FROM memory_evidence
2323 JOIN sessions ON sessions.session_key = memory_evidence.session_key
2324 WHERE memory_evidence.memory_id = memory_objects.id
2325 AND lower(sessions.repo) = lower(?)
2326 )
2327 "#,
2328 );
2329 query_params.push(repo.clone());
2330 }
2331 if let Some(cwd) = &options.cwd {
2332 sql.push_str(
2333 r#"
2334 AND EXISTS (
2335 SELECT 1
2336 FROM memory_evidence
2337 JOIN sessions ON sessions.session_key = memory_evidence.session_key
2338 WHERE memory_evidence.memory_id = memory_objects.id
2339 AND sessions.cwd LIKE '%' || ? || '%'
2340 )
2341 "#,
2342 );
2343 query_params.push(cwd.clone());
2344 }
2345 if options.since.is_some() || options.from.is_some() || options.until.is_some() {
2346 let mut time_clause = String::new();
2347 let mut time_params = Vec::new();
2348 append_from_until_clauses(
2349 &mut time_clause,
2350 &mut time_params,
2351 options.since.as_ref(),
2352 options.from.as_ref(),
2353 options.until.as_ref(),
2354 )?;
2355 if !time_clause.is_empty() {
2356 let clause = time_clause
2357 .replacen(" AND ", "", 1)
2358 .replace("sessions.", "related_sessions.");
2359 sql.push_str(
2360 r#"
2361 AND EXISTS (
2362 SELECT 1
2363 FROM memory_evidence
2364 JOIN sessions AS related_sessions ON related_sessions.session_key = memory_evidence.session_key
2365 WHERE memory_evidence.memory_id = memory_objects.id
2366 "#,
2367 );
2368 sql.push_str(" AND ");
2369 sql.push_str(&clause);
2370 sql.push(')');
2371 query_params.extend(time_params);
2372 }
2373 }
2374 Ok(())
2375}
2376
2377fn append_memory_kind_clause(
2378 sql: &mut String,
2379 query_params: &mut Vec<String>,
2380 kinds: &[MemoryKind],
2381) {
2382 if kinds.is_empty() {
2383 return;
2384 }
2385 sql.push_str(" AND memory_objects.kind IN (");
2386 sql.push_str(&placeholders(kinds.len()));
2387 sql.push(')');
2388 query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
2389}
2390
/// Returns `count` SQL `?` placeholders separated by `", "`.
///
/// A `count` of zero yields the empty string.
fn placeholders(count: usize) -> String {
    vec!["?"; count].join(", ")
}
2396
/// Decoded form of a delta cursor string.
///
/// `parse_delta_cursor` builds it from text shaped like `chg_<integer>`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DeltaCursor {
    /// Integer parsed from the text that follows the `chg_` prefix.
    change_id: i64,
}
2401
/// Plain data record with one field per change-log attribute.
///
/// NOTE(review): presumably one row read from a change-log table, with `seq`
/// as its ordering key — confirm against the query that populates it; nothing
/// in this part of the file constructs or reads the struct.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChangeLogRow {
    seq: i64,
    object_type: String,
    object_id: String,
    action: String,
    recorded_at: String,
}
2410
2411fn parse_delta_cursor(value: &str) -> Option<DeltaCursor> {
2412 let change_id = value.strip_prefix("chg_")?.parse::<i64>().ok()?;
2413 Some(DeltaCursor { change_id })
2414}
2415
2416pub fn encode_delta_cursor(item: &DeltaItem) -> String {
2417 format!("chg_{}", item.change_id())
2418}
2419
2420fn append_kind_params(query_params: &mut Vec<String>, kinds: &[EventKind]) {
2421 query_params.extend(kinds.iter().map(|kind| kind.as_str().to_owned()));
2422}
2423
/// All search hits that share one `session_key`, plus the aggregates
/// `rank_search_results` uses to order and deduplicate sessions.
struct SessionGroup {
    session_key: String,
    session_id: String,
    // Copied from the first hit seen for the session; used for source priority.
    source_file_path: PathBuf,
    // Copied from the first hit seen for the session.
    repo_matches_current: bool,
    // Number of hits collected into `results`.
    hit_count: usize,
    // Smallest score among the hits (lower sorts first).
    best_score: f64,
    // Smallest `event_kind_weight` among the hits (lower sorts first).
    best_kind_weight: u8,
    // Copied from the first hit seen for the session.
    session_timestamp: String,
    results: Vec<SearchResult>,
}
2435
2436fn rank_search_results(
2437 results: Vec<SearchResult>,
2438 _current_repo: Option<&str>,
2439 limit: usize,
2440 include_duplicates: bool,
2441) -> Vec<SearchResult> {
2442 let mut groups = Vec::<SessionGroup>::new();
2443
2444 for result in results {
2445 let kind_weight = event_kind_weight(result.kind);
2446 if let Some(group) = groups
2447 .iter_mut()
2448 .find(|group| group.session_key == result.session_key)
2449 {
2450 group.hit_count += 1;
2451 group.best_score = group.best_score.min(result.score);
2452 group.best_kind_weight = group.best_kind_weight.min(kind_weight);
2453 group.results.push(result);
2454 } else {
2455 groups.push(SessionGroup {
2456 session_key: result.session_key.clone(),
2457 session_id: result.session_id.clone(),
2458 source_file_path: result.source_file_path.clone(),
2459 repo_matches_current: result.repo_matches_current,
2460 hit_count: 1,
2461 best_score: result.score,
2462 best_kind_weight: kind_weight,
2463 session_timestamp: result.session_timestamp.clone(),
2464 results: vec![result],
2465 });
2466 }
2467 }
2468
2469 if !include_duplicates {
2470 groups = dedupe_session_groups(groups);
2471 }
2472
2473 groups.sort_by(|left, right| {
2474 right
2475 .repo_matches_current
2476 .cmp(&left.repo_matches_current)
2477 .then_with(|| right.hit_count.cmp(&left.hit_count))
2478 .then_with(|| left.best_kind_weight.cmp(&right.best_kind_weight))
2479 .then_with(|| {
2480 left.best_score
2481 .partial_cmp(&right.best_score)
2482 .unwrap_or(std::cmp::Ordering::Equal)
2483 })
2484 .then_with(|| right.session_timestamp.cmp(&left.session_timestamp))
2485 .then_with(|| left.session_key.cmp(&right.session_key))
2486 });
2487
2488 let mut ranked = Vec::new();
2489 for mut group in groups {
2490 group.results.sort_by(|left, right| {
2491 left.score
2492 .partial_cmp(&right.score)
2493 .unwrap_or(std::cmp::Ordering::Equal)
2494 .then_with(|| event_kind_weight(left.kind).cmp(&event_kind_weight(right.kind)))
2495 .then_with(|| left.source_line_number.cmp(&right.source_line_number))
2496 });
2497 for result in &mut group.results {
2498 result.session_hit_count = group.hit_count;
2499 result.best_kind_weight = group.best_kind_weight;
2500 }
2501 ranked.extend(group.results);
2502 if ranked.len() >= limit {
2503 ranked.truncate(limit);
2504 break;
2505 }
2506 }
2507
2508 ranked
2509}
2510
2511fn dedupe_session_groups(groups: Vec<SessionGroup>) -> Vec<SessionGroup> {
2512 let mut selected = HashMap::<String, SessionGroup>::new();
2513 for group in groups {
2514 match selected.entry(group.session_id.clone()) {
2515 Entry::Occupied(mut entry) => {
2516 if is_preferred_group(&group, entry.get()) {
2517 entry.insert(group);
2518 }
2519 }
2520 Entry::Vacant(entry) => {
2521 entry.insert(group);
2522 }
2523 }
2524 }
2525 selected.into_values().collect()
2526}
2527
2528fn is_preferred_group(candidate: &SessionGroup, current: &SessionGroup) -> bool {
2529 let candidate_priority = source_priority(&candidate.source_file_path);
2530 let current_priority = source_priority(¤t.source_file_path);
2531 candidate_priority < current_priority
2532 || (candidate_priority == current_priority
2533 && candidate.repo_matches_current
2534 && !current.repo_matches_current)
2535 || (candidate_priority == current_priority
2536 && candidate.repo_matches_current == current.repo_matches_current
2537 && candidate.session_timestamp > current.session_timestamp)
2538 || (candidate_priority == current_priority
2539 && candidate.repo_matches_current == current.repo_matches_current
2540 && candidate.session_timestamp == current.session_timestamp
2541 && candidate.session_key < current.session_key)
2542}
2543
2544fn dedupe_recent_sessions(sessions: Vec<RecentSession>) -> Vec<RecentSession> {
2545 let mut selected = HashMap::<String, RecentSession>::new();
2546 for session in sessions {
2547 match selected.entry(session.session_id.clone()) {
2548 Entry::Occupied(mut entry) => {
2549 if is_preferred_recent_session(&session, entry.get()) {
2550 entry.insert(session);
2551 }
2552 }
2553 Entry::Vacant(entry) => {
2554 entry.insert(session);
2555 }
2556 }
2557 }
2558
2559 let mut sessions = selected.into_values().collect::<Vec<_>>();
2560 sessions.sort_by(|left, right| {
2561 right
2562 .session_timestamp
2563 .cmp(&left.session_timestamp)
2564 .then_with(|| {
2565 source_priority(&left.source_file_path)
2566 .cmp(&source_priority(&right.source_file_path))
2567 })
2568 .then_with(|| left.session_key.cmp(&right.session_key))
2569 });
2570 sessions
2571}
2572
2573fn is_preferred_recent_session(candidate: &RecentSession, current: &RecentSession) -> bool {
2574 let candidate_priority = source_priority(&candidate.source_file_path);
2575 let current_priority = source_priority(¤t.source_file_path);
2576 candidate_priority < current_priority
2577 || (candidate_priority == current_priority
2578 && candidate.session_timestamp > current.session_timestamp)
2579 || (candidate_priority == current_priority
2580 && candidate.session_timestamp == current.session_timestamp
2581 && candidate.session_key < current.session_key)
2582}
2583
/// Ranks a source file by the directory names on its path.
///
/// Returns `2` if any component is `archived_sessions`, otherwise `0` if any
/// component is `sessions`, otherwise `1`. The dedupe helpers treat lower
/// values as preferred.
fn source_priority(path: &Path) -> u8 {
    let mut under_sessions = false;
    for component in path.components() {
        match component.as_os_str().to_str() {
            // An archived component wins regardless of what else is on the path.
            Some("archived_sessions") => return 2,
            Some("sessions") => under_sessions = true,
            _ => {}
        }
    }
    if under_sessions {
        0
    } else {
        1
    }
}
2599
/// Public entry point for `source_priority`; returns its result for `path`
/// unchanged.
pub fn source_priority_for_path(path: &Path) -> u8 {
    source_priority(path)
}
2603
/// Ranking weight of an event kind; smaller weights sort first.
///
/// `rank_search_results` keeps the minimum weight per session and sorts both
/// sessions and individual hits in ascending weight order.
fn event_kind_weight(kind: EventKind) -> u8 {
    // Exhaustive on purpose: a new `EventKind` variant must be given a weight
    // here before the crate compiles.
    match kind {
        EventKind::UserMessage => 0,
        EventKind::AssistantMessage => 1,
        EventKind::Command => 2,
    }
}
2611
2612fn session_repos(parsed: &ParsedSession) -> BTreeSet<String> {
2613 let mut repos = BTreeSet::new();
2614 let session_repo = repo_slug(&parsed.session.cwd);
2615 if !session_repo.is_empty() {
2616 repos.insert(session_repo);
2617 }
2618
2619 for event in &parsed.events {
2620 let Some(cwd) = &event.cwd else {
2621 continue;
2622 };
2623 let repo = repo_slug(cwd);
2624 if !repo.is_empty() {
2625 repos.insert(repo);
2626 }
2627 }
2628
2629 repos
2630}
2631
/// Splits a free-text query into search terms.
///
/// A term is a maximal run of alphanumeric characters (per
/// `char::is_alphanumeric`) and underscores; every other character is a
/// separator and is discarded. Terms are returned in input order, unchanged
/// in case, and never empty.
fn fts_terms(query: &str) -> Vec<String> {
    query
        .split(|ch: char| !ch.is_alphanumeric() && ch != '_')
        .filter(|term| !term.is_empty())
        .map(str::to_owned)
        .collect()
}
2650
/// Wraps `term` in double quotes, doubling any double quote inside it.
fn quote_fts_term(term: &str) -> String {
    let mut quoted = String::with_capacity(term.len() + 2);
    quoted.push('"');
    for ch in term.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
2654
2655fn and_fts_query(terms: &[String]) -> String {
2656 terms
2657 .iter()
2658 .map(|term| quote_fts_term(term))
2659 .collect::<Vec<_>>()
2660 .join(" AND ")
2661}
2662
2663fn or_fts_query(terms: &[String]) -> String {
2664 terms
2665 .iter()
2666 .map(|term| quote_fts_term(term))
2667 .collect::<Vec<_>>()
2668 .join(" OR ")
2669}
2670
/// Public entry point for `session_key`; returns its result for the given
/// session id and source file path unchanged.
pub fn build_session_key(session_id: &str, source_file_path: &Path) -> String {
    session_key(session_id, source_file_path)
}
2674
2675fn session_key(session_id: &str, source_file_path: &Path) -> String {
2676 format!(
2677 "{}:{:016x}",
2678 session_id,
2679 fnv1a64(source_file_path.display().to_string().as_bytes())
2680 )
2681}
2682
/// Hashes `bytes` to 64 bits: starting from a fixed basis, each byte is
/// XORed into the state, which is then multiplied (wrapping) by a fixed prime.
///
/// Empty input returns the basis unchanged.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
2691
/// Returns the final path component of `cwd` as the repo slug.
///
/// When the path has no final component (for example an empty string, a bare
/// root, or a path ending in `..`), or that component is not valid UTF-8,
/// `cwd` itself is returned.
fn repo_slug(cwd: &str) -> String {
    let last_component = Path::new(cwd).file_name().and_then(|name| name.to_str());
    match last_component {
        Some(name) => name.to_owned(),
        None => cwd.to_owned(),
    }
}
2699
2700#[cfg(test)]
2701mod tests;