1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_core::session::{is_auxiliary_session, working_directory};
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Mutex;
13
14use git::GitContext;
15
/// One session row as read back from the local SQLite database: columns
/// from `sessions` joined with sync state (`session_sync`) and the
/// uploader's nickname (`users`) — see `FROM_CLAUSE` below.
///
/// Counters use `i64` to match SQLite INTEGER storage; nullable columns
/// map to `Option`. `tags` is a single comma-joined string and
/// `created_at` is RFC 3339 text (see `upsert_local_session`).
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    // Sync bookkeeping (from the session_sync table).
    pub source_path: Option<String>,
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    // Attribution (user_id/nickname come from the users join).
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    // Agent identity.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    // Descriptive metadata.
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    // Aggregate statistics.
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    // Git / PR context captured at ingest time.
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // Serialized file lists. NOTE(review): the `touches` filter matches
    // `%"<path>"%`, which suggests a JSON array of quoted paths — confirm
    // the encoding in opensession_core::extract::extract_file_metadata.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
    // Auxiliary (child/sub) sessions are excluded from most listings.
    pub is_auxiliary: bool,
}
54
/// A row of `commit_session_links`: associates a git commit with the
/// session during which it was created.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    // NOTE(review): `link_commit_session` never sets this column, so it is
    // presumably populated by a schema default — confirm in the migrations.
    pub created_at: String,
}
64
65pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
67 row.tool == "opencode" && row.is_auxiliary
68}
69
70#[deprecated(
72 note = "Use parser/core canonical session role attributes instead of runtime file inspection"
73)]
74pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
75 let text = fs::read_to_string(source_path).ok()?;
76 let json: Value = serde_json::from_str(&text).ok()?;
77 lookup_parent_session_id(&json)
78}
79
80fn lookup_parent_session_id(value: &Value) -> Option<String> {
81 match value {
82 Value::Object(obj) => {
83 for (key, value) in obj {
84 if is_parent_id_key(key) {
85 if let Some(parent_id) = value.as_str() {
86 let parent_id = parent_id.trim();
87 if !parent_id.is_empty() {
88 return Some(parent_id.to_string());
89 }
90 }
91 }
92 if let Some(parent_id) = lookup_parent_session_id(value) {
93 return Some(parent_id);
94 }
95 }
96 None
97 }
98 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
99 _ => None,
100 }
101}
102
/// Heuristic test for JSON keys that carry a parent session identifier.
///
/// The key is flattened first — non-alphanumeric characters stripped, rest
/// lowercased — so `parent_id`, `parentId` and `Parent-ID` all compare
/// equal to `"parentid"`.
///
/// The original seven-way disjunction (`parentid`, `parentuuid`,
/// `parentsessionid`, `parentsessionuuid`, `*parentsessionid`, …) is fully
/// covered by the single test below: every accepted form contains
/// "parent" and ends in "id" (note "uuid" itself ends in "id").
fn is_parent_id_key(key: &str) -> bool {
    let flat: String = key
        .chars()
        .filter(char::is_ascii_alphanumeric)
        .map(|c| c.to_ascii_lowercase())
        .collect();

    flat.contains("parent") && flat.ends_with("id")
}
118
119pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
121 rows.retain(|row| !row.is_auxiliary);
122 rows
123}
124
/// Guesses the originating tool from a session file's on-disk location.
///
/// Matching is case-insensitive and accepts both Unix and Windows
/// separators, with or without a leading dot on the tool directory.
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    let path = source_path?.to_ascii_lowercase();

    let codex_markers = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    let claude_markers = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    if codex_markers.iter().any(|m| path.contains(*m)) {
        Some("codex")
    } else if claude_markers.iter().any(|m| path.contains(*m)) {
        Some("claude-code")
    } else {
        None
    }
}
146
147fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
148 infer_tool_from_source_path(source_path)
149 .unwrap_or(current_tool)
150 .to_string()
151}
152
/// Filtering, sorting and paging options for local session listings.
///
/// All constraint fields are optional; `None` means "no constraint".
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    // Substring match applied to title, description and tags.
    pub search: Option<String>,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    // Paging: offset is only applied when limit is also set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
166
impl Default for LocalSessionFilter {
    /// No constraints, most-recent-first, unbounded time range, no paging.
    // NOTE(review): this matches what `#[derive(Default)]` would generate,
    // since both enums already declare `#[default]` variants — the manual
    // impl could be replaced by the derive.
    fn default() -> Self {
        Self {
            team_id: None,
            sync_status: None,
            git_repo_name: None,
            search: None,
            tool: None,
            sort: LocalSortOrder::Recent,
            time_range: LocalTimeRange::All,
            limit: None,
            offset: None,
        }
    }
}
182
/// Sort order for session listings; see `list_sessions` for the
/// corresponding ORDER BY clauses.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest `created_at` first.
    #[default]
    Recent,
    /// Highest message count first, created_at as tiebreaker.
    Popular,
    /// Longest duration first, created_at as tiebreaker.
    Longest,
}
191
/// Time window restricting listings to recently-created sessions
/// (translated to SQLite `datetime('now', ...)` intervals).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours ("-1 day").
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
201
/// Summary of a session fetched from the remote server, applied to the
/// local cache by `upsert_remote_session`.
///
/// Unlike `LocalSessionRow` there is no sync/source-path state, `team_id`
/// and `uploaded_at` are mandatory, and no auxiliary/user-message fields
/// exist (remote rows are stored with `is_auxiliary = 0`).
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
235
/// Filters for the `git log`-style listing (`list_sessions_log`);
/// all set fields are ANDed together.
#[derive(Debug, Default)]
pub struct LogFilter {
    pub tool: Option<String>,
    /// Model glob; `*` is translated to SQL `%`.
    pub model: Option<String>,
    /// Inclusive lower bound on `created_at`.
    pub since: Option<String>,
    /// Exclusive upper bound on `created_at`.
    pub before: Option<String>,
    /// Path that must appear (quoted) inside `files_modified`.
    pub touches: Option<String>,
    /// Substring match across title, description and tags.
    pub grep: Option<String>,
    /// Only `Some(true)` constrains; `Some(false)` behaves like `None`.
    pub has_errors: Option<bool>,
    /// Prefix match on the working directory.
    pub working_directory: Option<String>,
    pub git_repo_name: Option<String>,
    /// Restrict to sessions linked to this commit hash.
    pub commit: Option<String>,
    /// Paging: offset is only applied when limit is also set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
264
// Shared FROM/JOIN clause for session queries: `sessions` enriched with
// sync state (`session_sync`) and uploader info (`users`). LEFT JOINs so
// sessions without a sync row or user still appear.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
270
/// Handle to the local SQLite database. The single connection is guarded
/// by a mutex so one `LocalDb` can be shared across threads.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
277
278impl LocalDb {
279 pub fn open() -> Result<Self> {
282 let path = default_db_path()?;
283 Self::open_path(&path)
284 }
285
286 pub fn open_path(path: &PathBuf) -> Result<Self> {
288 if let Some(parent) = path.parent() {
289 std::fs::create_dir_all(parent)
290 .with_context(|| format!("create dir for {}", path.display()))?;
291 }
292 match open_connection_with_latest_schema(path) {
293 Ok(conn) => Ok(Self {
294 conn: Mutex::new(conn),
295 }),
296 Err(err) => {
297 if !is_schema_compat_error(&err) {
298 return Err(err);
299 }
300
301 rotate_legacy_db(path)?;
304
305 let conn = open_connection_with_latest_schema(path)
306 .with_context(|| format!("recreate db {}", path.display()))?;
307 Ok(Self {
308 conn: Mutex::new(conn),
309 })
310 }
311 }
312 }
313
314 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
315 self.conn.lock().expect("local db mutex poisoned")
316 }
317
    /// Inserts or refreshes a locally-discovered session.
    ///
    /// Derived fields (file metadata, agent concurrency, normalized tool
    /// name) are recomputed from the parsed `Session` on every call. The
    /// companion `session_sync` row is created with status `local_only`;
    /// an existing row only has its `source_path` refreshed, so sync
    /// status is never regressed here.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags persist as one comma-joined string; empty list -> NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        let cwd = working_directory(session).map(String::from);
        let is_auxiliary = is_auxiliary_session(session);

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        // Prefer the tool implied by the file's location (e.g. ~/.codex)
        // over whatever the parser reported.
        let normalized_tool =
            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));

        let conn = self.conn();
        // team_id is fixed to 'personal' for local rows; created_at and
        // team_id are deliberately not part of the conflict update.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
             title, description, tags, created_at, \
             message_count, user_message_count, task_count, event_count, duration_seconds, \
             total_input_tokens, total_output_tokens, body_storage_key, \
             git_remote, git_branch, git_commit, git_repo_name, working_directory, \
             files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24,?25) \
             ON CONFLICT(id) DO UPDATE SET \
             tool=excluded.tool, agent_provider=excluded.agent_provider, \
             agent_model=excluded.agent_model, \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, \
             message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
             task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents, \
             is_auxiliary=excluded.is_auxiliary",
            params![
                &session.session_id,
                &normalized_tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
410
    /// Caches a session summary fetched from the remote server.
    ///
    /// The matching `session_sync` row is created as `remote_only`; if a
    /// row already exists, `local_only` is promoted to `synced` and any
    /// other status is left untouched.
    // NOTE(review): remote rows are inserted with is_auxiliary = 0, and
    // the conflict clause copies that literal back via
    // excluded.is_auxiliary — upserting a remote copy of a local auxiliary
    // session therefore resets the flag to 0. Confirm this is intended.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
             title, description, tags, created_at, uploaded_at, \
             message_count, task_count, event_count, duration_seconds, \
             total_input_tokens, total_output_tokens, body_storage_key, \
             git_remote, git_branch, git_commit, git_repo_name, \
             pr_number, pr_url, working_directory, \
             files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,0) \
             ON CONFLICT(id) DO UPDATE SET \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
             message_count=excluded.message_count, task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents, \
             is_auxiliary=excluded.is_auxiliary",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
             sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
483
    /// Builds the WHERE clause (and matching positional parameters) shared
    /// by `list_sessions`, `count_sessions_filtered` and
    /// `list_session_tools`.
    ///
    /// Auxiliary sessions are always excluded. Placeholders are numbered
    /// `?1..` in the order parameters are pushed, so the returned clause
    /// and vector must be used together. `limit`/`offset` from the filter
    /// are NOT handled here — callers append them separately.
    fn build_local_session_where_clause(
        filter: &LocalSessionFilter,
    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
        // "1=1" lets every later condition be appended with a plain AND.
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref team_id) = filter.team_id {
            where_clauses.push(format!("s.team_id = ?{idx}"));
            param_values.push(Box::new(team_id.clone()));
            idx += 1;
        }

        if let Some(ref sync_status) = filter.sync_status {
            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
            param_values.push(Box::new(sync_status.clone()));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        // Free-text search: same pattern bound three times, once per column.
        if let Some(ref search) = filter.search {
            let like = format!("%{search}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        let interval = match filter.time_range {
            LocalTimeRange::Hours24 => Some("-1 day"),
            LocalTimeRange::Days7 => Some("-7 days"),
            LocalTimeRange::Days30 => Some("-30 days"),
            LocalTimeRange::All => None,
        };
        if let Some(interval) = interval {
            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
            param_values.push(Box::new(interval.to_string()));
            // NOTE(review): idx is not incremented here — safe only while
            // this remains the last placeholder added.
        }

        (where_clauses.join(" AND "), param_values)
    }
547
548 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
549 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
550 let order_clause = match filter.sort {
551 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
552 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
553 LocalSortOrder::Recent => "s.created_at DESC",
554 };
555
556 let mut sql = format!(
557 "SELECT {LOCAL_SESSION_COLUMNS} \
558 {FROM_CLAUSE} WHERE {where_str} \
559 ORDER BY {order_clause}"
560 );
561
562 if let Some(limit) = filter.limit {
563 sql.push_str(" LIMIT ?");
564 param_values.push(Box::new(limit));
565 if let Some(offset) = filter.offset {
566 sql.push_str(" OFFSET ?");
567 param_values.push(Box::new(offset));
568 }
569 }
570
571 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
572 param_values.iter().map(|p| p.as_ref()).collect();
573 let conn = self.conn();
574 let mut stmt = conn.prepare(&sql)?;
575 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
576
577 let mut result = Vec::new();
578 for row in rows {
579 result.push(row?);
580 }
581
582 Ok(result)
583 }
584
585 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
587 let mut count_filter = filter.clone();
588 count_filter.limit = None;
589 count_filter.offset = None;
590 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
591 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
592 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
593 param_values.iter().map(|p| p.as_ref()).collect();
594 let conn = self.conn();
595 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
596 Ok(count)
597 }
598
599 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
601 let mut tool_filter = filter.clone();
602 tool_filter.tool = None;
603 tool_filter.limit = None;
604 tool_filter.offset = None;
605 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
606 let sql = format!(
607 "SELECT DISTINCT s.tool \
608 {FROM_CLAUSE} WHERE {where_str} \
609 ORDER BY s.tool ASC"
610 );
611 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
612 param_values.iter().map(|p| p.as_ref()).collect();
613 let conn = self.conn();
614 let mut stmt = conn.prepare(&sql)?;
615 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
616
617 let mut tools = Vec::new();
618 for row in rows {
619 let tool = row?;
620 if !tool.trim().is_empty() {
621 tools.push(tool);
622 }
623 }
624 Ok(tools)
625 }
626
    /// `git log`-style listing: applies every set constraint in `filter`
    /// (all ANDed together), newest sessions first.
    ///
    /// Placeholders are numbered in push order, mirroring
    /// `build_local_session_where_clause`. Auxiliary sessions are always
    /// excluded.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        // Model filter supports shell-style globs: '*' becomes SQL '%'.
        if let Some(ref model) = filter.model {
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        // created_at bounds: since is inclusive, before is exclusive.
        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        // Match the quoted path inside the serialized files_modified list.
        if let Some(ref touches) = filter.touches {
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        // Free-text search: same pattern bound once per column.
        if let Some(ref grep) = filter.grep {
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only Some(true) constrains; Some(false) behaves like None.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        // Prefix match on the working directory.
        if let Some(ref wd) = filter.working_directory {
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Commit filter requires an extra join against commit_session_links.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // idx was only needed to number placeholders; discard the final
        // (otherwise-unused) increment.
        let _ = idx; let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        // Trailing unnumbered placeholders; OFFSET only valid with LIMIT.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(result)
    }
740
741 pub fn get_sessions_by_tool_latest(
743 &self,
744 tool: &str,
745 count: u32,
746 ) -> Result<Vec<LocalSessionRow>> {
747 let sql = format!(
748 "SELECT {LOCAL_SESSION_COLUMNS} \
749 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
750 ORDER BY s.created_at DESC"
751 );
752 let conn = self.conn();
753 let mut stmt = conn.prepare(&sql)?;
754 let rows = stmt.query_map(params![tool], row_to_local_session)?;
755 let mut result = Vec::new();
756 for row in rows {
757 result.push(row?);
758 }
759
760 result.truncate(count as usize);
761 Ok(result)
762 }
763
764 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
766 let sql = format!(
767 "SELECT {LOCAL_SESSION_COLUMNS} \
768 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
769 ORDER BY s.created_at DESC"
770 );
771 let conn = self.conn();
772 let mut stmt = conn.prepare(&sql)?;
773 let rows = stmt.query_map([], row_to_local_session)?;
774 let mut result = Vec::new();
775 for row in rows {
776 result.push(row?);
777 }
778
779 result.truncate(count as usize);
780 Ok(result)
781 }
782
783 pub fn get_session_by_tool_offset(
785 &self,
786 tool: &str,
787 offset: u32,
788 ) -> Result<Option<LocalSessionRow>> {
789 let sql = format!(
790 "SELECT {LOCAL_SESSION_COLUMNS} \
791 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
792 ORDER BY s.created_at DESC"
793 );
794 let conn = self.conn();
795 let mut stmt = conn.prepare(&sql)?;
796 let rows = stmt.query_map(params![tool], row_to_local_session)?;
797 let result = rows.collect::<Result<Vec<_>, _>>()?;
798 Ok(result.into_iter().nth(offset as usize))
799 }
800
801 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
803 let sql = format!(
804 "SELECT {LOCAL_SESSION_COLUMNS} \
805 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
806 ORDER BY s.created_at DESC"
807 );
808 let conn = self.conn();
809 let mut stmt = conn.prepare(&sql)?;
810 let rows = stmt.query_map([], row_to_local_session)?;
811 let result = rows.collect::<Result<Vec<_>, _>>()?;
812 Ok(result.into_iter().nth(offset as usize))
813 }
814
815 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
817 let conn = self.conn();
818 let result = conn
819 .query_row(
820 "SELECT source_path FROM session_sync WHERE session_id = ?1",
821 params![session_id],
822 |row| row.get(0),
823 )
824 .optional()?;
825
826 Ok(result)
827 }
828
829 pub fn session_count(&self) -> Result<i64> {
831 let count = self
832 .conn()
833 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
834 Ok(count)
835 }
836
837 pub fn delete_session(&self, session_id: &str) -> Result<()> {
840 let conn = self.conn();
841 conn.execute(
842 "DELETE FROM body_cache WHERE session_id = ?1",
843 params![session_id],
844 )?;
845 conn.execute(
846 "DELETE FROM session_sync WHERE session_id = ?1",
847 params![session_id],
848 )?;
849 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
850 Ok(())
851 }
852
853 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
856 let cursor = self
857 .conn()
858 .query_row(
859 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
860 params![team_id],
861 |row| row.get(0),
862 )
863 .optional()?;
864 Ok(cursor)
865 }
866
867 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
868 self.conn().execute(
869 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
870 VALUES (?1, ?2, datetime('now')) \
871 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
872 params![team_id, cursor],
873 )?;
874 Ok(())
875 }
876
877 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
881 let sql = format!(
882 "SELECT {LOCAL_SESSION_COLUMNS} \
883 FROM sessions s \
884 INNER JOIN session_sync ss ON ss.session_id = s.id \
885 LEFT JOIN users u ON u.id = s.user_id \
886 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
887 ORDER BY s.created_at ASC"
888 );
889 let conn = self.conn();
890 let mut stmt = conn.prepare(&sql)?;
891 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
892 let mut result = Vec::new();
893 for row in rows {
894 result.push(row?);
895 }
896 Ok(result)
897 }
898
899 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
900 self.conn().execute(
901 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
902 WHERE session_id = ?1",
903 params![session_id],
904 )?;
905 Ok(())
906 }
907
908 pub fn was_uploaded_after(
910 &self,
911 source_path: &str,
912 modified: &chrono::DateTime<chrono::Utc>,
913 ) -> Result<bool> {
914 let result: Option<String> = self
915 .conn()
916 .query_row(
917 "SELECT last_synced_at FROM session_sync \
918 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
919 params![source_path],
920 |row| row.get(0),
921 )
922 .optional()?;
923
924 if let Some(synced_at) = result {
925 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
926 return Ok(dt >= *modified);
927 }
928 }
929 Ok(false)
930 }
931
932 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
935 self.conn().execute(
936 "INSERT INTO body_cache (session_id, body, cached_at) \
937 VALUES (?1, ?2, datetime('now')) \
938 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
939 params![session_id, body],
940 )?;
941 Ok(())
942 }
943
944 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
945 let body = self
946 .conn()
947 .query_row(
948 "SELECT body FROM body_cache WHERE session_id = ?1",
949 params![session_id],
950 |row| row.get(0),
951 )
952 .optional()?;
953 Ok(body)
954 }
955
    /// One-time migration from the legacy `state.json` upload ledger.
    ///
    /// For every path that already has a `session_sync` row, marks it
    /// synced (only rows still in `local_only` are actually updated) and
    /// stamps `last_synced_at` with the recorded upload time.
    // NOTE(review): the counter increments for every *existing* path, even
    // when the UPDATE matched zero rows (e.g. status already 'synced') —
    // confirm callers treat the return value as approximate.
    pub fn migrate_from_state_json(
        &self,
        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
    ) -> Result<usize> {
        let mut count = 0;
        for (path, uploaded_at) in uploaded {
            // Best-effort existence probe; a query error counts as absent.
            let exists: bool = self
                .conn()
                .query_row(
                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
                    params![path],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if exists {
                self.conn().execute(
                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
                     WHERE source_path = ?2 AND sync_status = 'local_only'",
                    params![uploaded_at.to_rfc3339(), path],
                )?;
                count += 1;
            }
        }
        Ok(count)
    }
986
987 pub fn link_commit_session(
991 &self,
992 commit_hash: &str,
993 session_id: &str,
994 repo_path: Option<&str>,
995 branch: Option<&str>,
996 ) -> Result<()> {
997 self.conn().execute(
998 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
999 VALUES (?1, ?2, ?3, ?4) \
1000 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1001 params![commit_hash, session_id, repo_path, branch],
1002 )?;
1003 Ok(())
1004 }
1005
1006 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1008 let sql = format!(
1009 "SELECT {LOCAL_SESSION_COLUMNS} \
1010 {FROM_CLAUSE} \
1011 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1012 WHERE csl.commit_hash = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
1013 ORDER BY s.created_at DESC"
1014 );
1015 let conn = self.conn();
1016 let mut stmt = conn.prepare(&sql)?;
1017 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1018 let mut result = Vec::new();
1019 for row in rows {
1020 result.push(row?);
1021 }
1022 Ok(result)
1023 }
1024
1025 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1027 let conn = self.conn();
1028 let mut stmt = conn.prepare(
1029 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1030 FROM commit_session_links WHERE session_id = ?1 \
1031 ORDER BY created_at DESC",
1032 )?;
1033 let rows = stmt.query_map(params![session_id], |row| {
1034 Ok(CommitLink {
1035 commit_hash: row.get(0)?,
1036 session_id: row.get(1)?,
1037 repo_path: row.get(2)?,
1038 branch: row.get(3)?,
1039 created_at: row.get(4)?,
1040 })
1041 })?;
1042 let mut result = Vec::new();
1043 for row in rows {
1044 result.push(row?);
1045 }
1046 Ok(result)
1047 }
1048
1049 pub fn find_active_session_for_repo(
1053 &self,
1054 repo_path: &str,
1055 since_minutes: u32,
1056 ) -> Result<Option<LocalSessionRow>> {
1057 let sql = format!(
1058 "SELECT {LOCAL_SESSION_COLUMNS} \
1059 {FROM_CLAUSE} \
1060 WHERE s.working_directory LIKE ?1 \
1061 AND COALESCE(s.is_auxiliary, 0) = 0 \
1062 AND s.created_at >= datetime('now', ?2) \
1063 ORDER BY s.created_at DESC LIMIT 1"
1064 );
1065 let since = format!("-{since_minutes} minutes");
1066 let like = format!("{repo_path}%");
1067 let conn = self.conn();
1068 let mut stmt = conn.prepare(&sql)?;
1069 let row = stmt
1070 .query_map(params![like, since], row_to_local_session)?
1071 .next()
1072 .transpose()?;
1073 Ok(row)
1074 }
1075
1076 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1078 let conn = self.conn();
1079 let mut stmt = conn
1080 .prepare("SELECT id FROM sessions")
1081 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1082 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1083 let mut set = std::collections::HashSet::new();
1084 if let Ok(rows) = rows {
1085 for row in rows.flatten() {
1086 set.insert(row);
1087 }
1088 }
1089 set
1090 }
1091
    /// Refreshes the derived statistics of an already-ingested session,
    /// matched by `session.session_id` (a missing row is a silent no-op).
    ///
    /// Only mutable/derived columns are touched; identity, git and
    /// timestamp columns are deliberately left as-is.
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        let is_auxiliary = is_auxiliary_session(session);

        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14, is_auxiliary=?15 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
            ],
        )?;
        Ok(())
    }
1130
1131 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1133 self.conn().execute(
1134 "INSERT INTO session_sync (session_id, source_path) \
1135 VALUES (?1, ?2) \
1136 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1137 params![session_id, source_path],
1138 )?;
1139 Ok(())
1140 }
1141
1142 pub fn list_repos(&self) -> Result<Vec<String>> {
1144 let conn = self.conn();
1145 let mut stmt = conn.prepare(
1146 "SELECT DISTINCT git_repo_name FROM sessions \
1147 WHERE git_repo_name IS NOT NULL AND COALESCE(is_auxiliary, 0) = 0 \
1148 ORDER BY git_repo_name ASC",
1149 )?;
1150 let rows = stmt.query_map([], |row| row.get(0))?;
1151 let mut result = Vec::new();
1152 for row in rows {
1153 result.push(row?);
1154 }
1155 Ok(result)
1156 }
1157
1158 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1160 let conn = self.conn();
1161 let mut stmt = conn.prepare(
1162 "SELECT DISTINCT working_directory FROM sessions \
1163 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1164 AND COALESCE(is_auxiliary, 0) = 0 \
1165 ORDER BY working_directory ASC",
1166 )?;
1167 let rows = stmt.query_map([], |row| row.get(0))?;
1168 let mut result = Vec::new();
1169 for row in rows {
1170 result.push(row?);
1171 }
1172 Ok(result)
1173 }
1174}
1175
1176fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1179 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1180 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1181
1182 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1184
1185 apply_local_migrations(&conn)?;
1186
1187 ensure_sessions_columns(&conn)?;
1190 repair_session_tools_from_source_path(&conn)?;
1191 validate_local_schema(&conn)?;
1192
1193 Ok(conn)
1194}
1195
1196fn apply_local_migrations(conn: &Connection) -> Result<()> {
1197 conn.execute_batch(
1198 "CREATE TABLE IF NOT EXISTS _migrations (
1199 id INTEGER PRIMARY KEY,
1200 name TEXT NOT NULL UNIQUE,
1201 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1202 );",
1203 )
1204 .context("create _migrations table for local db")?;
1205
1206 for (name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1207 let already_applied: bool = conn
1208 .query_row(
1209 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1210 [name],
1211 |row| row.get(0),
1212 )
1213 .unwrap_or(false);
1214
1215 if already_applied {
1216 continue;
1217 }
1218
1219 if let Err(e) = conn.execute_batch(sql) {
1220 let msg = e.to_string().to_ascii_lowercase();
1221 if !is_local_migration_compat_error(&msg) {
1222 return Err(e).with_context(|| format!("apply local migration {name}"));
1223 }
1224 }
1225
1226 conn.execute(
1227 "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1228 [name],
1229 )
1230 .with_context(|| format!("record local migration {name}"))?;
1231 }
1232
1233 Ok(())
1234}
1235
/// True when a (lowercased) migration error message indicates the schema
/// change already exists, i.e. the error is safe to ignore.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const COMPAT_MARKERS: [&str; 3] = [
        "duplicate column name",
        "no such column",
        "already exists",
    ];
    COMPAT_MARKERS.iter().any(|marker| msg.contains(marker))
}
1241
1242fn validate_local_schema(conn: &Connection) -> Result<()> {
1243 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1244 conn.prepare(&sql)
1245 .map(|_| ())
1246 .context("validate local session schema")
1247}
1248
1249fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1250 let mut stmt = conn.prepare(
1251 "SELECT s.id, s.tool, ss.source_path \
1252 FROM sessions s \
1253 LEFT JOIN session_sync ss ON ss.session_id = s.id \
1254 WHERE ss.source_path IS NOT NULL",
1255 )?;
1256 let rows = stmt.query_map([], |row| {
1257 Ok((
1258 row.get::<_, String>(0)?,
1259 row.get::<_, String>(1)?,
1260 row.get::<_, Option<String>>(2)?,
1261 ))
1262 })?;
1263
1264 let mut updates: Vec<(String, String)> = Vec::new();
1265 for row in rows {
1266 let (id, current_tool, source_path) = row?;
1267 let normalized = normalize_tool_for_source_path(¤t_tool, source_path.as_deref());
1268 if normalized != current_tool {
1269 updates.push((id, normalized));
1270 }
1271 }
1272 drop(stmt);
1273
1274 for (id, tool) in updates {
1275 conn.execute(
1276 "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1277 params![tool, id],
1278 )?;
1279 }
1280
1281 Ok(())
1282}
1283
1284fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1285 let msg = format!("{err:#}").to_ascii_lowercase();
1286 msg.contains("no such column")
1287 || msg.contains("no such table")
1288 || msg.contains("cannot add a column")
1289 || msg.contains("already exists")
1290 || msg.contains("views may not be indexed")
1291 || msg.contains("malformed database schema")
1292 || msg.contains("duplicate column name")
1293}
1294
1295fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1296 if !path.exists() {
1297 return Ok(());
1298 }
1299
1300 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1301 let backup_name = format!(
1302 "{}.legacy-{}.bak",
1303 path.file_name()
1304 .and_then(|n| n.to_str())
1305 .unwrap_or("local.db"),
1306 ts
1307 );
1308 let backup_path = path.with_file_name(backup_name);
1309 std::fs::rename(path, &backup_path).with_context(|| {
1310 format!(
1311 "rotate local db backup {} -> {}",
1312 path.display(),
1313 backup_path.display()
1314 )
1315 })?;
1316
1317 let wal = PathBuf::from(format!("{}-wal", path.display()));
1318 let shm = PathBuf::from(format!("{}-shm", path.display()));
1319 let _ = std::fs::remove_file(wal);
1320 let _ = std::fs::remove_file(shm);
1321 Ok(())
1322}
1323
/// Columns (name, SQL type + default) that `ensure_sessions_columns` adds to a
/// legacy `sessions` table when missing. Every declaration must be valid in
/// `ALTER TABLE ... ADD COLUMN`, so defaults are constant expressions.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
    ("is_auxiliary", "INTEGER NOT NULL DEFAULT 0"),
];
1358
1359fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1360 let mut existing = HashSet::new();
1361 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1362 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1363 for row in rows {
1364 existing.insert(row?);
1365 }
1366
1367 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1368 if existing.contains(*name) {
1369 continue;
1370 }
1371 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1372 conn.execute_batch(&sql)
1373 .with_context(|| format!("add sessions column '{name}'"))?;
1374 }
1375
1376 Ok(())
1377}
1378
/// SELECT column list consumed by `row_to_local_session`: the positional
/// indices used there are defined by the order here — keep the two in sync.
/// `COALESCE` guards cover columns that may be NULL in legacy rows
/// (sync_status, user_message_count, max_active_agents, is_auxiliary).
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1), COALESCE(s.is_auxiliary, 0)";
1389
/// Map one result row of a `LOCAL_SESSION_COLUMNS` SELECT to a
/// `LocalSessionRow`. Indices are positional and must match the column
/// order in `LOCAL_SESSION_COLUMNS` exactly.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    let source_path: Option<String> = row.get(1)?;
    let tool: String = row.get(7)?;
    // Stored tool hints can be stale; re-derive from the source path so
    // callers always see the normalized value.
    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());

    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: normalized_tool,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Flag columns tolerate read errors and fall back to defaults so a
        // single odd row doesn't fail the whole listing.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
        is_auxiliary: row.get::<_, i64>(33).unwrap_or(0) != 0,
    })
}
1432
1433fn default_db_path() -> Result<PathBuf> {
1434 let home = std::env::var("HOME")
1435 .or_else(|_| std::env::var("USERPROFILE"))
1436 .context("Could not determine home directory")?;
1437 Ok(PathBuf::from(home)
1438 .join(".local")
1439 .join("share")
1440 .join("opensession")
1441 .join("local.db"))
1442}
1443
1444#[cfg(test)]
1445mod tests {
1446 use super::*;
1447
1448 use std::fs::{create_dir_all, write};
1449 use tempfile::tempdir;
1450
    // Open a LocalDb backed by a file in a fresh temp dir. `keep()` persists
    // the directory past the TempDir guard so the db file survives the test.
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }
1456
    // Fresh temp dir whose TempDir guard the caller must keep alive.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1460
    // Minimal LocalSessionRow fixture: all counters zero, no git metadata,
    // sync_status "local_only". Tests mutate individual fields afterwards.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
            is_auxiliary: false,
        }
    }
1499
    #[test]
    // Smoke test: opening a fresh db must run all migrations without error.
    fn test_open_and_schema() {
        let _db = test_db();
    }
1504
    #[test]
    // A stored tool of "claude-code" with a `.codex/sessions` source path must
    // be repaired to "codex" by the open-time repair pass.
    fn test_open_repairs_codex_tool_hint_from_source_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair.db");

        // First open creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Seed a row with a wrong tool hint directly via rusqlite.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
                params!["rollout-repair", "2026-02-20T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
            )
            .unwrap();
        }

        // Re-open triggers repair_session_tools_from_source_path.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-repair")
            .expect("repaired row");
        assert_eq!(row.tool, "codex");
    }
1536
    #[test]
    // Upserting with a `.codex` source path must normalize the agent's
    // "claude-code" tool hint to "codex" at insert time.
    fn test_upsert_local_session_normalizes_tool_from_source_path() {
        let db = test_db();
        let mut session = Session::new(
            "rollout-upsert".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;

        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
            &crate::git::GitContext::default(),
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-upsert")
            .expect("upserted row");
        assert_eq!(row.tool, "codex");
    }
1565
    #[test]
    // A minimal legacy table (id only) must gain all required columns on open,
    // with readable defaults.
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }
1585
    #[test]
    // An unrepairable schema (sessions is a VIEW) must be rotated to a
    // `.legacy-<ts>.bak` backup and replaced with a fresh, empty db.
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1610
1611 #[test]
1612 fn test_is_opencode_child_session() {
1613 let root = temp_root();
1614 let dir = root.path().join("sessions");
1615 create_dir_all(&dir).unwrap();
1616 let parent_session = dir.join("parent.json");
1617 write(
1618 &parent_session,
1619 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1620 )
1621 .unwrap();
1622 let child_session = dir.join("child.json");
1623 write(
1624 &child_session,
1625 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1626 )
1627 .unwrap();
1628
1629 let parent = make_row(
1630 "ses_parent",
1631 "opencode",
1632 Some(parent_session.to_str().unwrap()),
1633 );
1634 let mut child = make_row(
1635 "ses_child",
1636 "opencode",
1637 Some(child_session.to_str().unwrap()),
1638 );
1639 child.is_auxiliary = true;
1640 let mut codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1641 codex.is_auxiliary = true;
1642
1643 assert!(!is_opencode_child_session(&parent));
1644 assert!(is_opencode_child_session(&child));
1645 assert!(!is_opencode_child_session(&codex));
1646 }
1647
    #[allow(deprecated)]
    #[test]
    // The deprecated parser must also accept the "parentUUID" alias key.
    fn test_parse_opencode_parent_session_id_aliases() {
        let root = temp_root();
        let dir = root.path().join("session-aliases");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1665
    #[allow(deprecated)]
    #[test]
    // The deprecated parser must find parentSessionId nested under metadata.
    fn test_parse_opencode_parent_session_id_nested_metadata() {
        let root = temp_root();
        let dir = root.path().join("session-nested");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1683
    #[test]
    // Only the auxiliary opencode child row must be filtered out; parents,
    // orphans, and non-opencode rows stay visible.
    fn test_hide_opencode_child_sessions() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        let child_session = dir.join("child.json");
        let orphan_session = dir.join("orphan.json");

        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &orphan_session,
            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let rows = vec![
            // Auxiliary opencode child — the only row expected to be hidden.
            {
                let mut row = make_row(
                    "ses_child",
                    "opencode",
                    Some(child_session.to_str().unwrap()),
                );
                row.is_auxiliary = true;
                row
            },
            make_row(
                "ses_parent",
                "opencode",
                Some(parent_session.to_str().unwrap()),
            ),
            {
                let mut row = make_row("ses_other", "codex", None);
                row.user_message_count = 1;
                row
            },
            make_row(
                "ses_orphan",
                "opencode",
                Some(orphan_session.to_str().unwrap()),
            ),
        ];

        let filtered = hide_opencode_child_sessions(rows);
        assert_eq!(filtered.len(), 3);
        assert!(filtered.iter().all(|r| r.id != "ses_child"));
    }
1740
    #[test]
    // Sync cursor: unset reads None, set stores, second set overwrites.
    fn test_sync_cursor() {
        let db = test_db();
        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-01-01T00:00:00Z".to_string())
        );
        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-06-01T00:00:00Z".to_string())
        );
    }
1757
    #[test]
    // Body cache round-trip: miss reads None, cached bytes read back intact.
    fn test_body_cache() {
        let db = test_db();
        assert_eq!(db.get_cached_body("s1").unwrap(), None);
        db.cache_body("s1", b"hello world").unwrap();
        assert_eq!(
            db.get_cached_body("s1").unwrap(),
            Some(b"hello world".to_vec())
        );
    }
1768
    #[test]
    // Migrations must come from opensession-api; this crate must not ship its
    // own duplicated .sql files.
    fn test_local_migrations_are_loaded_from_api_crate() {
        let migration_names: Vec<&str> = super::LOCAL_MIGRATIONS
            .iter()
            .map(|(name, _)| *name)
            .collect();
        assert!(
            migration_names.contains(&"local_0003_session_flags"),
            "expected local_0003_session_flags migration from opensession-api"
        );

        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let migrations_dir = manifest_dir.join("migrations");
        if migrations_dir.exists() {
            let sql_files = std::fs::read_dir(migrations_dir)
                .expect("read local-db migrations directory")
                .filter_map(Result::ok)
                .map(|entry| entry.file_name().to_string_lossy().to_string())
                .filter(|name| name.ends_with(".sql"))
                .collect::<Vec<_>>();
            assert!(
                sql_files.is_empty(),
                "local-db must not ship duplicated migration SQL files"
            );
        }
    }
1795
    #[test]
    // Build a db with every migration EXCEPT local_0003_session_flags, seed
    // rows that the backfill should recognize as auxiliary, then open via
    // LocalDb and verify the migration flags them and listing hides them.
    fn test_local_0003_session_flags_backfills_known_auxiliary_rows() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("local-legacy.db");

        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("PRAGMA foreign_keys=OFF;").unwrap();
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS _migrations (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL UNIQUE,
                    applied_at TEXT NOT NULL DEFAULT (datetime('now'))
                );",
            )
            .unwrap();

            // Apply and record everything except the migration under test.
            for (name, sql) in super::MIGRATIONS.iter().chain(
                super::LOCAL_MIGRATIONS
                    .iter()
                    .filter(|(name, _)| *name != "local_0003_session_flags"),
            ) {
                conn.execute_batch(sql).unwrap();
                conn.execute(
                    "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
                    params![name],
                )
                .unwrap();
            }

            // Primary session — must stay visible.
            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'codex', '2026-02-20T00:00:00Z', '', 12, 6, 6, 20)",
                params!["primary-visible"],
            )
            .unwrap();
            // Opencode row with zero user messages — heuristic child.
            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'opencode', '2026-02-20T00:00:00Z', '', 2, 0, 2, 6)",
                params!["opencode-heuristic-child"],
            )
            .unwrap();
            // Claude row whose source path marks it as a subagent.
            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'claude-code', '2026-02-20T00:00:00Z', '', 3, 1, 3, 12)",
                params!["claude-subagent-child"],
            )
            .unwrap();

            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params![
                    "opencode-heuristic-child",
                    "/Users/test/.opencode/sessions/opencode-heuristic-child.json"
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params![
                    "claude-subagent-child",
                    "/Users/test/.claude/projects/foo/subagents/agent-1.jsonl"
                ],
            )
            .unwrap();
        }

        // Opening applies the pending local_0003_session_flags migration.
        let db = LocalDb::open_path(&path).unwrap();

        let flags = {
            let conn = db.conn();
            let mut stmt = conn
                .prepare("SELECT id, COALESCE(is_auxiliary, 0) FROM sessions ORDER BY id")
                .unwrap();
            let rows = stmt
                .query_map([], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
                })
                .unwrap();
            rows.collect::<Result<Vec<_>, _>>().unwrap()
        };

        assert_eq!(
            flags,
            vec![
                ("claude-subagent-child".to_string(), 1),
                ("opencode-heuristic-child".to_string(), 1),
                ("primary-visible".to_string(), 0),
            ]
        );

        let visible = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].id, "primary-visible");
    }
1894
1895 #[test]
1896 fn test_upsert_remote_session() {
1897 let db = test_db();
1898 let summary = RemoteSessionSummary {
1899 id: "remote-1".to_string(),
1900 user_id: Some("u1".to_string()),
1901 nickname: Some("alice".to_string()),
1902 team_id: "t1".to_string(),
1903 tool: "claude-code".to_string(),
1904 agent_provider: None,
1905 agent_model: None,
1906 title: Some("Test session".to_string()),
1907 description: None,
1908 tags: None,
1909 created_at: "2024-01-01T00:00:00Z".to_string(),
1910 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
1911 message_count: 10,
1912 task_count: 2,
1913 event_count: 20,
1914 duration_seconds: 300,
1915 total_input_tokens: 1000,
1916 total_output_tokens: 500,
1917 git_remote: None,
1918 git_branch: None,
1919 git_commit: None,
1920 git_repo_name: None,
1921 pr_number: None,
1922 pr_url: None,
1923 working_directory: None,
1924 files_modified: None,
1925 files_read: None,
1926 has_errors: false,
1927 max_active_agents: 1,
1928 };
1929 db.upsert_remote_session(&summary).unwrap();
1930
1931 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1932 assert_eq!(sessions.len(), 1);
1933 assert_eq!(sessions[0].id, "remote-1");
1934 assert_eq!(sessions[0].sync_status, "remote_only");
1935 assert_eq!(sessions[0].nickname, None); assert!(!sessions[0].is_auxiliary);
1937 }
1938
    #[test]
    // Filtering by team_id must match ("t1") and exclude ("t999") as expected.
    fn test_list_filter_by_repo() {
        let db = test_db();
        let summary1 = RemoteSessionSummary {
            id: "s1".to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Session 1".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 5,
            task_count: 0,
            event_count: 10,
            duration_seconds: 60,
            total_input_tokens: 100,
            total_output_tokens: 50,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary1).unwrap();

        let filter = LocalSessionFilter {
            team_id: Some("t1".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);

        let filter = LocalSessionFilter {
            team_id: Some("t999".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
    }
1989
    // RemoteSessionSummary fixture for team "t1" with fixed anthropic/claude
    // agent metadata and non-zero counters.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
2025
    // Seed five sessions s1..s5 on consecutive days: four claude-code rows
    // and one gemini row (s3). Log tests depend on these ids/dates.
    fn seed_sessions(db: &LocalDb) {
        db.upsert_remote_session(&make_summary(
            "s1",
            "claude-code",
            "First session",
            "2024-01-01T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s2",
            "claude-code",
            "JWT auth work",
            "2024-01-02T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s3",
            "gemini",
            "Gemini test",
            "2024-01-03T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s4",
            "claude-code",
            "Error handling",
            "2024-01-04T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s5",
            "claude-code",
            "Final polish",
            "2024-01-05T00:00:00Z",
        ))
        .unwrap();
    }
2064
    #[test]
    // Default log filter returns all five sessions, newest first.
    fn test_log_no_filters() {
        let db = test_db();
        seed_sessions(&db);
        let filter = LogFilter::default();
        let results = db.list_sessions_log(&filter).unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].id, "s5");
        assert_eq!(results[4].id, "s1");
    }
2078
    #[test]
    // Tool filter keeps the four claude-code rows and drops the gemini one.
    fn test_log_filter_by_tool() {
        let db = test_db();
        seed_sessions(&db);
        let filter = LogFilter {
            tool: Some("claude-code".to_string()),
            ..Default::default()
        };
        let results = db.list_sessions_log(&filter).unwrap();
        assert_eq!(results.len(), 4);
        assert!(results.iter().all(|s| s.tool == "claude-code"));
    }
2091
2092 #[test]
2093 fn test_log_filter_by_model_wildcard() {
2094 let db = test_db();
2095 seed_sessions(&db);
2096 let filter = LogFilter {
2097 model: Some("claude*".to_string()),
2098 ..Default::default()
2099 };
2100 let results = db.list_sessions_log(&filter).unwrap();
2101 assert_eq!(results.len(), 5); }
2103
2104 #[test]
2105 fn test_log_filter_since() {
2106 let db = test_db();
2107 seed_sessions(&db);
2108 let filter = LogFilter {
2109 since: Some("2024-01-03T00:00:00Z".to_string()),
2110 ..Default::default()
2111 };
2112 let results = db.list_sessions_log(&filter).unwrap();
2113 assert_eq!(results.len(), 3); }
2115
2116 #[test]
2117 fn test_log_filter_before() {
2118 let db = test_db();
2119 seed_sessions(&db);
2120 let filter = LogFilter {
2121 before: Some("2024-01-03T00:00:00Z".to_string()),
2122 ..Default::default()
2123 };
2124 let results = db.list_sessions_log(&filter).unwrap();
2125 assert_eq!(results.len(), 2); }
2127
2128 #[test]
2129 fn test_log_filter_since_and_before() {
2130 let db = test_db();
2131 seed_sessions(&db);
2132 let filter = LogFilter {
2133 since: Some("2024-01-02T00:00:00Z".to_string()),
2134 before: Some("2024-01-04T00:00:00Z".to_string()),
2135 ..Default::default()
2136 };
2137 let results = db.list_sessions_log(&filter).unwrap();
2138 assert_eq!(results.len(), 2); }
2140
    #[test]
    // Grep filter matches only the session whose title contains "JWT".
    fn test_log_filter_grep() {
        let db = test_db();
        seed_sessions(&db);
        let filter = LogFilter {
            grep: Some("JWT".to_string()),
            ..Default::default()
        };
        let results = db.list_sessions_log(&filter).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "s2");
    }
2153
2154 #[test]
2155 fn test_log_limit_and_offset() {
2156 let db = test_db();
2157 seed_sessions(&db);
2158 let filter = LogFilter {
2159 limit: Some(2),
2160 offset: Some(1),
2161 ..Default::default()
2162 };
2163 let results = db.list_sessions_log(&filter).unwrap();
2164 assert_eq!(results.len(), 2);
2165 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2167 }
2168
    #[test]
    // Limit alone caps the result count.
    fn test_log_limit_only() {
        let db = test_db();
        seed_sessions(&db);
        let filter = LogFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = db.list_sessions_log(&filter).unwrap();
        assert_eq!(results.len(), 3);
    }
2180
    #[test]
    // list_sessions honors limit/offset with the same newest-first ordering
    // as the log listing.
    fn test_list_sessions_limit_offset() {
        let db = test_db();
        seed_sessions(&db);
        let filter = LocalSessionFilter {
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = db.list_sessions(&filter).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].id, "s4");
        assert_eq!(results[1].id, "s3");
    }
2195
    #[test]
    // Unfiltered count matches the number of seeded sessions.
    fn test_count_sessions_filtered() {
        let db = test_db();
        seed_sessions(&db);
        let count = db
            .count_sessions_filtered(&LocalSessionFilter::default())
            .unwrap();
        assert_eq!(count, 5);
    }
2205
    #[test]
    // list_sessions and count_sessions_filtered must apply identical
    // auxiliary-row exclusion so their results never diverge.
    fn test_list_and_count_filters_match_when_auxiliary_rows_exist() {
        let db = test_db();
        seed_sessions(&db);
        // Mark s2 and s3 auxiliary directly in the db.
        db.conn()
            .execute(
                "UPDATE sessions SET is_auxiliary = 1 WHERE id IN ('s2', 's3')",
                [],
            )
            .unwrap();

        let default_filter = LocalSessionFilter::default();
        let rows = db.list_sessions(&default_filter).unwrap();
        let count = db.count_sessions_filtered(&default_filter).unwrap();
        assert_eq!(rows.len() as i64, count);
        assert!(rows.iter().all(|row| !row.is_auxiliary));

        // s3 (the only gemini row) is auxiliary, so a gemini filter must see
        // zero rows from both paths.
        let gemini_filter = LocalSessionFilter {
            tool: Some("gemini".to_string()),
            ..Default::default()
        };
        let gemini_rows = db.list_sessions(&gemini_filter).unwrap();
        let gemini_count = db.count_sessions_filtered(&gemini_filter).unwrap();
        assert_eq!(gemini_rows.len() as i64, gemini_count);
        assert!(gemini_rows.is_empty());
        assert_eq!(gemini_count, 0);
    }
2233
2234 #[test]
2235 fn test_list_working_directories_distinct_non_empty() {
2236 let db = test_db();
2237
2238 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2239 a.working_directory = Some("/tmp/repo-a".to_string());
2240 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2241 b.working_directory = Some("/tmp/repo-a".to_string());
2242 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2243 c.working_directory = Some("/tmp/repo-b".to_string());
2244 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2245 d.working_directory = Some("".to_string());
2246
2247 db.upsert_remote_session(&a).unwrap();
2248 db.upsert_remote_session(&b).unwrap();
2249 db.upsert_remote_session(&c).unwrap();
2250 db.upsert_remote_session(&d).unwrap();
2251
2252 let dirs = db.list_working_directories().unwrap();
2253 assert_eq!(
2254 dirs,
2255 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2256 );
2257 }
2258
2259 #[test]
2260 fn test_list_session_tools() {
2261 let db = test_db();
2262 seed_sessions(&db);
2263 let tools = db
2264 .list_session_tools(&LocalSessionFilter::default())
2265 .unwrap();
2266 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2267 }
2268
2269 #[test]
2270 fn test_log_combined_filters() {
2271 let db = test_db();
2272 seed_sessions(&db);
2273 let filter = LogFilter {
2274 tool: Some("claude-code".to_string()),
2275 since: Some("2024-01-03T00:00:00Z".to_string()),
2276 limit: Some(1),
2277 ..Default::default()
2278 };
2279 let results = db.list_sessions_log(&filter).unwrap();
2280 assert_eq!(results.len(), 1);
2281 assert_eq!(results[0].id, "s5"); }
2283
2284 #[test]
2287 fn test_get_session_by_offset() {
2288 let db = test_db();
2289 seed_sessions(&db);
2290 let row = db.get_session_by_offset(0).unwrap().unwrap();
2291 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2293 assert_eq!(row.id, "s3");
2294 assert!(db.get_session_by_offset(10).unwrap().is_none());
2295 }
2296
2297 #[test]
2298 fn test_get_session_by_tool_offset() {
2299 let db = test_db();
2300 seed_sessions(&db);
2301 let row = db
2302 .get_session_by_tool_offset("claude-code", 0)
2303 .unwrap()
2304 .unwrap();
2305 assert_eq!(row.id, "s5");
2306 let row = db
2307 .get_session_by_tool_offset("claude-code", 1)
2308 .unwrap()
2309 .unwrap();
2310 assert_eq!(row.id, "s4");
2311 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2312 assert_eq!(row.id, "s3");
2313 assert!(db
2314 .get_session_by_tool_offset("gemini", 1)
2315 .unwrap()
2316 .is_none());
2317 }
2318
2319 #[test]
2320 fn test_get_sessions_latest() {
2321 let db = test_db();
2322 seed_sessions(&db);
2323 let rows = db.get_sessions_latest(3).unwrap();
2324 assert_eq!(rows.len(), 3);
2325 assert_eq!(rows[0].id, "s5");
2326 assert_eq!(rows[1].id, "s4");
2327 assert_eq!(rows[2].id, "s3");
2328 }
2329
2330 #[test]
2331 fn test_get_sessions_by_tool_latest() {
2332 let db = test_db();
2333 seed_sessions(&db);
2334 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2335 assert_eq!(rows.len(), 2);
2336 assert_eq!(rows[0].id, "s5");
2337 assert_eq!(rows[1].id, "s4");
2338 }
2339
2340 #[test]
2341 fn test_get_sessions_latest_more_than_available() {
2342 let db = test_db();
2343 seed_sessions(&db);
2344 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2345 assert_eq!(rows.len(), 1); }
2347
2348 #[test]
2349 fn test_session_count() {
2350 let db = test_db();
2351 assert_eq!(db.session_count().unwrap(), 0);
2352 seed_sessions(&db);
2353 assert_eq!(db.session_count().unwrap(), 5);
2354 }
2355
2356 #[test]
2359 fn test_link_commit_session() {
2360 let db = test_db();
2361 seed_sessions(&db);
2362 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2363 .unwrap();
2364
2365 let commits = db.get_commits_by_session("s1").unwrap();
2366 assert_eq!(commits.len(), 1);
2367 assert_eq!(commits[0].commit_hash, "abc123");
2368 assert_eq!(commits[0].session_id, "s1");
2369 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2370 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2371
2372 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2373 assert_eq!(sessions.len(), 1);
2374 assert_eq!(sessions[0].id, "s1");
2375 }
2376
2377 #[test]
2378 fn test_get_sessions_by_commit() {
2379 let db = test_db();
2380 seed_sessions(&db);
2381 db.link_commit_session("abc123", "s1", None, None).unwrap();
2383 db.link_commit_session("abc123", "s2", None, None).unwrap();
2384 db.link_commit_session("abc123", "s3", None, None).unwrap();
2385
2386 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2387 assert_eq!(sessions.len(), 3);
2388 assert_eq!(sessions[0].id, "s3");
2390 assert_eq!(sessions[1].id, "s2");
2391 assert_eq!(sessions[2].id, "s1");
2392 }
2393
2394 #[test]
2395 fn test_get_commits_by_session() {
2396 let db = test_db();
2397 seed_sessions(&db);
2398 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2400 .unwrap();
2401 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2402 .unwrap();
2403 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2404 .unwrap();
2405
2406 let commits = db.get_commits_by_session("s1").unwrap();
2407 assert_eq!(commits.len(), 3);
2408 assert!(commits.iter().all(|c| c.session_id == "s1"));
2410 }
2411
2412 #[test]
2413 fn test_duplicate_link_ignored() {
2414 let db = test_db();
2415 seed_sessions(&db);
2416 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2417 .unwrap();
2418 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2420 .unwrap();
2421
2422 let commits = db.get_commits_by_session("s1").unwrap();
2423 assert_eq!(commits.len(), 1);
2424 }
2425
2426 #[test]
2427 fn test_log_filter_by_commit() {
2428 let db = test_db();
2429 seed_sessions(&db);
2430 db.link_commit_session("abc123", "s2", None, None).unwrap();
2431 db.link_commit_session("abc123", "s4", None, None).unwrap();
2432
2433 let filter = LogFilter {
2434 commit: Some("abc123".to_string()),
2435 ..Default::default()
2436 };
2437 let results = db.list_sessions_log(&filter).unwrap();
2438 assert_eq!(results.len(), 2);
2439 assert_eq!(results[0].id, "s4");
2440 assert_eq!(results[1].id, "s2");
2441
2442 let filter = LogFilter {
2444 commit: Some("nonexistent".to_string()),
2445 ..Default::default()
2446 };
2447 let results = db.list_sessions_log(&filter).unwrap();
2448 assert_eq!(results.len(), 0);
2449 }
2450}