1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_core::session::{is_auxiliary_session, working_directory};
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde_json::Value;
9use std::fs;
10use std::io::{BufRead, BufReader};
11use std::path::PathBuf;
12use std::sync::Mutex;
13
14use git::{normalize_repo_name, GitContext};
15
/// Lowercased title prefix of internal summary-worker sessions; codex
/// sessions whose title starts with this are hidden from listing queries.
const SUMMARY_WORKER_TITLE_PREFIX_LOWER: &str =
    "convert a real coding session into semantic compression.";
18
/// One session row as returned by the local-DB query helpers: columns from
/// `sessions` joined with sync state (`session_sync`) and the owning user.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// Path of the source trace file on disk, when known.
    pub source_path: Option<String>,
    /// Sync state, e.g. "local_only", "remote_only" or "synced"
    /// (see the upsert methods on `LocalDb`).
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Comma-joined tag list.
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // NOTE(review): presumably serialized file lists — the `touches` log
    // filter matches them with a LIKE on `"path"`; confirm at the write site.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
    /// Auxiliary sessions are excluded by every listing query.
    pub is_auxiliary: bool,
}
57
/// A directed link between two sessions, as stored in `session_links`.
#[derive(Debug, Clone)]
pub struct LocalSessionLink {
    pub session_id: String,
    pub linked_session_id: String,
    /// Relationship kind — NOTE(review): values defined by callers, not
    /// constrained here.
    pub link_type: String,
    pub created_at: String,
}
66
/// Stored semantic summary for a session (`session_semantic_summaries`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSemanticSummaryRow {
    pub session_id: String,
    /// Serialized summary payload.
    pub summary_json: String,
    pub generated_at: String,
    pub provider: String,
    pub model: Option<String>,
    pub source_kind: String,
    pub generation_kind: String,
    pub prompt_fingerprint: Option<String>,
    pub source_details_json: Option<String>,
    pub diff_tree_json: Option<String>,
    /// Set when summary generation failed.
    pub error: Option<String>,
    /// Maintained by SQLite (`datetime('now')`) on every upsert.
    pub updated_at: String,
}
83
/// Borrowed payload for [`LocalDb::upsert_session_semantic_summary`];
/// `updated_at` is not included because the DB sets it itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSemanticSummaryUpsert<'a> {
    pub session_id: &'a str,
    pub summary_json: &'a str,
    pub generated_at: &'a str,
    pub provider: &'a str,
    pub model: Option<&'a str>,
    pub source_kind: &'a str,
    pub generation_kind: &'a str,
    pub prompt_fingerprint: Option<&'a str>,
    pub source_details_json: Option<&'a str>,
    pub diff_tree_json: Option<&'a str>,
    pub error: Option<&'a str>,
}
99
/// A transcript chunk plus its embedding, ready to be written to the
/// vector-index tables.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorChunkUpsert {
    pub chunk_id: String,
    pub session_id: String,
    /// Position of this chunk within its session.
    pub chunk_index: u32,
    pub start_line: u32,
    pub end_line: u32,
    pub line_count: u32,
    pub content: String,
    // NOTE(review): hash of `content` — algorithm defined at the write site;
    // confirm there.
    pub content_hash: String,
    pub embedding: Vec<f32>,
}
113
/// A stored chunk (with its embedding) read back from the index as a
/// similarity-search candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorChunkCandidateRow {
    pub chunk_id: String,
    pub session_id: String,
    pub start_line: u32,
    pub end_line: u32,
    pub content: String,
    pub embedding: Vec<f32>,
}
124
/// Progress snapshot of a vector-index build job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexJobRow {
    pub status: String,
    pub processed_sessions: u32,
    pub total_sessions: u32,
    /// Human-readable progress or error message, if any.
    pub message: Option<String>,
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
}
135
/// Progress snapshot of a batch summary-generation job, including a count
/// of sessions that failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SummaryBatchJobRow {
    pub status: String,
    pub processed_sessions: u32,
    pub total_sessions: u32,
    pub failed_sessions: u32,
    /// Human-readable progress or error message, if any.
    pub message: Option<String>,
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
}
147
/// Guess the originating tool from a session trace's filesystem path.
///
/// Recognizes Codex (`…codex/sessions/…`) and Claude Code
/// (`…claude/projects/…`) layouts, with either path-separator style and
/// with or without the leading dot. Matching is case-insensitive. Returns
/// `None` when nothing matches or no path was given.
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    const CODEX_MARKERS: [&str; 4] = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    const CLAUDE_MARKERS: [&str; 4] = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    let lowered = source_path?.to_ascii_lowercase();

    if CODEX_MARKERS.iter().any(|marker| lowered.contains(marker)) {
        Some("codex")
    } else if CLAUDE_MARKERS.iter().any(|marker| lowered.contains(marker)) {
        Some("claude-code")
    } else {
        None
    }
}
169
170fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
171 infer_tool_from_source_path(source_path)
172 .unwrap_or(current_tool)
173 .to_string()
174}
175
/// Trim `value` and return it as an owned `String`; yields `None` when the
/// input is absent or blank after trimming.
fn normalize_non_empty(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
182
/// Build an FTS MATCH expression from free-form user input.
///
/// Every whitespace-separated token becomes a quoted phrase (embedded `"`
/// doubled for escaping) and the phrases are OR-ed together. Returns `None`
/// when the input contains no tokens at all.
fn build_fts_query(raw: &str) -> Option<String> {
    let phrases: Vec<String> = raw
        .split_whitespace()
        .map(|token| format!("\"{}\"", token.replace('"', "\"\"")))
        .collect();
    if phrases.is_empty() {
        None
    } else {
        Some(phrases.join(" OR "))
    }
}
198
199fn json_object_string(value: &Value, keys: &[&str]) -> Option<String> {
200 let obj = value.as_object()?;
201 for key in keys {
202 if let Some(found) = obj.get(*key).and_then(Value::as_str) {
203 let normalized = found.trim();
204 if !normalized.is_empty() {
205 return Some(normalized.to_string());
206 }
207 }
208 }
209 None
210}
211
212fn git_context_from_session_attributes(session: &Session) -> GitContext {
213 let attrs = &session.context.attributes;
214
215 let mut remote = normalize_non_empty(attrs.get("git_remote").and_then(Value::as_str));
216 let mut branch = normalize_non_empty(attrs.get("git_branch").and_then(Value::as_str));
217 let mut commit = normalize_non_empty(attrs.get("git_commit").and_then(Value::as_str));
218 let mut repo_name = normalize_non_empty(attrs.get("git_repo_name").and_then(Value::as_str));
219
220 if let Some(git_value) = attrs.get("git") {
221 if remote.is_none() {
222 remote = json_object_string(
223 git_value,
224 &["remote", "repository_url", "repo_url", "origin", "url"],
225 );
226 }
227 if branch.is_none() {
228 branch = json_object_string(
229 git_value,
230 &["branch", "git_branch", "current_branch", "ref", "head"],
231 );
232 }
233 if commit.is_none() {
234 commit = json_object_string(git_value, &["commit", "commit_hash", "sha", "git_commit"]);
235 }
236 if repo_name.is_none() {
237 repo_name = json_object_string(git_value, &["repo_name", "repository", "repo", "name"]);
238 }
239 }
240
241 if repo_name.is_none() {
242 repo_name = remote
243 .as_deref()
244 .and_then(normalize_repo_name)
245 .map(ToOwned::to_owned);
246 }
247
248 GitContext {
249 remote,
250 branch,
251 commit,
252 repo_name,
253 }
254}
255
256fn git_context_has_any_field(git: &GitContext) -> bool {
257 git.remote.is_some() || git.branch.is_some() || git.commit.is_some() || git.repo_name.is_some()
258}
259
260fn merge_git_context(preferred: &GitContext, fallback: &GitContext) -> GitContext {
261 GitContext {
262 remote: preferred.remote.clone().or_else(|| fallback.remote.clone()),
263 branch: preferred.branch.clone().or_else(|| fallback.branch.clone()),
264 commit: preferred.commit.clone().or_else(|| fallback.commit.clone()),
265 repo_name: preferred
266 .repo_name
267 .clone()
268 .or_else(|| fallback.repo_name.clone()),
269 }
270}
271
/// Filter options for listing sessions from the local database.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    /// Compared against `COALESCE(ss.sync_status, 'unknown')`.
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    /// Substring match over title, description and tags.
    pub search: Option<String>,
    /// Hide sessions with no messages/tasks, at most 2 events and a blank title.
    pub exclude_low_signal: bool,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set.
    pub offset: Option<u32>,
}
286
287impl Default for LocalSessionFilter {
288 fn default() -> Self {
289 Self {
290 team_id: None,
291 sync_status: None,
292 git_repo_name: None,
293 search: None,
294 exclude_low_signal: false,
295 tool: None,
296 sort: LocalSortOrder::Recent,
297 time_range: LocalTimeRange::All,
298 limit: None,
299 offset: None,
300 }
301 }
302}
303
/// Ordering applied to session listings.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first, by `created_at`.
    #[default]
    Recent,
    /// Highest message count first, ties broken by recency.
    Popular,
    /// Longest duration first, ties broken by recency.
    Longest,
}
312
/// Time window restricting listed sessions by `created_at`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours (SQLite `'-1 day'`).
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
322
/// Session metadata received from the remote server, mirrored into the local
/// `sessions` table by [`LocalDb::upsert_remote_session`].
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    /// Always present for remote sessions (unlike local rows).
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    /// Always present for remote sessions.
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
356
/// Filters for the git-log-style session listing ([`LocalDb::list_sessions_log`]).
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Exact tool name.
    pub tool: Option<String>,
    /// Model name; `*` acts as a wildcard (translated to SQL `LIKE %`).
    pub model: Option<String>,
    /// Inclusive lower bound on `created_at`.
    pub since: Option<String>,
    /// Exclusive upper bound on `created_at`.
    pub before: Option<String>,
    /// File path that must appear (quoted) in `files_modified`.
    pub touches: Option<String>,
    /// Substring match over title, description and tags.
    pub grep: Option<String>,
    /// `Some(true)` restricts to sessions with errors; `Some(false)`/`None`
    /// apply no restriction.
    pub has_errors: Option<bool>,
    /// Prefix match on the working directory.
    pub working_directory: Option<String>,
    pub git_repo_name: Option<String>,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set.
    pub offset: Option<u32>,
}
383
/// Shared FROM/JOIN tail for session queries: `sessions` (aliased `s`)
/// left-joined with sync state (`ss`) and the owning user (`u`).
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
389
/// Handle to the local SQLite database. The connection is wrapped in a
/// mutex so one handle can be shared across threads.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
396
397impl LocalDb {
    /// Open the database at its default on-disk location.
    pub fn open() -> Result<Self> {
        let path = default_db_path()?;
        Self::open_path(&path)
    }
404
405 pub fn open_path(path: &PathBuf) -> Result<Self> {
407 if let Some(parent) = path.parent() {
408 std::fs::create_dir_all(parent)
409 .with_context(|| format!("create dir for {}", path.display()))?;
410 }
411 let conn = open_connection_with_latest_schema(path)
412 .with_context(|| format!("open local db {}", path.display()))?;
413 Ok(Self {
414 conn: Mutex::new(conn),
415 })
416 }
417
    /// Lock and return the underlying connection.
    ///
    /// Panics if the mutex is poisoned (a previous holder panicked).
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
421
    /// Insert or refresh a session parsed from a local trace file.
    ///
    /// * Empty sessions (zero events, messages, user messages and tasks) are
    ///   deleted rather than stored.
    /// * The tool name is re-derived from the source path when the path
    ///   matches a known codex / claude-code layout.
    /// * Git columns prefer data carried in the session's own attributes over
    ///   the caller-supplied `git` context. On conflicting rows they are only
    ///   overwritten when the session itself carried git data (the `?26`
    ///   flag); otherwise existing values are preserved via COALESCE.
    /// * Sync state is recorded as 'local_only' with the source path
    ///   (existing sync_status is left untouched on conflict).
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        // An all-zero stats block means the trace contained nothing useful:
        // drop any previously stored copy and bail out.
        let is_empty_signal = session.stats.event_count == 0
            && session.stats.message_count == 0
            && session.stats.user_message_count == 0
            && session.stats.task_count == 0;
        if is_empty_signal {
            self.delete_session(&session.session_id)?;
            return Ok(());
        }

        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are flattened into one comma-joined column.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        let cwd = working_directory(session).map(String::from);
        let is_auxiliary = is_auxiliary_session(session);

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        let normalized_tool =
            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));
        let git_from_session = git_context_from_session_attributes(session);
        let has_session_git = git_context_has_any_field(&git_from_session);
        let merged_git = merge_git_context(&git_from_session, git);

        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24,?25) \
             ON CONFLICT(id) DO UPDATE SET \
               tool=excluded.tool, agent_provider=excluded.agent_provider, \
               agent_model=excluded.agent_model, \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, \
               message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
               task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=CASE WHEN ?26=1 THEN excluded.git_remote ELSE COALESCE(git_remote, excluded.git_remote) END, \
               git_branch=CASE WHEN ?26=1 THEN excluded.git_branch ELSE COALESCE(git_branch, excluded.git_branch) END, \
               git_commit=CASE WHEN ?26=1 THEN excluded.git_commit ELSE COALESCE(git_commit, excluded.git_commit) END, \
               git_repo_name=CASE WHEN ?26=1 THEN excluded.git_repo_name ELSE COALESCE(git_repo_name, excluded.git_repo_name) END, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents, \
               is_auxiliary=excluded.is_auxiliary",
            params![
                &session.session_id,
                &normalized_tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &merged_git.remote,
                &merged_git.branch,
                &merged_git.commit,
                &merged_git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
                // ?26: whether the session's own attributes carried git data.
                has_session_git as i64,
            ],
        )?;

        // Record (or refresh) where this session came from; the sync status
        // itself is never downgraded here.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
530
    /// Mirror a session summary fetched from the remote server into the
    /// local `sessions` table.
    ///
    /// Remote rows are stored with `is_auxiliary = 0` and, unlike the local
    /// upsert, remote git/PR columns always overwrite on conflict. Sync state
    /// becomes 'remote_only' for new rows; a row previously marked
    /// 'local_only' is promoted to 'synced', any other status is kept.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,0) \
             ON CONFLICT(id) DO UPDATE SET \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
               message_count=excluded.message_count, task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
               git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
               pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents, \
               is_auxiliary=excluded.is_auxiliary",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        // 'local_only' + seen remotely => 'synced'; other statuses unchanged.
        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
               sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
602
    /// Build the WHERE clause and ordered parameter list shared by the
    /// filtered session queries (`list_sessions`, `count_sessions_filtered`,
    /// `list_session_tools`).
    ///
    /// The baseline clauses always exclude auxiliary sessions and codex
    /// sessions titled with the summary-worker prompt prefix. Placeholders
    /// are numbered `?1..?N` in the same order as the returned parameters.
    fn build_local_session_where_clause(
        filter: &LocalSessionFilter,
    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
            // The prefix constant contains no quotes, so embedding it
            // directly in the LIKE pattern is safe.
            format!(
                "NOT (LOWER(COALESCE(s.tool, '')) = 'codex' \
                AND LOWER(COALESCE(s.title, '')) LIKE '{}%')",
                SUMMARY_WORKER_TITLE_PREFIX_LOWER
            ),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next ?N placeholder index; must stay in lockstep with param_values.
        let mut idx = 1u32;

        if let Some(ref team_id) = filter.team_id {
            where_clauses.push(format!("s.team_id = ?{idx}"));
            param_values.push(Box::new(team_id.clone()));
            idx += 1;
        }

        if let Some(ref sync_status) = filter.sync_status {
            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
            param_values.push(Box::new(sync_status.clone()));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref search) = filter.search {
            // Plain substring match over three columns; the same pattern is
            // bound three times.
            let like = format!("%{search}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        if filter.exclude_low_signal {
            // "Low signal": no messages/tasks, at most 2 events, blank title.
            where_clauses.push(
                "NOT (COALESCE(s.message_count, 0) = 0 \
                 AND COALESCE(s.user_message_count, 0) = 0 \
                 AND COALESCE(s.task_count, 0) = 0 \
                 AND COALESCE(s.event_count, 0) <= 2 \
                 AND (s.title IS NULL OR TRIM(s.title) = ''))"
                    .to_string(),
            );
        }

        let interval = match filter.time_range {
            LocalTimeRange::Hours24 => Some("-1 day"),
            LocalTimeRange::Days7 => Some("-7 days"),
            LocalTimeRange::Days30 => Some("-30 days"),
            LocalTimeRange::All => None,
        };
        if let Some(interval) = interval {
            // This is always the last ?N placeholder, so idx is not advanced.
            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
            param_values.push(Box::new(interval.to_string()));
        }

        (where_clauses.join(" AND "), param_values)
    }
682
683 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
684 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
685 let order_clause = match filter.sort {
686 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
687 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
688 LocalSortOrder::Recent => "s.created_at DESC",
689 };
690
691 let mut sql = format!(
692 "SELECT {LOCAL_SESSION_COLUMNS} \
693 {FROM_CLAUSE} WHERE {where_str} \
694 ORDER BY {order_clause}"
695 );
696
697 if let Some(limit) = filter.limit {
698 sql.push_str(" LIMIT ?");
699 param_values.push(Box::new(limit));
700 if let Some(offset) = filter.offset {
701 sql.push_str(" OFFSET ?");
702 param_values.push(Box::new(offset));
703 }
704 }
705
706 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
707 param_values.iter().map(|p| p.as_ref()).collect();
708 let conn = self.conn();
709 let mut stmt = conn.prepare(&sql)?;
710 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
711
712 let mut result = Vec::new();
713 for row in rows {
714 result.push(row?);
715 }
716
717 Ok(result)
718 }
719
720 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
722 let mut count_filter = filter.clone();
723 count_filter.limit = None;
724 count_filter.offset = None;
725 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
726 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
727 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
728 param_values.iter().map(|p| p.as_ref()).collect();
729 let conn = self.conn();
730 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
731 Ok(count)
732 }
733
734 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
736 let mut tool_filter = filter.clone();
737 tool_filter.tool = None;
738 tool_filter.limit = None;
739 tool_filter.offset = None;
740 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
741 let sql = format!(
742 "SELECT DISTINCT s.tool \
743 {FROM_CLAUSE} WHERE {where_str} \
744 ORDER BY s.tool ASC"
745 );
746 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
747 param_values.iter().map(|p| p.as_ref()).collect();
748 let conn = self.conn();
749 let mut stmt = conn.prepare(&sql)?;
750 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
751
752 let mut tools = Vec::new();
753 for row in rows {
754 let tool = row?;
755 if !tool.trim().is_empty() {
756 tools.push(tool);
757 }
758 }
759 Ok(tools)
760 }
761
    /// Git-log-style listing: newest first, filtered by the fields of
    /// [`LogFilter`]. Auxiliary sessions and summary-worker codex sessions
    /// are always excluded; `offset` is only applied when `limit` is set.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
            // The prefix constant contains no quotes, so embedding it
            // directly in the LIKE pattern is safe.
            format!(
                "NOT (LOWER(COALESCE(s.tool, '')) = 'codex' \
                AND LOWER(COALESCE(s.title, '')) LIKE '{}%')",
                SUMMARY_WORKER_TITLE_PREFIX_LOWER
            ),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next ?N placeholder index; kept in lockstep with param_values.
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // `*` in the user-supplied pattern becomes SQL's `%` wildcard.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // Match the quoted path inside the serialized files_modified
            // column.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // Substring match across three columns; same pattern bound thrice.
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only Some(true) restricts; Some(false) behaves like None.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            // Prefix match on the working directory.
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // idx is no longer needed; LIMIT/OFFSET below use bare `?`.
        let _ = idx;
        let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(result)
    }
870
871 pub fn get_sessions_by_tool_latest(
873 &self,
874 tool: &str,
875 count: u32,
876 ) -> Result<Vec<LocalSessionRow>> {
877 let sql = format!(
878 "SELECT {LOCAL_SESSION_COLUMNS} \
879 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
880 ORDER BY s.created_at DESC"
881 );
882 let conn = self.conn();
883 let mut stmt = conn.prepare(&sql)?;
884 let rows = stmt.query_map(params![tool], row_to_local_session)?;
885 let mut result = Vec::new();
886 for row in rows {
887 result.push(row?);
888 }
889
890 result.truncate(count as usize);
891 Ok(result)
892 }
893
894 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
896 let sql = format!(
897 "SELECT {LOCAL_SESSION_COLUMNS} \
898 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
899 ORDER BY s.created_at DESC"
900 );
901 let conn = self.conn();
902 let mut stmt = conn.prepare(&sql)?;
903 let rows = stmt.query_map([], row_to_local_session)?;
904 let mut result = Vec::new();
905 for row in rows {
906 result.push(row?);
907 }
908
909 result.truncate(count as usize);
910 Ok(result)
911 }
912
913 pub fn get_session_by_tool_offset(
915 &self,
916 tool: &str,
917 offset: u32,
918 ) -> Result<Option<LocalSessionRow>> {
919 let sql = format!(
920 "SELECT {LOCAL_SESSION_COLUMNS} \
921 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
922 ORDER BY s.created_at DESC"
923 );
924 let conn = self.conn();
925 let mut stmt = conn.prepare(&sql)?;
926 let rows = stmt.query_map(params![tool], row_to_local_session)?;
927 let result = rows.collect::<Result<Vec<_>, _>>()?;
928 Ok(result.into_iter().nth(offset as usize))
929 }
930
931 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
933 let sql = format!(
934 "SELECT {LOCAL_SESSION_COLUMNS} \
935 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
936 ORDER BY s.created_at DESC"
937 );
938 let conn = self.conn();
939 let mut stmt = conn.prepare(&sql)?;
940 let rows = stmt.query_map([], row_to_local_session)?;
941 let result = rows.collect::<Result<Vec<_>, _>>()?;
942 Ok(result.into_iter().nth(offset as usize))
943 }
944
945 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
947 let conn = self.conn();
948 let result = conn
949 .query_row(
950 "SELECT source_path FROM session_sync WHERE session_id = ?1",
951 params![session_id],
952 |row| row.get(0),
953 )
954 .optional()?;
955
956 Ok(result)
957 }
958
959 pub fn list_session_source_paths(&self) -> Result<Vec<(String, String)>> {
961 let conn = self.conn();
962 let mut stmt = conn.prepare(
963 "SELECT session_id, source_path \
964 FROM session_sync \
965 WHERE source_path IS NOT NULL AND TRIM(source_path) != ''",
966 )?;
967 let rows = stmt.query_map([], |row| {
968 let session_id: String = row.get(0)?;
969 let source_path: String = row.get(1)?;
970 Ok((session_id, source_path))
971 })?;
972 rows.collect::<std::result::Result<Vec<_>, _>>()
973 .map_err(Into::into)
974 }
975
976 pub fn get_session_by_id(&self, session_id: &str) -> Result<Option<LocalSessionRow>> {
978 let sql = format!(
979 "SELECT {LOCAL_SESSION_COLUMNS} \
980 {FROM_CLAUSE} WHERE s.id = ?1 LIMIT 1"
981 );
982 let conn = self.conn();
983 let mut stmt = conn.prepare(&sql)?;
984 let row = stmt
985 .query_map(params![session_id], row_to_local_session)?
986 .next()
987 .transpose()?;
988 Ok(row)
989 }
990
991 pub fn list_session_links(&self, session_id: &str) -> Result<Vec<LocalSessionLink>> {
993 let conn = self.conn();
994 let mut stmt = conn.prepare(
995 "SELECT session_id, linked_session_id, link_type, created_at \
996 FROM session_links WHERE session_id = ?1 ORDER BY created_at ASC",
997 )?;
998 let rows = stmt.query_map(params![session_id], |row| {
999 Ok(LocalSessionLink {
1000 session_id: row.get(0)?,
1001 linked_session_id: row.get(1)?,
1002 link_type: row.get(2)?,
1003 created_at: row.get(3)?,
1004 })
1005 })?;
1006 rows.collect::<std::result::Result<Vec<_>, _>>()
1007 .map_err(Into::into)
1008 }
1009
1010 pub fn session_count(&self) -> Result<i64> {
1012 let count = self
1013 .conn()
1014 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
1015 Ok(count)
1016 }
1017
    /// Remove a session and every row that references it.
    ///
    /// There is no FK cascade here, so dependent tables are cleared
    /// explicitly — children first, then the `sessions` row itself.
    pub fn delete_session(&self, session_id: &str) -> Result<()> {
        let conn = self.conn();
        // Links in either direction.
        conn.execute(
            "DELETE FROM session_links WHERE session_id = ?1 OR linked_session_id = ?1",
            params![session_id],
        )?;
        // Vector index: embeddings reference chunks, so delete them first.
        conn.execute(
            "DELETE FROM vector_embeddings \
             WHERE chunk_id IN (SELECT id FROM vector_chunks WHERE session_id = ?1)",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM vector_chunks_fts WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM vector_chunks WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM vector_index_sessions WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM session_semantic_summaries WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM body_cache WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM session_sync WHERE session_id = ?1",
            params![session_id],
        )?;
        // Finally the session row itself.
        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
        Ok(())
    }
1058
    /// Insert or replace the semantic summary for a session.
    ///
    /// `updated_at` is always refreshed to `datetime('now')`; all other
    /// columns are taken from `payload`.
    pub fn upsert_session_semantic_summary(
        &self,
        payload: &SessionSemanticSummaryUpsert<'_>,
    ) -> Result<()> {
        self.conn().execute(
            "INSERT INTO session_semantic_summaries (\
                session_id, summary_json, generated_at, provider, model, \
                source_kind, generation_kind, prompt_fingerprint, source_details_json, \
                diff_tree_json, error, updated_at\
            ) VALUES (\
                ?1, ?2, ?3, ?4, ?5, \
                ?6, ?7, ?8, ?9, \
                ?10, ?11, datetime('now')\
            ) \
            ON CONFLICT(session_id) DO UPDATE SET \
                summary_json=excluded.summary_json, \
                generated_at=excluded.generated_at, \
                provider=excluded.provider, \
                model=excluded.model, \
                source_kind=excluded.source_kind, \
                generation_kind=excluded.generation_kind, \
                prompt_fingerprint=excluded.prompt_fingerprint, \
                source_details_json=excluded.source_details_json, \
                diff_tree_json=excluded.diff_tree_json, \
                error=excluded.error, \
                updated_at=datetime('now')",
            params![
                payload.session_id,
                payload.summary_json,
                payload.generated_at,
                payload.provider,
                payload.model,
                payload.source_kind,
                payload.generation_kind,
                payload.prompt_fingerprint,
                payload.source_details_json,
                payload.diff_tree_json,
                payload.error,
            ],
        )?;
        Ok(())
    }
1103
1104 pub fn list_expired_session_ids(&self, keep_days: u32) -> Result<Vec<String>> {
1105 let conn = self.conn();
1106 let mut stmt = conn.prepare(
1107 "SELECT id FROM sessions \
1108 WHERE julianday(created_at) <= julianday('now') - ?1 \
1109 ORDER BY created_at ASC",
1110 )?;
1111 let rows = stmt.query_map(params![keep_days as i64], |row| row.get(0))?;
1112 rows.collect::<std::result::Result<Vec<_>, _>>()
1113 .map_err(Into::into)
1114 }
1115
1116 pub fn list_all_session_ids(&self) -> Result<Vec<String>> {
1118 let conn = self.conn();
1119 let mut stmt = conn.prepare("SELECT id FROM sessions ORDER BY id ASC")?;
1120 let rows = stmt.query_map([], |row| row.get(0))?;
1121 rows.collect::<std::result::Result<Vec<_>, _>>()
1122 .map_err(Into::into)
1123 }
1124
1125 pub fn list_session_semantic_summary_ids(&self) -> Result<Vec<String>> {
1127 let conn = self.conn();
1128 let mut stmt = conn
1129 .prepare("SELECT session_id FROM session_semantic_summaries ORDER BY session_id ASC")?;
1130 let rows = stmt.query_map([], |row| row.get(0))?;
1131 rows.collect::<std::result::Result<Vec<_>, _>>()
1132 .map_err(Into::into)
1133 }
1134
    /// Fetch the cached semantic summary for one session, if present.
    ///
    /// The closure's positional `row.get(n)` calls mirror the SELECT column
    /// list exactly — keep both in sync when adding columns.
    pub fn get_session_semantic_summary(
        &self,
        session_id: &str,
    ) -> Result<Option<SessionSemanticSummaryRow>> {
        let row = self
            .conn()
            .query_row(
                "SELECT session_id, summary_json, generated_at, provider, model, \
                 source_kind, generation_kind, prompt_fingerprint, source_details_json, \
                 diff_tree_json, error, updated_at \
                 FROM session_semantic_summaries WHERE session_id = ?1 LIMIT 1",
                params![session_id],
                |row| {
                    Ok(SessionSemanticSummaryRow {
                        session_id: row.get(0)?,
                        summary_json: row.get(1)?,
                        generated_at: row.get(2)?,
                        provider: row.get(3)?,
                        model: row.get(4)?,
                        source_kind: row.get(5)?,
                        generation_kind: row.get(6)?,
                        prompt_fingerprint: row.get(7)?,
                        source_details_json: row.get(8)?,
                        diff_tree_json: row.get(9)?,
                        error: row.get(10)?,
                        updated_at: row.get(11)?,
                    })
                },
            )
            // QueryReturnedNoRows becomes Ok(None) rather than an error.
            .optional()?;
        Ok(row)
    }
1167
1168 pub fn delete_expired_session_summaries(&self, keep_days: u32) -> Result<u32> {
1169 let deleted = self.conn().execute(
1170 "DELETE FROM session_semantic_summaries \
1171 WHERE julianday(generated_at) <= julianday('now') - ?1",
1172 params![keep_days as i64],
1173 )?;
1174 Ok(deleted as u32)
1175 }
1176
1177 pub fn vector_index_source_hash(&self, session_id: &str) -> Result<Option<String>> {
1180 let hash = self
1181 .conn()
1182 .query_row(
1183 "SELECT source_hash FROM vector_index_sessions WHERE session_id = ?1",
1184 params![session_id],
1185 |row| row.get(0),
1186 )
1187 .optional()?;
1188 Ok(hash)
1189 }
1190
1191 pub fn clear_vector_index(&self) -> Result<()> {
1192 let conn = self.conn();
1193 conn.execute("DELETE FROM vector_embeddings", [])?;
1194 conn.execute("DELETE FROM vector_chunks_fts", [])?;
1195 conn.execute("DELETE FROM vector_chunks", [])?;
1196 conn.execute("DELETE FROM vector_index_sessions", [])?;
1197 Ok(())
1198 }
1199
    /// Atomically replace all vector-index data for one session.
    ///
    /// Inside a single transaction: delete the session's old embeddings, FTS
    /// rows, and chunks (children first), insert the new chunk set with its
    /// embeddings and FTS mirror rows, then upsert the per-session index
    /// bookkeeping (`source_hash`, `chunk_count`). Either everything commits
    /// or the previous index state survives intact.
    pub fn replace_session_vector_chunks(
        &self,
        session_id: &str,
        source_hash: &str,
        model: &str,
        chunks: &[VectorChunkUpsert],
    ) -> Result<()> {
        let mut conn = self.conn();
        let tx = conn.transaction()?;

        // Delete order matters: embeddings reference chunks by id.
        tx.execute(
            "DELETE FROM vector_embeddings \
             WHERE chunk_id IN (SELECT id FROM vector_chunks WHERE session_id = ?1)",
            params![session_id],
        )?;
        tx.execute(
            "DELETE FROM vector_chunks_fts WHERE session_id = ?1",
            params![session_id],
        )?;
        tx.execute(
            "DELETE FROM vector_chunks WHERE session_id = ?1",
            params![session_id],
        )?;

        for chunk in chunks {
            // Embeddings are stored as a JSON array of f32 alongside their
            // dimensionality.
            let embedding_json = serde_json::to_string(&chunk.embedding)
                .context("serialize vector embedding for local cache")?;
            tx.execute(
                "INSERT INTO vector_chunks \
                 (id, session_id, chunk_index, start_line, end_line, line_count, content, content_hash, created_at, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, datetime('now'), datetime('now'))",
                params![
                    &chunk.chunk_id,
                    &chunk.session_id,
                    chunk.chunk_index as i64,
                    chunk.start_line as i64,
                    chunk.end_line as i64,
                    chunk.line_count as i64,
                    &chunk.content,
                    &chunk.content_hash,
                ],
            )?;
            tx.execute(
                "INSERT INTO vector_embeddings \
                 (chunk_id, model, embedding_dim, embedding_json, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, datetime('now'))",
                params![
                    &chunk.chunk_id,
                    model,
                    chunk.embedding.len() as i64,
                    &embedding_json
                ],
            )?;
            // Keep the FTS table mirrored for keyword candidate retrieval.
            tx.execute(
                "INSERT INTO vector_chunks_fts (chunk_id, session_id, content) VALUES (?1, ?2, ?3)",
                params![&chunk.chunk_id, &chunk.session_id, &chunk.content],
            )?;
        }

        // Record what was indexed so vector_index_source_hash() can detect
        // unchanged sessions later.
        tx.execute(
            "INSERT INTO vector_index_sessions \
             (session_id, source_hash, chunk_count, last_indexed_at, updated_at) \
             VALUES (?1, ?2, ?3, datetime('now'), datetime('now')) \
             ON CONFLICT(session_id) DO UPDATE SET \
             source_hash=excluded.source_hash, \
             chunk_count=excluded.chunk_count, \
             last_indexed_at=datetime('now'), \
             updated_at=datetime('now')",
            params![session_id, source_hash, chunks.len() as i64],
        )?;

        tx.commit()?;
        Ok(())
    }
1274
1275 pub fn list_vector_chunk_candidates(
1276 &self,
1277 query: &str,
1278 model: &str,
1279 limit: u32,
1280 ) -> Result<Vec<VectorChunkCandidateRow>> {
1281 let Some(fts_query) = build_fts_query(query) else {
1282 return Ok(Vec::new());
1283 };
1284 let conn = self.conn();
1285 let mut stmt = conn.prepare(
1286 "SELECT c.id, c.session_id, c.start_line, c.end_line, c.content, e.embedding_json \
1287 FROM vector_chunks_fts f \
1288 INNER JOIN vector_chunks c ON c.id = f.chunk_id \
1289 INNER JOIN vector_embeddings e ON e.chunk_id = c.id \
1290 WHERE f.content MATCH ?1 AND e.model = ?2 \
1291 ORDER BY bm25(vector_chunks_fts) ASC, c.updated_at DESC \
1292 LIMIT ?3",
1293 )?;
1294 let rows = stmt.query_map(params![fts_query, model, limit as i64], |row| {
1295 let embedding_json: String = row.get(5)?;
1296 let embedding =
1297 serde_json::from_str::<Vec<f32>>(&embedding_json).unwrap_or_else(|_| Vec::new());
1298 Ok(VectorChunkCandidateRow {
1299 chunk_id: row.get(0)?,
1300 session_id: row.get(1)?,
1301 start_line: row.get::<_, i64>(2)?.max(0) as u32,
1302 end_line: row.get::<_, i64>(3)?.max(0) as u32,
1303 content: row.get(4)?,
1304 embedding,
1305 })
1306 })?;
1307 rows.collect::<std::result::Result<Vec<_>, _>>()
1308 .map_err(Into::into)
1309 }
1310
1311 pub fn list_recent_vector_chunks_for_model(
1312 &self,
1313 model: &str,
1314 limit: u32,
1315 ) -> Result<Vec<VectorChunkCandidateRow>> {
1316 let conn = self.conn();
1317 let mut stmt = conn.prepare(
1318 "SELECT c.id, c.session_id, c.start_line, c.end_line, c.content, e.embedding_json \
1319 FROM vector_chunks c \
1320 INNER JOIN vector_embeddings e ON e.chunk_id = c.id \
1321 WHERE e.model = ?1 \
1322 ORDER BY c.updated_at DESC \
1323 LIMIT ?2",
1324 )?;
1325 let rows = stmt.query_map(params![model, limit as i64], |row| {
1326 let embedding_json: String = row.get(5)?;
1327 let embedding =
1328 serde_json::from_str::<Vec<f32>>(&embedding_json).unwrap_or_else(|_| Vec::new());
1329 Ok(VectorChunkCandidateRow {
1330 chunk_id: row.get(0)?,
1331 session_id: row.get(1)?,
1332 start_line: row.get::<_, i64>(2)?.max(0) as u32,
1333 end_line: row.get::<_, i64>(3)?.max(0) as u32,
1334 content: row.get(4)?,
1335 embedding,
1336 })
1337 })?;
1338 rows.collect::<std::result::Result<Vec<_>, _>>()
1339 .map_err(Into::into)
1340 }
1341
    /// Upsert the singleton vector-index job status row (fixed `id = 1`).
    pub fn set_vector_index_job(&self, payload: &VectorIndexJobRow) -> Result<()> {
        self.conn().execute(
            "INSERT INTO vector_index_jobs \
             (id, status, processed_sessions, total_sessions, message, started_at, finished_at, updated_at) \
             VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, datetime('now')) \
             ON CONFLICT(id) DO UPDATE SET \
             status=excluded.status, \
             processed_sessions=excluded.processed_sessions, \
             total_sessions=excluded.total_sessions, \
             message=excluded.message, \
             started_at=excluded.started_at, \
             finished_at=excluded.finished_at, \
             updated_at=datetime('now')",
            params![
                payload.status,
                payload.processed_sessions as i64,
                payload.total_sessions as i64,
                payload.message,
                payload.started_at,
                payload.finished_at,
            ],
        )?;
        Ok(())
    }
1366
1367 pub fn get_vector_index_job(&self) -> Result<Option<VectorIndexJobRow>> {
1368 let row = self
1369 .conn()
1370 .query_row(
1371 "SELECT status, processed_sessions, total_sessions, message, started_at, finished_at \
1372 FROM vector_index_jobs WHERE id = 1 LIMIT 1",
1373 [],
1374 |row| {
1375 Ok(VectorIndexJobRow {
1376 status: row.get(0)?,
1377 processed_sessions: row.get::<_, i64>(1)?.max(0) as u32,
1378 total_sessions: row.get::<_, i64>(2)?.max(0) as u32,
1379 message: row.get(3)?,
1380 started_at: row.get(4)?,
1381 finished_at: row.get(5)?,
1382 })
1383 },
1384 )
1385 .optional()?;
1386 Ok(row)
1387 }
1388
    /// Upsert the singleton summary-batch job status row (fixed `id = 1`).
    pub fn set_summary_batch_job(&self, payload: &SummaryBatchJobRow) -> Result<()> {
        self.conn().execute(
            "INSERT INTO summary_batch_jobs \
             (id, status, processed_sessions, total_sessions, failed_sessions, message, started_at, finished_at, updated_at) \
             VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, ?7, datetime('now')) \
             ON CONFLICT(id) DO UPDATE SET \
             status=excluded.status, \
             processed_sessions=excluded.processed_sessions, \
             total_sessions=excluded.total_sessions, \
             failed_sessions=excluded.failed_sessions, \
             message=excluded.message, \
             started_at=excluded.started_at, \
             finished_at=excluded.finished_at, \
             updated_at=datetime('now')",
            params![
                payload.status,
                payload.processed_sessions as i64,
                payload.total_sessions as i64,
                payload.failed_sessions as i64,
                payload.message,
                payload.started_at,
                payload.finished_at,
            ],
        )?;
        Ok(())
    }
1415
1416 pub fn get_summary_batch_job(&self) -> Result<Option<SummaryBatchJobRow>> {
1417 let row = self
1418 .conn()
1419 .query_row(
1420 "SELECT status, processed_sessions, total_sessions, failed_sessions, message, started_at, finished_at \
1421 FROM summary_batch_jobs WHERE id = 1 LIMIT 1",
1422 [],
1423 |row| {
1424 Ok(SummaryBatchJobRow {
1425 status: row.get(0)?,
1426 processed_sessions: row.get::<_, i64>(1)?.max(0) as u32,
1427 total_sessions: row.get::<_, i64>(2)?.max(0) as u32,
1428 failed_sessions: row.get::<_, i64>(3)?.max(0) as u32,
1429 message: row.get(4)?,
1430 started_at: row.get(5)?,
1431 finished_at: row.get(6)?,
1432 })
1433 },
1434 )
1435 .optional()?;
1436 Ok(row)
1437 }
1438
1439 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
1442 let cursor = self
1443 .conn()
1444 .query_row(
1445 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
1446 params![team_id],
1447 |row| row.get(0),
1448 )
1449 .optional()?;
1450 Ok(cursor)
1451 }
1452
1453 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
1454 self.conn().execute(
1455 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
1456 VALUES (?1, ?2, datetime('now')) \
1457 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
1458 params![team_id, cursor],
1459 )?;
1460 Ok(())
1461 }
1462
1463 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
1467 let sql = format!(
1468 "SELECT {LOCAL_SESSION_COLUMNS} \
1469 FROM sessions s \
1470 INNER JOIN session_sync ss ON ss.session_id = s.id \
1471 LEFT JOIN users u ON u.id = s.user_id \
1472 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
1473 ORDER BY s.created_at ASC"
1474 );
1475 let conn = self.conn();
1476 let mut stmt = conn.prepare(&sql)?;
1477 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
1478 let mut result = Vec::new();
1479 for row in rows {
1480 result.push(row?);
1481 }
1482 Ok(result)
1483 }
1484
1485 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
1486 self.conn().execute(
1487 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
1488 WHERE session_id = ?1",
1489 params![session_id],
1490 )?;
1491 Ok(())
1492 }
1493
1494 pub fn was_uploaded_after(
1496 &self,
1497 source_path: &str,
1498 modified: &chrono::DateTime<chrono::Utc>,
1499 ) -> Result<bool> {
1500 let result: Option<String> = self
1501 .conn()
1502 .query_row(
1503 "SELECT last_synced_at FROM session_sync \
1504 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1505 params![source_path],
1506 |row| row.get(0),
1507 )
1508 .optional()?;
1509
1510 if let Some(synced_at) = result {
1511 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1512 return Ok(dt >= *modified);
1513 }
1514 }
1515 Ok(false)
1516 }
1517
1518 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1521 self.conn().execute(
1522 "INSERT INTO body_cache (session_id, body, cached_at) \
1523 VALUES (?1, ?2, datetime('now')) \
1524 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1525 params![session_id, body],
1526 )?;
1527 Ok(())
1528 }
1529
1530 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1531 let body = self
1532 .conn()
1533 .query_row(
1534 "SELECT body FROM body_cache WHERE session_id = ?1",
1535 params![session_id],
1536 |row| row.get(0),
1537 )
1538 .optional()?;
1539 Ok(body)
1540 }
1541
1542 pub fn find_active_session_for_repo(
1546 &self,
1547 repo_path: &str,
1548 since_minutes: u32,
1549 ) -> Result<Option<LocalSessionRow>> {
1550 let sql = format!(
1551 "SELECT {LOCAL_SESSION_COLUMNS} \
1552 {FROM_CLAUSE} \
1553 WHERE s.working_directory LIKE ?1 \
1554 AND COALESCE(s.is_auxiliary, 0) = 0 \
1555 AND s.created_at >= datetime('now', ?2) \
1556 ORDER BY s.created_at DESC LIMIT 1"
1557 );
1558 let since = format!("-{since_minutes} minutes");
1559 let like = format!("{repo_path}%");
1560 let conn = self.conn();
1561 let mut stmt = conn.prepare(&sql)?;
1562 let row = stmt
1563 .query_map(params![like, since], row_to_local_session)?
1564 .next()
1565 .transpose()?;
1566 Ok(row)
1567 }
1568
1569 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1571 let conn = self.conn();
1572 let mut stmt = conn
1573 .prepare("SELECT id FROM sessions")
1574 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1575 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1576 let mut set = std::collections::HashSet::new();
1577 if let Ok(rows) = rows {
1578 for row in rows.flatten() {
1579 set.insert(row);
1580 }
1581 }
1582 set
1583 }
1584
    /// Refresh the derived/statistical columns of an existing session row
    /// from a freshly parsed [`Session`].
    ///
    /// Only stats, title/description, file metadata, and the auxiliary flag
    /// are touched; identity, git, and sync columns are left alone. A missing
    /// row is a silent no-op (the UPDATE matches zero rows).
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        let is_auxiliary = is_auxiliary_session(session);

        // Placeholder order: ?1 is the id, ?2.. follow the SET list.
        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14, is_auxiliary=?15 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
            ],
        )?;
        Ok(())
    }
1623
1624 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1626 self.conn().execute(
1627 "INSERT INTO session_sync (session_id, source_path) \
1628 VALUES (?1, ?2) \
1629 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1630 params![session_id, source_path],
1631 )?;
1632 Ok(())
1633 }
1634
1635 pub fn list_repos(&self) -> Result<Vec<String>> {
1637 let conn = self.conn();
1638 let mut stmt = conn.prepare(
1639 "SELECT DISTINCT git_repo_name FROM sessions \
1640 WHERE git_repo_name IS NOT NULL AND COALESCE(is_auxiliary, 0) = 0 \
1641 ORDER BY git_repo_name ASC",
1642 )?;
1643 let rows = stmt.query_map([], |row| row.get(0))?;
1644 let mut result = Vec::new();
1645 for row in rows {
1646 result.push(row?);
1647 }
1648 Ok(result)
1649 }
1650
1651 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1653 let conn = self.conn();
1654 let mut stmt = conn.prepare(
1655 "SELECT DISTINCT working_directory FROM sessions \
1656 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1657 AND COALESCE(is_auxiliary, 0) = 0 \
1658 ORDER BY working_directory ASC",
1659 )?;
1660 let rows = stmt.query_map([], |row| row.get(0))?;
1661 let mut result = Vec::new();
1662 for row in rows {
1663 result.push(row?);
1664 }
1665 Ok(result)
1666 }
1667}
1668
/// Open (or create) the local database at `path` and bring its schema up to
/// date.
///
/// Order matters: migrations run first, then the two repair passes, and
/// schema validation runs last so it sees the final shape.
fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
    // WAL allows concurrent readers while this process writes.
    conn.execute_batch("PRAGMA journal_mode=WAL;")?;

    // Foreign keys off so migrations can drop/rebuild tables freely.
    // NOTE(review): never re-enabled on this connection — confirm intended.
    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;

    apply_local_migrations(&conn)?;
    repair_session_tools_from_source_path(&conn)?;
    repair_auxiliary_flags_from_source_path(&conn)?;
    validate_local_schema(&conn)?;

    Ok(conn)
}
1685
/// Apply every pending migration (shared `MIGRATIONS` plus `LOCAL_MIGRATIONS`)
/// exactly once, recording applied names in the `_migrations` table.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    for (name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        // NOTE(review): a failed bookkeeping query falls back to "not
        // applied", which re-runs the migration — confirm migrations are
        // idempotent enough for that.
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        conn.execute_batch(sql)
            .with_context(|| format!("apply local migration {name}"))?;

        // OR IGNORE keeps a concurrent writer from turning a duplicate
        // record into a hard error.
        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1721
1722fn validate_local_schema(conn: &Connection) -> Result<()> {
1723 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1724 conn.prepare(&sql)
1725 .map(|_| ())
1726 .context("validate local session schema")
1727}
1728
1729fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1730 let mut stmt = conn.prepare(
1731 "SELECT s.id, s.tool, ss.source_path \
1732 FROM sessions s \
1733 LEFT JOIN session_sync ss ON ss.session_id = s.id \
1734 WHERE ss.source_path IS NOT NULL",
1735 )?;
1736 let rows = stmt.query_map([], |row| {
1737 Ok((
1738 row.get::<_, String>(0)?,
1739 row.get::<_, String>(1)?,
1740 row.get::<_, Option<String>>(2)?,
1741 ))
1742 })?;
1743
1744 let mut updates: Vec<(String, String)> = Vec::new();
1745 for row in rows {
1746 let (id, current_tool, source_path) = row?;
1747 let normalized = normalize_tool_for_source_path(¤t_tool, source_path.as_deref());
1748 if normalized != current_tool {
1749 updates.push((id, normalized));
1750 }
1751 }
1752 drop(stmt);
1753
1754 for (id, tool) in updates {
1755 conn.execute(
1756 "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1757 params![tool, id],
1758 )?;
1759 }
1760
1761 Ok(())
1762}
1763
1764fn repair_auxiliary_flags_from_source_path(conn: &Connection) -> Result<()> {
1765 let mut stmt = conn.prepare(
1766 "SELECT s.id, ss.source_path \
1767 FROM sessions s \
1768 LEFT JOIN session_sync ss ON ss.session_id = s.id \
1769 WHERE ss.source_path IS NOT NULL \
1770 AND COALESCE(s.is_auxiliary, 0) = 0",
1771 )?;
1772 let rows = stmt.query_map([], |row| {
1773 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
1774 })?;
1775
1776 let mut updates: Vec<String> = Vec::new();
1777 for row in rows {
1778 let (id, source_path) = row?;
1779 let Some(source_path) = source_path else {
1780 continue;
1781 };
1782 if infer_tool_from_source_path(Some(&source_path)) != Some("codex") {
1783 continue;
1784 }
1785 if is_codex_auxiliary_source_file(&source_path) {
1786 updates.push(id);
1787 }
1788 }
1789 drop(stmt);
1790
1791 for id in updates {
1792 conn.execute(
1793 "UPDATE sessions SET is_auxiliary = 1 WHERE id = ?1",
1794 params![id],
1795 )?;
1796 }
1797
1798 Ok(())
1799}
1800
/// Best-effort sniff of a Codex rollout file to decide whether it belongs to
/// an auxiliary (subagent) session.
///
/// Scans at most the first 32 lines; unreadable files or lines simply count
/// as "not auxiliary" — this feeds a repair pass, not a hard filter.
fn is_codex_auxiliary_source_file(source_path: &str) -> bool {
    let Ok(file) = fs::File::open(source_path) else {
        return false;
    };
    let reader = BufReader::new(file);
    for line in reader.lines().take(32) {
        let Ok(raw) = line else {
            continue;
        };
        let line = raw.trim();
        if line.is_empty() {
            continue;
        }

        // Fast path: cheap substring probes for the common serializations,
        // avoiding a JSON parse for obvious hits.
        if line.contains("\"source\":{\"subagent\"")
            || line.contains("\"source\": {\"subagent\"")
            || line.contains("\"agent_role\":\"awaiter\"")
            || line.contains("\"agent_role\":\"worker\"")
            || line.contains("\"agent_role\":\"explorer\"")
            || line.contains("\"agent_role\":\"subagent\"")
        {
            return true;
        }

        // Slow path: parse the line; a `session_meta` event is inspected via
        // its `payload`, any other JSON object is inspected directly.
        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
            let is_session_meta =
                parsed.get("type").and_then(|v| v.as_str()) == Some("session_meta");
            let payload = if is_session_meta {
                parsed.get("payload")
            } else {
                Some(&parsed)
            };
            if let Some(payload) = payload {
                // Presence of `source.subagent` marks a spawned subthread.
                if payload.pointer("/source/subagent").is_some() {
                    return true;
                }
                // Any non-primary agent role also counts as auxiliary.
                let role = payload
                    .get("agent_role")
                    .and_then(|v| v.as_str())
                    .map(str::to_ascii_lowercase);
                if matches!(
                    role.as_deref(),
                    Some("awaiter") | Some("worker") | Some("explorer") | Some("subagent")
                ) {
                    return true;
                }
            }
        }
    }
    false
}
1852
/// Canonical SELECT column list for building a [`LocalSessionRow`].
///
/// The positional order here is a contract with `row_to_local_session`,
/// which reads columns by index (0..=33) — change both together. Assumes the
/// query aliases `s` (sessions), `ss` (session_sync), and `u` (users).
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1), COALESCE(s.is_auxiliary, 0)";
1863
/// Map one row selected with [`LOCAL_SESSION_COLUMNS`] into a
/// [`LocalSessionRow`]; column indices must match that list's order exactly.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    let source_path: Option<String> = row.get(1)?;
    let tool: String = row.get(7)?;
    // The stored tool can be stale; the source path is authoritative.
    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());

    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: normalized_tool,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Trailing flags tolerate decode failures and fall back to defaults
        // (no errors, one agent, not auxiliary).
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
        is_auxiliary: row.get::<_, i64>(33).unwrap_or(0) != 0,
    })
}
1906
1907fn default_db_path() -> Result<PathBuf> {
1908 let home = std::env::var("HOME")
1909 .or_else(|_| std::env::var("USERPROFILE"))
1910 .context("Could not determine home directory")?;
1911 Ok(PathBuf::from(home)
1912 .join(".local")
1913 .join("share")
1914 .join("opensession")
1915 .join("local.db"))
1916}
1917
1918#[cfg(test)]
1919mod tests {
1920 use super::*;
1921
1922 use std::fs::{create_dir_all, write};
1923 use tempfile::tempdir;
1924
    /// Fresh `LocalDb` backed by a file in a kept temp directory.
    /// `keep()` leaks the directory on purpose so the db file outlives the
    /// `TempDir` guard for the duration of the test.
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }
1930
    /// Smoke test: opening a fresh db applies migrations and passes schema
    /// validation without panicking.
    #[test]
    fn test_open_and_schema() {
        let _db = test_db();
    }
1935
    /// Reopening a db must rewrite `tool` for sessions whose source path
    /// reveals a different tool (here: a `.codex` rollout mislabeled as
    /// "claude-code").
    #[test]
    fn test_open_repairs_codex_tool_hint_from_source_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair.db");

        // First open just creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Seed a mislabeled row directly, bypassing the upsert API.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
                params!["rollout-repair", "2026-02-20T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
            )
            .unwrap();
        }

        // Second open runs the repair pass.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-repair")
            .expect("repaired row");
        assert_eq!(row.tool, "codex");
    }
1967
    /// Reopening a db must mark a codex session auxiliary when its rollout
    /// file's `session_meta` payload carries a `source.subagent` origin.
    #[test]
    fn test_open_repairs_codex_auxiliary_flag_from_source_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair-auxiliary.db");
        let codex_dir = dir
            .path()
            .join(".codex")
            .join("sessions")
            .join("2026")
            .join("02")
            .join("20");
        create_dir_all(&codex_dir).unwrap();
        let source_path = codex_dir.join("rollout-subagent.jsonl");
        // NOTE(review): inside r#""# the trailing \n is two literal
        // characters, not a newline — harmless for the sniffing heuristics,
        // but probably unintended.
        write(
            &source_path,
            r#"{"timestamp":"2026-02-20T00:00:00.000Z","type":"session_meta","payload":{"id":"rollout-subagent","timestamp":"2026-02-20T00:00:00.000Z","cwd":"/tmp","originator":"Codex Desktop","cli_version":"0.105.0","source":{"subagent":{"thread_spawn":{"parent_thread_id":"parent-session-id","depth":1,"agent_role":"awaiter"}}},"agent_role":"awaiter"}}\n"#,
        )
        .unwrap();

        // First open creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Seed a codex session wrongly flagged as non-auxiliary.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key, is_auxiliary) VALUES (?1, 'personal', 'codex', ?2, '', 0)",
                params!["rollout-subagent", "2026-02-20T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-subagent", source_path.to_string_lossy().to_string()],
            )
            .unwrap();
        }

        // Second open runs the repair pass; listings exclude auxiliary rows.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(
            rows.iter().all(|row| row.id != "rollout-subagent"),
            "auxiliary codex session should be hidden after repair"
        );
    }
2012
    /// Same repair as above, but the `session_meta` event is the second line
    /// of the rollout file — the sniffer scans up to 32 lines, so this must
    /// still be detected.
    #[test]
    fn test_open_repairs_codex_auxiliary_flag_when_session_meta_is_not_first_line() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair-auxiliary-shifted.db");
        let codex_dir = dir
            .path()
            .join(".codex")
            .join("sessions")
            .join("2026")
            .join("03")
            .join("03");
        create_dir_all(&codex_dir).unwrap();
        let source_path = codex_dir.join("rollout-subagent-shifted.jsonl");
        // A non-meta event precedes the session_meta line.
        write(
            &source_path,
            [
                r#"{"timestamp":"2026-03-03T00:00:00.010Z","type":"event_msg","payload":{"type":"agent_message","message":"bootstrap line"}}"#,
                r#"{"timestamp":"2026-03-03T00:00:00.020Z","type":"session_meta","payload":{"id":"rollout-subagent-shifted","timestamp":"2026-03-03T00:00:00.000Z","cwd":"/tmp","originator":"Codex Desktop","cli_version":"0.108.0","source":{"subagent":{"thread_spawn":{"parent_thread_id":"parent-session-id","depth":1,"agent_role":"worker"}}},"agent_role":"worker"}}"#,
            ]
            .join("\n"),
        )
        .unwrap();

        // First open creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Seed a codex session wrongly flagged as non-auxiliary.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key, is_auxiliary) VALUES (?1, 'personal', 'codex', ?2, '', 0)",
                params!["rollout-subagent-shifted", "2026-03-03T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-subagent-shifted", source_path.to_string_lossy().to_string()],
            )
            .unwrap();
        }

        // Second open runs the repair pass; listings exclude auxiliary rows.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(
            rows.iter().all(|row| row.id != "rollout-subagent-shifted"),
            "auxiliary codex session should be hidden after repair even if session_meta is not the first line"
        );
    }
2061
    /// Upserting a session whose agent claims "claude-code" but whose source
    /// path lives under `.codex/sessions` must store the tool as "codex".
    #[test]
    fn test_upsert_local_session_normalizes_tool_from_source_path() {
        let db = test_db();
        let mut session = Session::new(
            "rollout-upsert".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        // A non-zero event count keeps the session from being filtered out.
        session.stats.event_count = 1;

        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
            &crate::git::GitContext::default(),
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-upsert")
            .expect("upserted row");
        assert_eq!(row.tool, "codex");
    }
2090
    /// Re-upserting the same session with a newer git HEAD must not clobber
    /// the branch/commit captured at first upsert when the session itself
    /// carries no git metadata.
    #[test]
    fn test_upsert_local_session_preserves_existing_git_when_session_has_no_git_metadata() {
        let db = test_db();
        let mut session = Session::new(
            "preserve-git".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "codex".to_string(),
                tool_version: None,
            },
        );
        // A non-zero event count keeps the session from being filtered out.
        session.stats.event_count = 1;

        // First upsert records the original branch/commit.
        let first_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("feature/original".to_string()),
            commit: Some("1111111".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
            &first_git,
        )
        .unwrap();

        // Second upsert simulates the repo having moved on since.
        let second_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("feature/current-head".to_string()),
            commit: Some("2222222".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
            &second_git,
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "preserve-git")
            .expect("row exists");
        assert_eq!(row.git_branch.as_deref(), Some("feature/original"));
        assert_eq!(row.git_commit.as_deref(), Some("1111111"));
    }
2139
    /// A `git_branch` attribute recorded in the session trace takes priority
    /// over the branch detected from the repository at scan time, and later
    /// upserts keep tracking the session's own value.
    #[test]
    fn test_upsert_local_session_prefers_git_branch_from_session_attributes() {
        let db = test_db();
        let mut session = Session::new(
            "session-git-branch".to_string(),
            opensession_core::trace::Agent {
                provider: "anthropic".to_string(),
                model: "claude-opus-4-6".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;
        // The trace itself declares which branch it ran on.
        session.context.attributes.insert(
            "git_branch".to_string(),
            serde_json::Value::String("from-session".to_string()),
        );

        // The scan-time context reports a different branch; it must lose.
        let fallback_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("fallback-branch".to_string()),
            commit: Some("aaaaaaaa".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
            &fallback_git,
        )
        .unwrap();

        // An updated session attribute must also override the stored value
        // on a subsequent upsert.
        session.context.attributes.insert(
            "git_branch".to_string(),
            serde_json::Value::String("from-session-updated".to_string()),
        );
        db.upsert_local_session(
            &session,
            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
            &fallback_git,
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "session-git-branch")
            .expect("row exists");
        assert_eq!(row.git_branch.as_deref(), Some("from-session-updated"));
    }
2189
2190 #[test]
2191 fn test_upsert_local_session_marks_parented_sessions_auxiliary() {
2192 let db = test_db();
2193 let mut session = Session::new(
2194 "aux-upsert".to_string(),
2195 opensession_core::trace::Agent {
2196 provider: "openai".to_string(),
2197 model: "gpt-5".to_string(),
2198 tool: "opencode".to_string(),
2199 tool_version: None,
2200 },
2201 );
2202 session.stats.event_count = 1;
2203 session.context.attributes.insert(
2204 opensession_core::session::ATTR_PARENT_SESSION_ID.to_string(),
2205 serde_json::Value::String("parent-session".to_string()),
2206 );
2207
2208 db.upsert_local_session(
2209 &session,
2210 "/Users/test/.opencode/storage/session/project/aux-upsert.json",
2211 &crate::git::GitContext::default(),
2212 )
2213 .unwrap();
2214
2215 let is_auxiliary: i64 = db
2216 .conn()
2217 .query_row(
2218 "SELECT is_auxiliary FROM sessions WHERE id = ?1",
2219 params!["aux-upsert"],
2220 |row| row.get(0),
2221 )
2222 .unwrap();
2223 assert_eq!(is_auxiliary, 1);
2224
2225 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2226 assert!(
2227 rows.iter().all(|row| row.id != "aux-upsert"),
2228 "auxiliary sessions should be hidden from default listing"
2229 );
2230 }
2231
2232 #[test]
2233 fn test_upsert_local_session_primary_role_overrides_parent_link() {
2234 let db = test_db();
2235 let mut session = Session::new(
2236 "primary-override".to_string(),
2237 opensession_core::trace::Agent {
2238 provider: "openai".to_string(),
2239 model: "gpt-5".to_string(),
2240 tool: "opencode".to_string(),
2241 tool_version: None,
2242 },
2243 );
2244 session.stats.event_count = 1;
2245 session.context.attributes.insert(
2246 opensession_core::session::ATTR_PARENT_SESSION_ID.to_string(),
2247 serde_json::Value::String("parent-session".to_string()),
2248 );
2249 session.context.attributes.insert(
2250 opensession_core::session::ATTR_SESSION_ROLE.to_string(),
2251 serde_json::Value::String("primary".to_string()),
2252 );
2253
2254 db.upsert_local_session(
2255 &session,
2256 "/Users/test/.opencode/storage/session/project/primary-override.json",
2257 &crate::git::GitContext::default(),
2258 )
2259 .unwrap();
2260
2261 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2262 let row = rows
2263 .iter()
2264 .find(|row| row.id == "primary-override")
2265 .expect("session with explicit primary role should stay visible");
2266 assert!(!row.is_auxiliary);
2267 }
2268
2269 #[test]
2270 fn test_upsert_local_session_skips_empty_signal_rows() {
2271 let db = test_db();
2272 let session = Session::new(
2273 "empty-signal-local".to_string(),
2274 opensession_core::trace::Agent {
2275 provider: "sourcegraph".to_string(),
2276 model: "amp-model".to_string(),
2277 tool: "amp".to_string(),
2278 tool_version: None,
2279 },
2280 );
2281
2282 db.upsert_local_session(
2283 &session,
2284 "/Users/test/.local/share/amp/threads/T-empty-signal-local.json",
2285 &crate::git::GitContext::default(),
2286 )
2287 .expect("upsert empty-signal session should not fail");
2288
2289 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2290 assert!(
2291 rows.iter().all(|row| row.id != "empty-signal-local"),
2292 "empty-signal local sessions must not be listed",
2293 );
2294 }
2295
    /// If a source file that previously produced a populated session is later
    /// re-parsed as empty-signal, the stale local row must be deleted rather
    /// than left behind.
    #[test]
    fn test_upsert_local_session_empty_signal_deletes_existing_row() {
        let db = test_db();
        // Seed a session with real activity so a row exists.
        let mut populated = Session::new(
            "empty-signal-replace".to_string(),
            opensession_core::trace::Agent {
                provider: "sourcegraph".to_string(),
                model: "amp-model".to_string(),
                tool: "amp".to_string(),
                tool_version: None,
            },
        );
        populated.stats.event_count = 2;
        populated.stats.message_count = 1;
        populated.stats.user_message_count = 1;

        db.upsert_local_session(
            &populated,
            "/Users/test/.local/share/amp/threads/T-empty-signal-replace.json",
            &crate::git::GitContext::default(),
        )
        .expect("seed populated row");
        assert!(db
            .get_session_by_id("empty-signal-replace")
            .unwrap()
            .is_some());

        // Same id, same source path, but now with all-zero stats.
        let empty = Session::new(
            "empty-signal-replace".to_string(),
            opensession_core::trace::Agent {
                provider: "sourcegraph".to_string(),
                model: "amp-model".to_string(),
                tool: "amp".to_string(),
                tool_version: None,
            },
        );
        db.upsert_local_session(
            &empty,
            "/Users/test/.local/share/amp/threads/T-empty-signal-replace.json",
            &crate::git::GitContext::default(),
        )
        .expect("upsert empty-signal replacement");

        // The previously seeded row must now be gone.
        assert!(
            db.get_session_by_id("empty-signal-replace")
                .unwrap()
                .is_none(),
            "existing local row must be removed when source becomes empty-signal",
        );
    }
2346
    /// Codex-internal "summary worker" sessions — recognized by the title
    /// prefix in `SUMMARY_WORKER_TITLE_PREFIX_LOWER` — are hidden from the
    /// default listing and from the filtered count, but only for the codex
    /// tool; other tools with the same title stay visible.
    #[test]
    fn test_list_sessions_hides_codex_summary_worker_titles() {
        let db = test_db();
        let mut codex_summary_worker = Session::new(
            "codex-summary-worker".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "codex".to_string(),
                tool_version: None,
            },
        );
        // Title matches the summary-worker prefix (case-insensitively).
        codex_summary_worker.context.title = Some(
            "Convert a real coding session into semantic compression. Pipeline: ...".to_string(),
        );
        codex_summary_worker.stats.event_count = 2;
        codex_summary_worker.stats.message_count = 1;

        db.upsert_local_session(
            &codex_summary_worker,
            "/Users/test/.codex/sessions/2026/03/05/summary-worker.jsonl",
            &crate::git::GitContext::default(),
        )
        .expect("upsert codex summary worker session");

        // Control: identical title, but from claude-code — must stay visible.
        let mut non_codex_same_title = Session::new(
            "claude-similar-title".to_string(),
            opensession_core::trace::Agent {
                provider: "anthropic".to_string(),
                model: "claude-opus-4-6".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        non_codex_same_title.context.title = Some(
            "Convert a real coding session into semantic compression. Pipeline: ...".to_string(),
        );
        non_codex_same_title.stats.event_count = 2;
        non_codex_same_title.stats.message_count = 1;

        db.upsert_local_session(
            &non_codex_same_title,
            "/Users/test/.claude/projects/p1/claude-similar-title.jsonl",
            &crate::git::GitContext::default(),
        )
        .expect("upsert non-codex session");

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(
            rows.iter().all(|row| row.id != "codex-summary-worker"),
            "codex summary worker sessions should be hidden from default listing"
        );
        assert!(
            rows.iter().any(|row| row.id == "claude-similar-title"),
            "non-codex sessions must remain visible even with similar title"
        );

        // The count must agree with the listing: only the claude session.
        let count = db
            .count_sessions_filtered(&LocalSessionFilter::default())
            .unwrap();
        assert_eq!(count, 1);
    }
2409
2410 #[test]
2411 fn test_sync_cursor() {
2412 let db = test_db();
2413 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
2414 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
2415 assert_eq!(
2416 db.get_sync_cursor("team1").unwrap(),
2417 Some("2024-01-01T00:00:00Z".to_string())
2418 );
2419 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
2421 assert_eq!(
2422 db.get_sync_cursor("team1").unwrap(),
2423 Some("2024-06-01T00:00:00Z".to_string())
2424 );
2425 }
2426
2427 #[test]
2428 fn test_list_session_source_paths_returns_non_empty_paths_only() {
2429 let db = test_db();
2430 let mut s1 = Session::new(
2431 "source-path-1".to_string(),
2432 opensession_core::trace::Agent {
2433 provider: "openai".to_string(),
2434 model: "gpt-5".to_string(),
2435 tool: "codex".to_string(),
2436 tool_version: None,
2437 },
2438 );
2439 s1.stats.event_count = 1;
2440 db.upsert_local_session(
2441 &s1,
2442 "/tmp/source-path-1.jsonl",
2443 &crate::git::GitContext::default(),
2444 )
2445 .expect("upsert first session");
2446
2447 let mut s2 = Session::new(
2448 "source-path-2".to_string(),
2449 opensession_core::trace::Agent {
2450 provider: "openai".to_string(),
2451 model: "gpt-5".to_string(),
2452 tool: "codex".to_string(),
2453 tool_version: None,
2454 },
2455 );
2456 s2.stats.event_count = 1;
2457 db.upsert_local_session(&s2, "", &crate::git::GitContext::default())
2458 .expect("upsert second session");
2459
2460 let paths = db
2461 .list_session_source_paths()
2462 .expect("list source paths should work");
2463 assert!(paths
2464 .iter()
2465 .any(|(id, path)| id == "source-path-1" && path == "/tmp/source-path-1.jsonl"));
2466 assert!(paths.iter().all(|(id, _)| id != "source-path-2"));
2467 }
2468
2469 #[test]
2470 fn test_body_cache() {
2471 let db = test_db();
2472 assert_eq!(db.get_cached_body("s1").unwrap(), None);
2473 db.cache_body("s1", b"hello world").unwrap();
2474 assert_eq!(
2475 db.get_cached_body("s1").unwrap(),
2476 Some(b"hello world".to_vec())
2477 );
2478 }
2479
    /// `get_session_by_id` returns the stored summary fields, and
    /// `list_session_links` surfaces rows inserted into `session_links`.
    #[test]
    fn test_get_session_by_id_and_list_session_links() {
        let db = test_db();
        db.upsert_remote_session(&make_summary(
            "parent-session",
            "codex",
            "Parent session",
            "2024-01-01T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "child-session",
            "codex",
            "Child session",
            "2024-01-01T01:00:00Z",
        ))
        .unwrap();

        // Link the two sessions directly in SQL (no public API needed here).
        db.conn()
            .execute(
                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) VALUES (?1, ?2, ?3, ?4)",
                params!["parent-session", "child-session", "handoff", "2024-01-01T01:00:00Z"],
            )
            .unwrap();

        let parent = db
            .get_session_by_id("parent-session")
            .unwrap()
            .expect("session should exist");
        assert_eq!(parent.id, "parent-session");
        assert_eq!(parent.title.as_deref(), Some("Parent session"));

        let links = db.list_session_links("parent-session").unwrap();
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].session_id, "parent-session");
        assert_eq!(links[0].linked_session_id, "child-session");
        assert_eq!(links[0].link_type, "handoff");
    }
2518
    /// The local schema must come exclusively from the opensession-api crate:
    /// exactly the four expected named migrations, and no duplicated `.sql`
    /// files shipped inside this crate's own `migrations/` directory.
    #[test]
    fn test_local_migrations_are_loaded_from_api_crate() {
        let migration_names: Vec<&str> = super::LOCAL_MIGRATIONS
            .iter()
            .map(|(name, _)| *name)
            .collect();
        assert!(
            migration_names.contains(&"local_0001_schema"),
            "expected local_0001_schema migration from opensession-api"
        );
        assert!(
            migration_names.contains(&"local_0002_session_summaries"),
            "expected local_0002_session_summaries migration from opensession-api"
        );
        assert!(
            migration_names.contains(&"local_0003_vector_index"),
            "expected local_0003_vector_index migration from opensession-api"
        );
        assert!(
            migration_names.contains(&"local_0004_summary_batch_status"),
            "expected local_0004_summary_batch_status migration from opensession-api"
        );
        // Exact count guards against migrations being added here by accident
        // instead of in the API crate.
        assert_eq!(
            migration_names.len(),
            4,
            "local schema should include baseline + summary cache + vector index + summary batch status steps"
        );

        // If a migrations/ directory exists in this crate, it must not
        // contain any .sql files (they would shadow the API crate's set).
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let migrations_dir = manifest_dir.join("migrations");
        if migrations_dir.exists() {
            let sql_files = std::fs::read_dir(migrations_dir)
                .expect("read local-db migrations directory")
                .filter_map(Result::ok)
                .map(|entry| entry.file_name().to_string_lossy().to_string())
                .filter(|name| name.ends_with(".sql"))
                .collect::<Vec<_>>();
            assert!(
                sql_files.is_empty(),
                "local-db must not ship duplicated migration SQL files"
            );
        }
    }
2562
2563 #[test]
2564 fn test_local_schema_bootstrap_includes_is_auxiliary_column() {
2565 let dir = tempdir().unwrap();
2566 let path = dir.path().join("local.db");
2567 let db = LocalDb::open_path(&path).unwrap();
2568 let conn = db.conn();
2569 let mut stmt = conn.prepare("PRAGMA table_info(sessions)").unwrap();
2570 let columns = stmt
2571 .query_map([], |row| row.get::<_, String>(1))
2572 .unwrap()
2573 .collect::<std::result::Result<Vec<_>, _>>()
2574 .unwrap();
2575 assert!(
2576 columns.iter().any(|name| name == "is_auxiliary"),
2577 "sessions schema must include is_auxiliary column in bootstrap migration"
2578 );
2579 }
2580
    /// Remote summaries are stored with sync status `remote_only`; the remote
    /// nickname is not persisted into the local row, and remote sessions are
    /// never marked auxiliary.
    #[test]
    fn test_upsert_remote_session() {
        let db = test_db();
        let summary = RemoteSessionSummary {
            id: "remote-1".to_string(),
            user_id: Some("u1".to_string()),
            nickname: Some("alice".to_string()),
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Test session".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 10,
            task_count: 2,
            event_count: 20,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary).unwrap();

        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "remote-1");
        assert_eq!(sessions[0].sync_status, "remote_only");
        // The summary carried nickname "alice", but the local row drops it.
        assert_eq!(sessions[0].nickname, None);
        assert!(!sessions[0].is_auxiliary);
    }
2624
2625 #[test]
2626 fn test_list_filter_by_repo() {
2627 let db = test_db();
2628 let summary1 = RemoteSessionSummary {
2630 id: "s1".to_string(),
2631 user_id: None,
2632 nickname: None,
2633 team_id: "t1".to_string(),
2634 tool: "claude-code".to_string(),
2635 agent_provider: None,
2636 agent_model: None,
2637 title: Some("Session 1".to_string()),
2638 description: None,
2639 tags: None,
2640 created_at: "2024-01-01T00:00:00Z".to_string(),
2641 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2642 message_count: 5,
2643 task_count: 0,
2644 event_count: 10,
2645 duration_seconds: 60,
2646 total_input_tokens: 100,
2647 total_output_tokens: 50,
2648 git_remote: None,
2649 git_branch: None,
2650 git_commit: None,
2651 git_repo_name: None,
2652 pr_number: None,
2653 pr_url: None,
2654 working_directory: None,
2655 files_modified: None,
2656 files_read: None,
2657 has_errors: false,
2658 max_active_agents: 1,
2659 };
2660 db.upsert_remote_session(&summary1).unwrap();
2661
2662 let filter = LocalSessionFilter {
2664 team_id: Some("t1".to_string()),
2665 ..Default::default()
2666 };
2667 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2668
2669 let filter = LocalSessionFilter {
2670 team_id: Some("t999".to_string()),
2671 ..Default::default()
2672 };
2673 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2674 }
2675
    /// Build a minimal `RemoteSessionSummary` fixture for team "t1" with an
    /// anthropic/claude agent, fixed token/duration stats, and `uploaded_at`
    /// mirroring `created_at`. Used by `seed_sessions` and individual tests.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
2711
2712 fn seed_sessions(db: &LocalDb) {
2713 db.upsert_remote_session(&make_summary(
2715 "s1",
2716 "claude-code",
2717 "First session",
2718 "2024-01-01T00:00:00Z",
2719 ))
2720 .unwrap();
2721 db.upsert_remote_session(&make_summary(
2722 "s2",
2723 "claude-code",
2724 "JWT auth work",
2725 "2024-01-02T00:00:00Z",
2726 ))
2727 .unwrap();
2728 db.upsert_remote_session(&make_summary(
2729 "s3",
2730 "gemini",
2731 "Gemini test",
2732 "2024-01-03T00:00:00Z",
2733 ))
2734 .unwrap();
2735 db.upsert_remote_session(&make_summary(
2736 "s4",
2737 "claude-code",
2738 "Error handling",
2739 "2024-01-04T00:00:00Z",
2740 ))
2741 .unwrap();
2742 db.upsert_remote_session(&make_summary(
2743 "s5",
2744 "claude-code",
2745 "Final polish",
2746 "2024-01-05T00:00:00Z",
2747 ))
2748 .unwrap();
2749 }
2750
2751 #[test]
2754 fn test_log_no_filters() {
2755 let db = test_db();
2756 seed_sessions(&db);
2757 let filter = LogFilter::default();
2758 let results = db.list_sessions_log(&filter).unwrap();
2759 assert_eq!(results.len(), 5);
2760 assert_eq!(results[0].id, "s5");
2762 assert_eq!(results[4].id, "s1");
2763 }
2764
2765 #[test]
2766 fn test_log_filter_by_tool() {
2767 let db = test_db();
2768 seed_sessions(&db);
2769 let filter = LogFilter {
2770 tool: Some("claude-code".to_string()),
2771 ..Default::default()
2772 };
2773 let results = db.list_sessions_log(&filter).unwrap();
2774 assert_eq!(results.len(), 4);
2775 assert!(results.iter().all(|s| s.tool == "claude-code"));
2776 }
2777
2778 #[test]
2779 fn test_log_filter_by_model_wildcard() {
2780 let db = test_db();
2781 seed_sessions(&db);
2782 let filter = LogFilter {
2783 model: Some("claude*".to_string()),
2784 ..Default::default()
2785 };
2786 let results = db.list_sessions_log(&filter).unwrap();
2787 assert_eq!(results.len(), 5); }
2789
2790 #[test]
2791 fn test_log_filter_since() {
2792 let db = test_db();
2793 seed_sessions(&db);
2794 let filter = LogFilter {
2795 since: Some("2024-01-03T00:00:00Z".to_string()),
2796 ..Default::default()
2797 };
2798 let results = db.list_sessions_log(&filter).unwrap();
2799 assert_eq!(results.len(), 3); }
2801
2802 #[test]
2803 fn test_log_filter_before() {
2804 let db = test_db();
2805 seed_sessions(&db);
2806 let filter = LogFilter {
2807 before: Some("2024-01-03T00:00:00Z".to_string()),
2808 ..Default::default()
2809 };
2810 let results = db.list_sessions_log(&filter).unwrap();
2811 assert_eq!(results.len(), 2); }
2813
2814 #[test]
2815 fn test_log_filter_since_and_before() {
2816 let db = test_db();
2817 seed_sessions(&db);
2818 let filter = LogFilter {
2819 since: Some("2024-01-02T00:00:00Z".to_string()),
2820 before: Some("2024-01-04T00:00:00Z".to_string()),
2821 ..Default::default()
2822 };
2823 let results = db.list_sessions_log(&filter).unwrap();
2824 assert_eq!(results.len(), 2); }
2826
2827 #[test]
2828 fn test_log_filter_grep() {
2829 let db = test_db();
2830 seed_sessions(&db);
2831 let filter = LogFilter {
2832 grep: Some("JWT".to_string()),
2833 ..Default::default()
2834 };
2835 let results = db.list_sessions_log(&filter).unwrap();
2836 assert_eq!(results.len(), 1);
2837 assert_eq!(results[0].id, "s2");
2838 }
2839
2840 #[test]
2841 fn test_log_limit_and_offset() {
2842 let db = test_db();
2843 seed_sessions(&db);
2844 let filter = LogFilter {
2845 limit: Some(2),
2846 offset: Some(1),
2847 ..Default::default()
2848 };
2849 let results = db.list_sessions_log(&filter).unwrap();
2850 assert_eq!(results.len(), 2);
2851 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2853 }
2854
2855 #[test]
2856 fn test_log_limit_only() {
2857 let db = test_db();
2858 seed_sessions(&db);
2859 let filter = LogFilter {
2860 limit: Some(3),
2861 ..Default::default()
2862 };
2863 let results = db.list_sessions_log(&filter).unwrap();
2864 assert_eq!(results.len(), 3);
2865 }
2866
2867 #[test]
2868 fn test_list_sessions_limit_offset() {
2869 let db = test_db();
2870 seed_sessions(&db);
2871 let filter = LocalSessionFilter {
2872 limit: Some(2),
2873 offset: Some(1),
2874 ..Default::default()
2875 };
2876 let results = db.list_sessions(&filter).unwrap();
2877 assert_eq!(results.len(), 2);
2878 assert_eq!(results[0].id, "s4");
2879 assert_eq!(results[1].id, "s3");
2880 }
2881
2882 #[test]
2883 fn test_count_sessions_filtered() {
2884 let db = test_db();
2885 seed_sessions(&db);
2886 let count = db
2887 .count_sessions_filtered(&LocalSessionFilter::default())
2888 .unwrap();
2889 assert_eq!(count, 5);
2890 }
2891
2892 #[test]
2893 fn test_list_and_count_filters_match_when_auxiliary_rows_exist() {
2894 let db = test_db();
2895 seed_sessions(&db);
2896 db.conn()
2897 .execute(
2898 "UPDATE sessions SET is_auxiliary = 1 WHERE id IN ('s2', 's3')",
2899 [],
2900 )
2901 .unwrap();
2902
2903 let default_filter = LocalSessionFilter::default();
2904 let rows = db.list_sessions(&default_filter).unwrap();
2905 let count = db.count_sessions_filtered(&default_filter).unwrap();
2906 assert_eq!(rows.len() as i64, count);
2907 assert!(rows.iter().all(|row| !row.is_auxiliary));
2908
2909 let gemini_filter = LocalSessionFilter {
2910 tool: Some("gemini".to_string()),
2911 ..Default::default()
2912 };
2913 let gemini_rows = db.list_sessions(&gemini_filter).unwrap();
2914 let gemini_count = db.count_sessions_filtered(&gemini_filter).unwrap();
2915 assert_eq!(gemini_rows.len() as i64, gemini_count);
2916 assert!(gemini_rows.is_empty());
2917 assert_eq!(gemini_count, 0);
2918 }
2919
    /// The `exclude_low_signal` filter hides metadata-only sessions (no
    /// title, no messages/tasks, barely any events) while a default filter
    /// still shows them; list and count must agree in both modes.
    #[test]
    fn test_exclude_low_signal_filter_hides_metadata_only_sessions() {
        let db = test_db();

        // A session with essentially no content — only 2 events, no title.
        let mut low_signal = make_summary("meta-only", "claude-code", "", "2024-01-01T00:00:00Z");
        low_signal.title = None;
        low_signal.message_count = 0;
        low_signal.task_count = 0;
        low_signal.event_count = 2;
        low_signal.git_repo_name = Some("frontend/aviss-react-front".to_string());

        // A session with real activity in the same repo.
        let mut normal = make_summary(
            "real-work",
            "opencode",
            "Socket.IO decision",
            "2024-01-02T00:00:00Z",
        );
        normal.message_count = 14;
        normal.task_count = 2;
        normal.event_count = 38;
        normal.git_repo_name = Some("frontend/aviss-react-front".to_string());

        db.upsert_remote_session(&low_signal).unwrap();
        db.upsert_remote_session(&normal).unwrap();

        // Without exclude_low_signal both sessions are visible.
        let default_filter = LocalSessionFilter {
            git_repo_name: Some("frontend/aviss-react-front".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&default_filter).unwrap().len(), 2);
        assert_eq!(db.count_sessions_filtered(&default_filter).unwrap(), 2);

        // With exclude_low_signal only the real-work session survives.
        let repo_filter = LocalSessionFilter {
            git_repo_name: Some("frontend/aviss-react-front".to_string()),
            exclude_low_signal: true,
            ..Default::default()
        };
        let rows = db.list_sessions(&repo_filter).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "real-work");
        assert_eq!(db.count_sessions_filtered(&repo_filter).unwrap(), 1);
    }
2962
2963 #[test]
2964 fn test_list_working_directories_distinct_non_empty() {
2965 let db = test_db();
2966
2967 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2968 a.working_directory = Some("/tmp/repo-a".to_string());
2969 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2970 b.working_directory = Some("/tmp/repo-a".to_string());
2971 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2972 c.working_directory = Some("/tmp/repo-b".to_string());
2973 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2974 d.working_directory = Some("".to_string());
2975
2976 db.upsert_remote_session(&a).unwrap();
2977 db.upsert_remote_session(&b).unwrap();
2978 db.upsert_remote_session(&c).unwrap();
2979 db.upsert_remote_session(&d).unwrap();
2980
2981 let dirs = db.list_working_directories().unwrap();
2982 assert_eq!(
2983 dirs,
2984 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2985 );
2986 }
2987
2988 #[test]
2989 fn test_list_session_tools() {
2990 let db = test_db();
2991 seed_sessions(&db);
2992 let tools = db
2993 .list_session_tools(&LocalSessionFilter::default())
2994 .unwrap();
2995 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2996 }
2997
2998 #[test]
2999 fn test_log_combined_filters() {
3000 let db = test_db();
3001 seed_sessions(&db);
3002 let filter = LogFilter {
3003 tool: Some("claude-code".to_string()),
3004 since: Some("2024-01-03T00:00:00Z".to_string()),
3005 limit: Some(1),
3006 ..Default::default()
3007 };
3008 let results = db.list_sessions_log(&filter).unwrap();
3009 assert_eq!(results.len(), 1);
3010 assert_eq!(results[0].id, "s5"); }
3012
3013 #[test]
3016 fn test_get_session_by_offset() {
3017 let db = test_db();
3018 seed_sessions(&db);
3019 let row = db.get_session_by_offset(0).unwrap().unwrap();
3020 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
3022 assert_eq!(row.id, "s3");
3023 assert!(db.get_session_by_offset(10).unwrap().is_none());
3024 }
3025
3026 #[test]
3027 fn test_get_session_by_tool_offset() {
3028 let db = test_db();
3029 seed_sessions(&db);
3030 let row = db
3031 .get_session_by_tool_offset("claude-code", 0)
3032 .unwrap()
3033 .unwrap();
3034 assert_eq!(row.id, "s5");
3035 let row = db
3036 .get_session_by_tool_offset("claude-code", 1)
3037 .unwrap()
3038 .unwrap();
3039 assert_eq!(row.id, "s4");
3040 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
3041 assert_eq!(row.id, "s3");
3042 assert!(db
3043 .get_session_by_tool_offset("gemini", 1)
3044 .unwrap()
3045 .is_none());
3046 }
3047
3048 #[test]
3049 fn test_get_sessions_latest() {
3050 let db = test_db();
3051 seed_sessions(&db);
3052 let rows = db.get_sessions_latest(3).unwrap();
3053 assert_eq!(rows.len(), 3);
3054 assert_eq!(rows[0].id, "s5");
3055 assert_eq!(rows[1].id, "s4");
3056 assert_eq!(rows[2].id, "s3");
3057 }
3058
3059 #[test]
3060 fn test_get_sessions_by_tool_latest() {
3061 let db = test_db();
3062 seed_sessions(&db);
3063 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
3064 assert_eq!(rows.len(), 2);
3065 assert_eq!(rows[0].id, "s5");
3066 assert_eq!(rows[1].id, "s4");
3067 }
3068
3069 #[test]
3070 fn test_get_sessions_latest_more_than_available() {
3071 let db = test_db();
3072 seed_sessions(&db);
3073 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
3074 assert_eq!(rows.len(), 1); }
3076
    /// A provider-generated semantic summary round-trips through upsert and
    /// lookup with all metadata fields intact.
    #[test]
    fn test_upsert_and_get_session_semantic_summary() {
        let db = test_db();
        seed_sessions(&db);

        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
            session_id: "s1",
            summary_json: r#"{"changes":"updated files","auth_security":"none detected","layer_file_changes":[]}"#,
            generated_at: "2026-03-04T10:00:00Z",
            provider: "codex_exec",
            model: Some("gpt-5"),
            source_kind: "session_signals",
            generation_kind: "provider",
            prompt_fingerprint: Some("abc123"),
            source_details_json: Some(r#"{"source":"session"}"#),
            diff_tree_json: Some(r#"[]"#),
            error: None,
        })
        .expect("upsert semantic summary");

        // Every metadata field written above must read back unchanged.
        let row = db
            .get_session_semantic_summary("s1")
            .expect("query semantic summary")
            .expect("summary row exists");
        assert_eq!(row.session_id, "s1");
        assert_eq!(row.provider, "codex_exec");
        assert_eq!(row.model.as_deref(), Some("gpt-5"));
        assert_eq!(row.source_kind, "session_signals");
        assert_eq!(row.generation_kind, "provider");
        assert_eq!(row.prompt_fingerprint.as_deref(), Some("abc123"));
        assert!(row.error.is_none());
    }
3109
    /// Deleting a session must cascade to its semantic-summary row.
    #[test]
    fn test_delete_session_removes_semantic_summary_row() {
        let db = test_db();
        seed_sessions(&db);

        // Attach a heuristic-fallback summary (with an error note) to s1.
        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
            session_id: "s1",
            summary_json: r#"{"changes":"updated files","auth_security":"none detected","layer_file_changes":[]}"#,
            generated_at: "2026-03-04T10:00:00Z",
            provider: "heuristic",
            model: None,
            source_kind: "heuristic",
            generation_kind: "heuristic_fallback",
            prompt_fingerprint: None,
            source_details_json: None,
            diff_tree_json: None,
            error: Some("provider disabled"),
        })
        .expect("upsert semantic summary");

        db.delete_session("s1").expect("delete session");

        // The summary must be gone along with the session.
        let missing = db
            .get_session_semantic_summary("s1")
            .expect("query semantic summary");
        assert!(missing.is_none());
    }
3137
    /// Deleting a session removes session_links rows where it appears on
    /// either side (as session_id or as linked_session_id).
    #[test]
    fn test_delete_session_removes_session_links_bidirectionally() {
        let db = test_db();
        seed_sessions(&db);

        // s1 -> s2 (s1 on the left side of the link).
        db.conn()
            .execute(
                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) \
                 VALUES (?1, ?2, 'handoff', datetime('now'))",
                params!["s1", "s2"],
            )
            .expect("insert forward link");
        // s3 -> s1 (s1 on the right side of the link).
        db.conn()
            .execute(
                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) \
                 VALUES (?1, ?2, 'related', datetime('now'))",
                params!["s3", "s1"],
            )
            .expect("insert reverse link");

        db.delete_session("s1").expect("delete root session");

        // No link row may still reference s1 from either column.
        let remaining: i64 = db
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM session_links WHERE session_id = ?1 OR linked_session_id = ?1",
                params!["s1"],
                |row| row.get(0),
            )
            .expect("count linked rows");
        assert_eq!(remaining, 0);
    }
3170
3171 #[test]
3172 fn test_delete_expired_session_summaries_uses_generated_at_ttl() {
3173 let db = test_db();
3174 seed_sessions(&db);
3175
3176 db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3177 session_id: "s1",
3178 summary_json: r#"{"changes":"old"}"#,
3179 generated_at: "2020-01-01T00:00:00Z",
3180 provider: "codex_exec",
3181 model: None,
3182 source_kind: "session_signals",
3183 generation_kind: "provider",
3184 prompt_fingerprint: None,
3185 source_details_json: None,
3186 diff_tree_json: None,
3187 error: None,
3188 })
3189 .expect("upsert old summary");
3190 db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3191 session_id: "s2",
3192 summary_json: r#"{"changes":"new"}"#,
3193 generated_at: "2999-01-01T00:00:00Z",
3194 provider: "codex_exec",
3195 model: None,
3196 source_kind: "session_signals",
3197 generation_kind: "provider",
3198 prompt_fingerprint: None,
3199 source_details_json: None,
3200 diff_tree_json: None,
3201 error: None,
3202 })
3203 .expect("upsert new summary");
3204
3205 let deleted = db
3206 .delete_expired_session_summaries(30)
3207 .expect("delete expired summaries");
3208 assert_eq!(deleted, 1);
3209 assert!(db
3210 .get_session_semantic_summary("s1")
3211 .expect("query old summary")
3212 .is_none());
3213 assert!(db
3214 .get_session_semantic_summary("s2")
3215 .expect("query new summary")
3216 .is_some());
3217 }
3218
3219 #[test]
3220 fn test_list_all_session_ids_returns_sorted_ids() {
3221 let db = test_db();
3222 seed_sessions(&db);
3223
3224 let ids = db.list_all_session_ids().expect("list all session ids");
3225 assert_eq!(ids, vec!["s1", "s2", "s3", "s4", "s5"]);
3226 }
3227
3228 #[test]
3229 fn test_list_session_semantic_summary_ids_returns_sorted_ids() {
3230 let db = test_db();
3231 seed_sessions(&db);
3232
3233 db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3234 session_id: "s4",
3235 summary_json: r#"{"changes":"delta"}"#,
3236 generated_at: "2026-03-04T10:00:00Z",
3237 provider: "codex_exec",
3238 model: Some("gpt-5"),
3239 source_kind: "session_signals",
3240 generation_kind: "provider",
3241 prompt_fingerprint: Some("fingerprint"),
3242 source_details_json: Some(r#"{"source":"session"}"#),
3243 diff_tree_json: Some(r#"[]"#),
3244 error: None,
3245 })
3246 .expect("upsert summary s4");
3247 db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3248 session_id: "s2",
3249 summary_json: r#"{"changes":"delta"}"#,
3250 generated_at: "2026-03-04T10:00:00Z",
3251 provider: "codex_exec",
3252 model: Some("gpt-5"),
3253 source_kind: "session_signals",
3254 generation_kind: "provider",
3255 prompt_fingerprint: Some("fingerprint"),
3256 source_details_json: Some(r#"{"source":"session"}"#),
3257 diff_tree_json: Some(r#"[]"#),
3258 error: None,
3259 })
3260 .expect("upsert summary s2");
3261
3262 let ids = db
3263 .list_session_semantic_summary_ids()
3264 .expect("list semantic summary ids");
3265 assert_eq!(ids, vec!["s2", "s4"]);
3266 }
3267
3268 #[test]
3269 fn test_list_expired_session_ids_uses_created_at_ttl() {
3270 let db = test_db();
3271 seed_sessions(&db);
3272
3273 let expired = db
3274 .list_expired_session_ids(30)
3275 .expect("list expired sessions");
3276 assert!(
3277 expired.contains(&"s1".to_string()),
3278 "older seeded sessions should be expired for 30-day keep"
3279 );
3280
3281 let none_expired = db
3282 .list_expired_session_ids(10_000)
3283 .expect("list non-expired sessions");
3284 assert!(
3285 none_expired.is_empty(),
3286 "seeded sessions should be retained with a large keep window"
3287 );
3288 }
3289
3290 #[test]
3291 fn test_build_fts_query_quotes_tokens() {
3292 assert_eq!(
3293 build_fts_query("parser retry"),
3294 Some("\"parser\" OR \"retry\"".to_string())
3295 );
3296 assert!(build_fts_query(" ").is_none());
3297 }
3298
3299 #[test]
3300 fn test_vector_chunk_replace_and_candidate_query() {
3301 let db = test_db();
3302 seed_sessions(&db);
3303
3304 let chunks = vec![
3305 VectorChunkUpsert {
3306 chunk_id: "chunk-s1-0".to_string(),
3307 session_id: "s1".to_string(),
3308 chunk_index: 0,
3309 start_line: 1,
3310 end_line: 8,
3311 line_count: 8,
3312 content: "parser selection retry after auth error".to_string(),
3313 content_hash: "hash-0".to_string(),
3314 embedding: vec![0.1, 0.2, 0.3],
3315 },
3316 VectorChunkUpsert {
3317 chunk_id: "chunk-s1-1".to_string(),
3318 session_id: "s1".to_string(),
3319 chunk_index: 1,
3320 start_line: 9,
3321 end_line: 15,
3322 line_count: 7,
3323 content: "session list refresh control wired to runtime".to_string(),
3324 content_hash: "hash-1".to_string(),
3325 embedding: vec![0.3, 0.2, 0.1],
3326 },
3327 ];
3328
3329 db.replace_session_vector_chunks("s1", "source-hash-s1", "bge-m3", &chunks)
3330 .expect("replace vector chunks");
3331
3332 let source_hash = db
3333 .vector_index_source_hash("s1")
3334 .expect("read source hash")
3335 .expect("source hash should exist");
3336 assert_eq!(source_hash, "source-hash-s1");
3337
3338 let matches = db
3339 .list_vector_chunk_candidates("parser retry", "bge-m3", 10)
3340 .expect("query vector chunk candidates");
3341 assert!(
3342 !matches.is_empty(),
3343 "vector FTS query should return at least one candidate"
3344 );
3345 assert_eq!(matches[0].session_id, "s1");
3346 assert!(matches[0].content.contains("parser"));
3347 }
3348
3349 #[test]
3350 fn test_delete_session_removes_vector_index_rows() {
3351 let db = test_db();
3352 seed_sessions(&db);
3353
3354 let chunks = vec![VectorChunkUpsert {
3355 chunk_id: "chunk-s1-delete".to_string(),
3356 session_id: "s1".to_string(),
3357 chunk_index: 0,
3358 start_line: 1,
3359 end_line: 2,
3360 line_count: 2,
3361 content: "delete me from vector cache".to_string(),
3362 content_hash: "hash-delete".to_string(),
3363 embedding: vec![0.7, 0.1, 0.2],
3364 }];
3365 db.replace_session_vector_chunks("s1", "delete-hash", "bge-m3", &chunks)
3366 .expect("insert vector chunk");
3367
3368 db.delete_session("s1")
3369 .expect("delete session with vector rows");
3370
3371 let candidates = db
3372 .list_vector_chunk_candidates("delete", "bge-m3", 10)
3373 .expect("query candidates after delete");
3374 assert!(
3375 candidates.iter().all(|row| row.session_id != "s1"),
3376 "vector rows for deleted session should be removed"
3377 );
3378 }
3379
3380 #[test]
3381 fn test_vector_index_job_round_trip() {
3382 let db = test_db();
3383 let payload = VectorIndexJobRow {
3384 status: "running".to_string(),
3385 processed_sessions: 2,
3386 total_sessions: 10,
3387 message: Some("indexing".to_string()),
3388 started_at: Some("2026-03-05T10:00:00Z".to_string()),
3389 finished_at: None,
3390 };
3391 db.set_vector_index_job(&payload)
3392 .expect("set vector index job snapshot");
3393
3394 let loaded = db
3395 .get_vector_index_job()
3396 .expect("read vector index job snapshot")
3397 .expect("vector index job row should exist");
3398 assert_eq!(loaded.status, "running");
3399 assert_eq!(loaded.processed_sessions, 2);
3400 assert_eq!(loaded.total_sessions, 10);
3401 assert_eq!(loaded.message.as_deref(), Some("indexing"));
3402 }
3403
3404 #[test]
3405 fn test_summary_batch_job_round_trip() {
3406 let db = test_db();
3407 let payload = SummaryBatchJobRow {
3408 status: "running".to_string(),
3409 processed_sessions: 4,
3410 total_sessions: 12,
3411 failed_sessions: 1,
3412 message: Some("processing summaries".to_string()),
3413 started_at: Some("2026-03-05T10:00:00Z".to_string()),
3414 finished_at: None,
3415 };
3416 db.set_summary_batch_job(&payload)
3417 .expect("set summary batch job snapshot");
3418
3419 let loaded = db
3420 .get_summary_batch_job()
3421 .expect("read summary batch job snapshot")
3422 .expect("summary batch job row should exist");
3423 assert_eq!(loaded.status, "running");
3424 assert_eq!(loaded.processed_sessions, 4);
3425 assert_eq!(loaded.total_sessions, 12);
3426 assert_eq!(loaded.failed_sessions, 1);
3427 assert_eq!(loaded.message.as_deref(), Some("processing summaries"));
3428 }
3429
3430 #[test]
3431 fn test_session_count() {
3432 let db = test_db();
3433 assert_eq!(db.session_count().unwrap(), 0);
3434 seed_sessions(&db);
3435 assert_eq!(db.session_count().unwrap(), 5);
3436 }
3437}