1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// A schema migration as an (identifier, SQL text) pair.
type Migration = (&'static str, &'static str);
15
/// Migrations for the schema shared with the remote side, embedded at
/// compile time and applied in order.
/// NOTE(review): 0002 and 0008 are absent from this list — presumably
/// retired or merged; confirm against the migrations directory.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
    (
        "0010_api_keys_issuance",
        include_str!("../migrations/0010_api_keys_issuance.sql"),
    ),
];
48
/// Migrations for tables that exist only in the local database
/// (sync bookkeeping, caches), applied after the shared schema.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
];
59
/// A session row as read from the local database, including joined sync
/// bookkeeping (`session_sync`) and owner (`users`) columns.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    // Path of the original trace file on disk, when tracked.
    pub source_path: Option<String>,
    // Sync state, e.g. 'local_only' / 'synced' / 'remote_only'
    // ('unknown' when no sync row exists — see the COALESCE in queries).
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    // Originating tool, e.g. 'opencode', 'claude-code', 'codex'.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    // Comma-joined tag list (see `upsert_local_session`).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // Encoded file lists — NOTE(review): `list_sessions_log` matches these
    // with a quoted-substring LIKE; confirm the exact encoding upstream.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
97
/// One commit-to-session association row from `commit_session_links`.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    // Repository path recorded at link time, when known.
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
107
108pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
110 if row.tool != "opencode" {
111 return false;
112 }
113
114 if row.user_message_count <= 0
118 && row.message_count <= 4
119 && row.task_count <= 4
120 && row.event_count > 0
121 && row.event_count <= 16
122 {
123 return true;
124 }
125
126 if is_opencode_subagent_source(row.source_path.as_deref()) {
127 return true;
128 }
129
130 let source_path = match row.source_path.as_deref() {
131 Some(path) if !path.trim().is_empty() => path,
132 _ => return false,
133 };
134
135 parse_opencode_parent_session_id(source_path)
136 .is_some_and(|parent_id| !parent_id.trim().is_empty())
137}
138
139pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
141 let text = fs::read_to_string(source_path).ok()?;
142 let json: Value = serde_json::from_str(&text).ok()?;
143 lookup_parent_session_id(&json)
144}
145
146fn lookup_parent_session_id(value: &Value) -> Option<String> {
147 match value {
148 Value::Object(obj) => {
149 for (key, value) in obj {
150 if is_parent_id_key(key) {
151 if let Some(parent_id) = value.as_str() {
152 let parent_id = parent_id.trim();
153 if !parent_id.is_empty() {
154 return Some(parent_id.to_string());
155 }
156 }
157 }
158 if let Some(parent_id) = lookup_parent_session_id(value) {
159 return Some(parent_id);
160 }
161 }
162 None
163 }
164 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
165 _ => None,
166 }
167}
168
/// Returns true when a JSON key looks like a "parent session id" field.
///
/// The key is first flattened — non-alphanumeric characters stripped, the
/// rest lowercased — so `parent_id`, `parentId`, and `PARENT-SESSION-UUID`
/// normalize to `parentid`, `parentid`, and `parentsessionuuid`.
fn is_parent_id_key(key: &str) -> bool {
    let flat = key
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .collect::<String>();

    // The previous clause list ("parentid", "parentuuid", "parentsessionid",
    // explicit "uuid" endings, ...) all reduce to this single condition:
    // every variant contains "parent" and ends with "id" (note "uuid"
    // itself ends with "id"), so the redundant branches are dropped.
    flat.contains("parent") && flat.ends_with("id")
}
184
185pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
187 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
188 rows
189}
190
/// True when an opencode trace's source path looks like a sub-agent file.
/// Thin alias over the tool-agnostic `is_subagent_source` check.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
194
195fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
196 if row.tool != "claude-code" {
197 return false;
198 }
199
200 is_subagent_source(row.source_path.as_deref())
201}
202
/// Heuristic: does this source path point at a sub-agent trace file?
///
/// Matches case-insensitively: either a path containing a `subagents`
/// directory component (Unix or Windows separators), or a filename with an
/// `agent-`/`agent_`/`subagent-`/`subagent_` prefix.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let lowered = match source_path {
        Some(path) => path.to_ascii_lowercase(),
        None => return false,
    };

    // Directory-component check covers both separator styles.
    if ["/subagents/", "\\subagents\\"]
        .iter()
        .any(|needle| lowered.contains(needle))
    {
        return true;
    }

    // Otherwise only the final path component is inspected.
    let Some(filename) = std::path::Path::new(&lowered)
        .file_name()
        .map(|name| name.to_string_lossy().into_owned())
    else {
        return false;
    };

    ["agent-", "agent_", "subagent-", "subagent_"]
        .iter()
        .any(|prefix| filename.starts_with(prefix))
}
222
/// Guesses the originating tool from where the trace file lives on disk.
///
/// Recognises Codex traces under a `codex/sessions` directory and Claude
/// Code traces under a `claude/projects` directory (with or without the
/// leading dot, either separator style). Returns `None` otherwise.
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    let lowered = source_path?.to_ascii_lowercase();

    const CODEX_MARKERS: [&str; 4] = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    const CLAUDE_MARKERS: [&str; 4] = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    if CODEX_MARKERS.iter().any(|m| lowered.contains(m)) {
        Some("codex")
    } else if CLAUDE_MARKERS.iter().any(|m| lowered.contains(m)) {
        Some("claude-code")
    } else {
        None
    }
}
244
245fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
246 infer_tool_from_source_path(source_path)
247 .unwrap_or(current_tool)
248 .to_string()
249}
250
/// Filters and pagination options for the main local session listing.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    // Exact team id match.
    pub team_id: Option<String>,
    // Exact sync-status match; rows without a sync record count as 'unknown'.
    pub sync_status: Option<String>,
    // Exact repository-name match.
    pub git_repo_name: Option<String>,
    // Substring search over title, description, and tags.
    pub search: Option<String>,
    // Exact tool name match.
    pub tool: Option<String>,
    // Result ordering.
    pub sort: LocalSortOrder,
    // Creation-time window.
    pub time_range: LocalTimeRange,
    // Page size; when unset, no LIMIT is applied.
    pub limit: Option<u32>,
    // Row offset; only honoured together with `limit`.
    pub offset: Option<u32>,
}
264
265impl Default for LocalSessionFilter {
266 fn default() -> Self {
267 Self {
268 team_id: None,
269 sync_status: None,
270 git_repo_name: None,
271 search: None,
272 tool: None,
273 sort: LocalSortOrder::Recent,
274 time_range: LocalTimeRange::All,
275 limit: None,
276 offset: None,
277 }
278 }
279}
280
/// Sort order for local session listings.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (by `created_at`).
    #[default]
    Recent,
    /// Most messages first, ties broken newest-first.
    Popular,
    /// Longest duration first, ties broken newest-first.
    Longest,
}
289
/// Creation-time window applied to local session listings.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours (SQL interval '-1 day').
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
299
/// Summary of a session as known remotely; cached locally via
/// `upsert_remote_session` (the row is stored with an empty
/// `body_storage_key` — bodies are cached separately in `body_cache`).
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    // Always present for remote sessions, unlike `LocalSessionRow`.
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
333
/// Filters for the `log`-style listing (`list_sessions_log`).
#[derive(Debug, Default)]
pub struct LogFilter {
    // Exact tool name match.
    pub tool: Option<String>,
    // Model pattern; `*` acts as a wildcard (translated to SQL `%`).
    pub model: Option<String>,
    // Inclusive lower bound on `created_at`.
    pub since: Option<String>,
    // Exclusive upper bound on `created_at`.
    pub before: Option<String>,
    // Matches sessions whose modified-files list contains this path
    // (quoted-substring LIKE against `files_modified`).
    pub touches: Option<String>,
    // Substring search over title, description, and tags.
    pub grep: Option<String>,
    // When `Some(true)`, only sessions flagged with errors.
    pub has_errors: Option<bool>,
    // Prefix match on the session's working directory.
    pub working_directory: Option<String>,
    // Exact repository-name match.
    pub git_repo_name: Option<String>,
    // Restrict to sessions linked to this commit hash.
    pub commit: Option<String>,
    // Page size; when unset, no LIMIT is applied.
    pub limit: Option<u32>,
    // Row offset; only honoured together with `limit`.
    pub offset: Option<u32>,
}
362
/// Shared FROM/JOIN section for session queries: `sessions` LEFT JOINed
/// with its sync bookkeeping and owning user, aliased `s`/`ss`/`u`.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
368
/// Handle to the local SQLite database.
///
/// All access is serialized through one mutex-guarded connection.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
375
376impl LocalDb {
377 pub fn open() -> Result<Self> {
380 let path = default_db_path()?;
381 Self::open_path(&path)
382 }
383
384 pub fn open_path(path: &PathBuf) -> Result<Self> {
386 if let Some(parent) = path.parent() {
387 std::fs::create_dir_all(parent)
388 .with_context(|| format!("create dir for {}", path.display()))?;
389 }
390 match open_connection_with_latest_schema(path) {
391 Ok(conn) => Ok(Self {
392 conn: Mutex::new(conn),
393 }),
394 Err(err) => {
395 if !is_schema_compat_error(&err) {
396 return Err(err);
397 }
398
399 rotate_legacy_db(path)?;
402
403 let conn = open_connection_with_latest_schema(path)
404 .with_context(|| format!("recreate db {}", path.display()))?;
405 Ok(Self {
406 conn: Mutex::new(conn),
407 })
408 }
409 }
410 }
411
    /// Acquires the guarded connection. Panics only if the mutex was
    /// poisoned by a panicking holder.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
415
    /// Inserts or updates a locally-parsed session plus its sync record.
    ///
    /// The session row is written with `team_id = 'personal'` and an empty
    /// `body_storage_key`; on conflict all derived columns are refreshed,
    /// while `id`, `team_id`, and `created_at` are left untouched. The
    /// companion `session_sync` row starts as `local_only`; on conflict only
    /// its `source_path` is refreshed so an existing sync status survives
    /// re-ingestion.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as one comma-joined string; empty list -> NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // The working directory may be recorded under either attribute name.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        // Prefer the tool implied by the trace's on-disk location over what
        // the trace claims for itself.
        let normalized_tool =
            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));

        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
             title, description, tags, created_at, \
             message_count, user_message_count, task_count, event_count, duration_seconds, \
             total_input_tokens, total_output_tokens, body_storage_key, \
             git_remote, git_branch, git_commit, git_repo_name, working_directory, \
             files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
             tool=excluded.tool, agent_provider=excluded.agent_provider, \
             agent_model=excluded.agent_model, \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, \
             message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
             task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &normalized_tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
510
    /// Inserts or refreshes a remotely-known session summary in the cache.
    ///
    /// On conflict, identity columns (user, team, tool, provider, model,
    /// `created_at`) are kept as-is while the mutable summary columns are
    /// refreshed. The companion `session_sync` row starts as `remote_only`;
    /// an existing `local_only` row is promoted to `synced`, and any other
    /// existing status is preserved.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
             title, description, tags, created_at, uploaded_at, \
             message_count, task_count, event_count, duration_seconds, \
             total_input_tokens, total_output_tokens, body_storage_key, \
             git_remote, git_branch, git_commit, git_repo_name, \
             pr_number, pr_url, working_directory, \
             files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
             message_count=excluded.message_count, task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
             sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
582
583 fn build_local_session_where_clause(
586 filter: &LocalSessionFilter,
587 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
588 let mut where_clauses = vec![
589 "1=1".to_string(),
590 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
591 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
592 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
593 ];
594 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
595 let mut idx = 1u32;
596
597 if let Some(ref team_id) = filter.team_id {
598 where_clauses.push(format!("s.team_id = ?{idx}"));
599 param_values.push(Box::new(team_id.clone()));
600 idx += 1;
601 }
602
603 if let Some(ref sync_status) = filter.sync_status {
604 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
605 param_values.push(Box::new(sync_status.clone()));
606 idx += 1;
607 }
608
609 if let Some(ref repo) = filter.git_repo_name {
610 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
611 param_values.push(Box::new(repo.clone()));
612 idx += 1;
613 }
614
615 if let Some(ref tool) = filter.tool {
616 where_clauses.push(format!("s.tool = ?{idx}"));
617 param_values.push(Box::new(tool.clone()));
618 idx += 1;
619 }
620
621 if let Some(ref search) = filter.search {
622 let like = format!("%{search}%");
623 where_clauses.push(format!(
624 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
625 i1 = idx,
626 i2 = idx + 1,
627 i3 = idx + 2,
628 ));
629 param_values.push(Box::new(like.clone()));
630 param_values.push(Box::new(like.clone()));
631 param_values.push(Box::new(like));
632 idx += 3;
633 }
634
635 let interval = match filter.time_range {
636 LocalTimeRange::Hours24 => Some("-1 day"),
637 LocalTimeRange::Days7 => Some("-7 days"),
638 LocalTimeRange::Days30 => Some("-30 days"),
639 LocalTimeRange::All => None,
640 };
641 if let Some(interval) = interval {
642 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
643 param_values.push(Box::new(interval.to_string()));
644 }
645
646 (where_clauses.join(" AND "), param_values)
647 }
648
649 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
650 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
651 let order_clause = match filter.sort {
652 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
653 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
654 LocalSortOrder::Recent => "s.created_at DESC",
655 };
656
657 let mut sql = format!(
658 "SELECT {LOCAL_SESSION_COLUMNS} \
659 {FROM_CLAUSE} WHERE {where_str} \
660 ORDER BY {order_clause}"
661 );
662
663 if let Some(limit) = filter.limit {
664 sql.push_str(" LIMIT ?");
665 param_values.push(Box::new(limit));
666 if let Some(offset) = filter.offset {
667 sql.push_str(" OFFSET ?");
668 param_values.push(Box::new(offset));
669 }
670 }
671
672 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
673 param_values.iter().map(|p| p.as_ref()).collect();
674 let conn = self.conn();
675 let mut stmt = conn.prepare(&sql)?;
676 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
677
678 let mut result = Vec::new();
679 for row in rows {
680 result.push(row?);
681 }
682
683 Ok(hide_opencode_child_sessions(result))
684 }
685
686 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
688 let mut count_filter = filter.clone();
689 count_filter.limit = None;
690 count_filter.offset = None;
691 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
692 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
693 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
694 param_values.iter().map(|p| p.as_ref()).collect();
695 let conn = self.conn();
696 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
697 Ok(count)
698 }
699
700 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
702 let mut tool_filter = filter.clone();
703 tool_filter.tool = None;
704 tool_filter.limit = None;
705 tool_filter.offset = None;
706 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
707 let sql = format!(
708 "SELECT DISTINCT s.tool \
709 {FROM_CLAUSE} WHERE {where_str} \
710 ORDER BY s.tool ASC"
711 );
712 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
713 param_values.iter().map(|p| p.as_ref()).collect();
714 let conn = self.conn();
715 let mut stmt = conn.prepare(&sql)?;
716 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
717
718 let mut tools = Vec::new();
719 for row in rows {
720 let tool = row?;
721 if !tool.trim().is_empty() {
722 tools.push(tool);
723 }
724 }
725 Ok(tools)
726 }
727
    /// Lists sessions for the `log`-style view, applying `LogFilter`.
    ///
    /// Unlike `list_sessions`, sub-agent rows are removed only by the
    /// Rust-side post-filter (after LIMIT is applied in SQL), so a page can
    /// come back shorter than `limit`.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next `?N` placeholder number; kept in lock-step with param_values.
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // User-facing `*` wildcard becomes SQL LIKE `%`.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // Match the path as a quoted element inside the stored list.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // One pattern bound three times: title, description, and tags.
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only `Some(true)` restricts; `Some(false)`/`None` mean "all".
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Commit filtering requires joining the link table.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // `idx` ends unused after the last placeholder; discard it to
        // silence the warning.
        let _ = idx; let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        // Anonymous `?` continues numbering past the highest `?N`; OFFSET
        // only applies together with LIMIT.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
838
839 pub fn get_sessions_by_tool_latest(
841 &self,
842 tool: &str,
843 count: u32,
844 ) -> Result<Vec<LocalSessionRow>> {
845 let sql = format!(
846 "SELECT {LOCAL_SESSION_COLUMNS} \
847 {FROM_CLAUSE} WHERE s.tool = ?1 \
848 ORDER BY s.created_at DESC"
849 );
850 let conn = self.conn();
851 let mut stmt = conn.prepare(&sql)?;
852 let rows = stmt.query_map(params![tool], row_to_local_session)?;
853 let mut result = Vec::new();
854 for row in rows {
855 result.push(row?);
856 }
857
858 let mut filtered = hide_opencode_child_sessions(result);
859 filtered.truncate(count as usize);
860 Ok(filtered)
861 }
862
863 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
865 let sql = format!(
866 "SELECT {LOCAL_SESSION_COLUMNS} \
867 {FROM_CLAUSE} \
868 ORDER BY s.created_at DESC"
869 );
870 let conn = self.conn();
871 let mut stmt = conn.prepare(&sql)?;
872 let rows = stmt.query_map([], row_to_local_session)?;
873 let mut result = Vec::new();
874 for row in rows {
875 result.push(row?);
876 }
877
878 let mut filtered = hide_opencode_child_sessions(result);
879 filtered.truncate(count as usize);
880 Ok(filtered)
881 }
882
883 pub fn get_session_by_tool_offset(
885 &self,
886 tool: &str,
887 offset: u32,
888 ) -> Result<Option<LocalSessionRow>> {
889 let sql = format!(
890 "SELECT {LOCAL_SESSION_COLUMNS} \
891 {FROM_CLAUSE} WHERE s.tool = ?1 \
892 ORDER BY s.created_at DESC"
893 );
894 let conn = self.conn();
895 let mut stmt = conn.prepare(&sql)?;
896 let rows = stmt.query_map(params![tool], row_to_local_session)?;
897 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
898 Ok(filtered.into_iter().nth(offset as usize))
899 }
900
901 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
903 let sql = format!(
904 "SELECT {LOCAL_SESSION_COLUMNS} \
905 {FROM_CLAUSE} \
906 ORDER BY s.created_at DESC"
907 );
908 let conn = self.conn();
909 let mut stmt = conn.prepare(&sql)?;
910 let rows = stmt.query_map([], row_to_local_session)?;
911 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
912 Ok(filtered.into_iter().nth(offset as usize))
913 }
914
915 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
917 let conn = self.conn();
918 let result = conn
919 .query_row(
920 "SELECT source_path FROM session_sync WHERE session_id = ?1",
921 params![session_id],
922 |row| row.get(0),
923 )
924 .optional()?;
925
926 Ok(result)
927 }
928
929 pub fn session_count(&self) -> Result<i64> {
931 let count = self
932 .conn()
933 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
934 Ok(count)
935 }
936
937 pub fn delete_session(&self, session_id: &str) -> Result<()> {
940 let conn = self.conn();
941 conn.execute(
942 "DELETE FROM body_cache WHERE session_id = ?1",
943 params![session_id],
944 )?;
945 conn.execute(
946 "DELETE FROM session_sync WHERE session_id = ?1",
947 params![session_id],
948 )?;
949 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
950 Ok(())
951 }
952
953 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
956 let cursor = self
957 .conn()
958 .query_row(
959 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
960 params![team_id],
961 |row| row.get(0),
962 )
963 .optional()?;
964 Ok(cursor)
965 }
966
967 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
968 self.conn().execute(
969 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
970 VALUES (?1, ?2, datetime('now')) \
971 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
972 params![team_id, cursor],
973 )?;
974 Ok(())
975 }
976
977 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
981 let sql = format!(
982 "SELECT {LOCAL_SESSION_COLUMNS} \
983 FROM sessions s \
984 INNER JOIN session_sync ss ON ss.session_id = s.id \
985 LEFT JOIN users u ON u.id = s.user_id \
986 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
987 ORDER BY s.created_at ASC"
988 );
989 let conn = self.conn();
990 let mut stmt = conn.prepare(&sql)?;
991 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
992 let mut result = Vec::new();
993 for row in rows {
994 result.push(row?);
995 }
996 Ok(result)
997 }
998
999 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
1000 self.conn().execute(
1001 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
1002 WHERE session_id = ?1",
1003 params![session_id],
1004 )?;
1005 Ok(())
1006 }
1007
1008 pub fn was_uploaded_after(
1010 &self,
1011 source_path: &str,
1012 modified: &chrono::DateTime<chrono::Utc>,
1013 ) -> Result<bool> {
1014 let result: Option<String> = self
1015 .conn()
1016 .query_row(
1017 "SELECT last_synced_at FROM session_sync \
1018 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1019 params![source_path],
1020 |row| row.get(0),
1021 )
1022 .optional()?;
1023
1024 if let Some(synced_at) = result {
1025 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1026 return Ok(dt >= *modified);
1027 }
1028 }
1029 Ok(false)
1030 }
1031
1032 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1035 self.conn().execute(
1036 "INSERT INTO body_cache (session_id, body, cached_at) \
1037 VALUES (?1, ?2, datetime('now')) \
1038 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1039 params![session_id, body],
1040 )?;
1041 Ok(())
1042 }
1043
1044 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1045 let body = self
1046 .conn()
1047 .query_row(
1048 "SELECT body FROM body_cache WHERE session_id = ?1",
1049 params![session_id],
1050 |row| row.get(0),
1051 )
1052 .optional()?;
1053 Ok(body)
1054 }
1055
1056 pub fn migrate_from_state_json(
1061 &self,
1062 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1063 ) -> Result<usize> {
1064 let mut count = 0;
1065 for (path, uploaded_at) in uploaded {
1066 let exists: bool = self
1067 .conn()
1068 .query_row(
1069 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1070 params![path],
1071 |row| row.get(0),
1072 )
1073 .unwrap_or(false);
1074
1075 if exists {
1076 self.conn().execute(
1077 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1078 WHERE source_path = ?2 AND sync_status = 'local_only'",
1079 params![uploaded_at.to_rfc3339(), path],
1080 )?;
1081 count += 1;
1082 }
1083 }
1084 Ok(count)
1085 }
1086
1087 pub fn link_commit_session(
1091 &self,
1092 commit_hash: &str,
1093 session_id: &str,
1094 repo_path: Option<&str>,
1095 branch: Option<&str>,
1096 ) -> Result<()> {
1097 self.conn().execute(
1098 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1099 VALUES (?1, ?2, ?3, ?4) \
1100 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1101 params![commit_hash, session_id, repo_path, branch],
1102 )?;
1103 Ok(())
1104 }
1105
1106 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1108 let sql = format!(
1109 "SELECT {LOCAL_SESSION_COLUMNS} \
1110 {FROM_CLAUSE} \
1111 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1112 WHERE csl.commit_hash = ?1 \
1113 ORDER BY s.created_at DESC"
1114 );
1115 let conn = self.conn();
1116 let mut stmt = conn.prepare(&sql)?;
1117 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1118 let mut result = Vec::new();
1119 for row in rows {
1120 result.push(row?);
1121 }
1122 Ok(result)
1123 }
1124
1125 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1127 let conn = self.conn();
1128 let mut stmt = conn.prepare(
1129 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1130 FROM commit_session_links WHERE session_id = ?1 \
1131 ORDER BY created_at DESC",
1132 )?;
1133 let rows = stmt.query_map(params![session_id], |row| {
1134 Ok(CommitLink {
1135 commit_hash: row.get(0)?,
1136 session_id: row.get(1)?,
1137 repo_path: row.get(2)?,
1138 branch: row.get(3)?,
1139 created_at: row.get(4)?,
1140 })
1141 })?;
1142 let mut result = Vec::new();
1143 for row in rows {
1144 result.push(row?);
1145 }
1146 Ok(result)
1147 }
1148
1149 pub fn find_active_session_for_repo(
1153 &self,
1154 repo_path: &str,
1155 since_minutes: u32,
1156 ) -> Result<Option<LocalSessionRow>> {
1157 let sql = format!(
1158 "SELECT {LOCAL_SESSION_COLUMNS} \
1159 {FROM_CLAUSE} \
1160 WHERE s.working_directory LIKE ?1 \
1161 AND s.created_at >= datetime('now', ?2) \
1162 ORDER BY s.created_at DESC LIMIT 1"
1163 );
1164 let since = format!("-{since_minutes} minutes");
1165 let like = format!("{repo_path}%");
1166 let conn = self.conn();
1167 let mut stmt = conn.prepare(&sql)?;
1168 let row = stmt
1169 .query_map(params![like, since], row_to_local_session)?
1170 .next()
1171 .transpose()?;
1172 Ok(row)
1173 }
1174
1175 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1177 let conn = self.conn();
1178 let mut stmt = conn
1179 .prepare("SELECT id FROM sessions")
1180 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1181 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1182 let mut set = std::collections::HashSet::new();
1183 if let Ok(rows) = rows {
1184 for row in rows.flatten() {
1185 set.insert(row);
1186 }
1187 }
1188 set
1189 }
1190
    /// Refresh the denormalized stat columns of an already-inserted session
    /// from a freshly parsed `Session`. Matching is by `session.session_id`;
    /// if no row matches, the UPDATE affects nothing and this still returns Ok.
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        // NOTE: params! order is positional — ?1 is the WHERE id, ?2..?14
        // follow the SET list order exactly. Keep the two lists in sync.
        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;
        Ok(())
    }
1227
1228 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1230 self.conn().execute(
1231 "INSERT INTO session_sync (session_id, source_path) \
1232 VALUES (?1, ?2) \
1233 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1234 params![session_id, source_path],
1235 )?;
1236 Ok(())
1237 }
1238
1239 pub fn list_repos(&self) -> Result<Vec<String>> {
1241 let conn = self.conn();
1242 let mut stmt = conn.prepare(
1243 "SELECT DISTINCT git_repo_name FROM sessions \
1244 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1245 )?;
1246 let rows = stmt.query_map([], |row| row.get(0))?;
1247 let mut result = Vec::new();
1248 for row in rows {
1249 result.push(row?);
1250 }
1251 Ok(result)
1252 }
1253
1254 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1256 let conn = self.conn();
1257 let mut stmt = conn.prepare(
1258 "SELECT DISTINCT working_directory FROM sessions \
1259 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1260 ORDER BY working_directory ASC",
1261 )?;
1262 let rows = stmt.query_map([], |row| row.get(0))?;
1263 let mut result = Vec::new();
1264 for row in rows {
1265 result.push(row?);
1266 }
1267 Ok(result)
1268 }
1269}
1270
/// Open (or create) the db file at `path` and bring it up to the latest schema.
///
/// Step order matters: migrations run first, then missing-column backfill for
/// legacy databases, then a tool-name repair pass, and finally a probe query
/// that validates the resulting schema (callers rotate the file on failure).
fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
    conn.execute_batch("PRAGMA journal_mode=WAL;")?;

    // FK enforcement is switched off before applying the (server-derived)
    // migrations — presumably because locally absent tables would otherwise
    // trip FK checks; see migration 0006_sessions_remove_fk_constraints.
    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;

    apply_local_migrations(&conn)?;

    ensure_sessions_columns(&conn)?;
    repair_session_tools_from_source_path(&conn)?;
    validate_local_schema(&conn)?;

    Ok(conn)
}
1290
/// Apply the shared remote migrations plus the local-only migrations,
/// recording progress in a `_migrations` bookkeeping table so each one runs
/// at most once.
///
/// Migration errors that look like schema-compat noise (duplicate/missing
/// column, object already exists) are deliberately swallowed so databases
/// created before migration tracking existed can converge; the migration is
/// then recorded as applied anyway.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        // Tolerate compat errors (see is_local_migration_compat_error);
        // anything else is a real failure and aborts.
        if let Err(e) = conn.execute_batch(sql) {
            let msg = e.to_string().to_ascii_lowercase();
            if !is_local_migration_compat_error(&msg) {
                return Err(e).with_context(|| format!("apply local migration {name}"));
            }
        }

        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1330
/// True when a migration error message (already lowercased by the caller)
/// indicates the schema change is effectively already in place.
fn is_local_migration_compat_error(msg: &str) -> bool {
    ["duplicate column name", "no such column", "already exists"]
        .iter()
        .any(|needle| msg.contains(needle))
}
1336
1337fn validate_local_schema(conn: &Connection) -> Result<()> {
1338 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1339 conn.prepare(&sql)
1340 .map(|_| ())
1341 .context("validate local session schema")
1342}
1343
1344fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1345 let mut stmt = conn.prepare(
1346 "SELECT s.id, s.tool, ss.source_path \
1347 FROM sessions s \
1348 LEFT JOIN session_sync ss ON ss.session_id = s.id \
1349 WHERE ss.source_path IS NOT NULL",
1350 )?;
1351 let rows = stmt.query_map([], |row| {
1352 Ok((
1353 row.get::<_, String>(0)?,
1354 row.get::<_, String>(1)?,
1355 row.get::<_, Option<String>>(2)?,
1356 ))
1357 })?;
1358
1359 let mut updates: Vec<(String, String)> = Vec::new();
1360 for row in rows {
1361 let (id, current_tool, source_path) = row?;
1362 let normalized = normalize_tool_for_source_path(¤t_tool, source_path.as_deref());
1363 if normalized != current_tool {
1364 updates.push((id, normalized));
1365 }
1366 }
1367 drop(stmt);
1368
1369 for (id, tool) in updates {
1370 conn.execute(
1371 "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1372 params![tool, id],
1373 )?;
1374 }
1375
1376 Ok(())
1377}
1378
1379fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1380 let msg = format!("{err:#}").to_ascii_lowercase();
1381 msg.contains("no such column")
1382 || msg.contains("no such table")
1383 || msg.contains("cannot add a column")
1384 || msg.contains("already exists")
1385 || msg.contains("views may not be indexed")
1386 || msg.contains("malformed database schema")
1387 || msg.contains("duplicate column name")
1388}
1389
1390fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1391 if !path.exists() {
1392 return Ok(());
1393 }
1394
1395 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1396 let backup_name = format!(
1397 "{}.legacy-{}.bak",
1398 path.file_name()
1399 .and_then(|n| n.to_str())
1400 .unwrap_or("local.db"),
1401 ts
1402 );
1403 let backup_path = path.with_file_name(backup_name);
1404 std::fs::rename(path, &backup_path).with_context(|| {
1405 format!(
1406 "rotate local db backup {} -> {}",
1407 path.display(),
1408 backup_path.display()
1409 )
1410 })?;
1411
1412 let wal = PathBuf::from(format!("{}-wal", path.display()));
1413 let shm = PathBuf::from(format!("{}-shm", path.display()));
1414 let _ = std::fs::remove_file(wal);
1415 let _ = std::fs::remove_file(shm);
1416 Ok(())
1417}
1418
/// Columns (name, SQL type/default) the local `sessions` table must have;
/// `ensure_sessions_columns` adds any missing ones via ALTER TABLE so
/// legacy databases are upgraded in place.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1452
1453fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1454 let mut existing = HashSet::new();
1455 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1456 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1457 for row in rows {
1458 existing.insert(row?);
1459 }
1460
1461 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1462 if existing.contains(*name) {
1463 continue;
1464 }
1465 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1466 conn.execute_batch(&sql)
1467 .with_context(|| format!("add sessions column '{name}'"))?;
1468 }
1469
1470 Ok(())
1471}
1472
/// Column list shared by all local session SELECTs. The positional order
/// here must stay in lockstep with the `row.get(N)` indices inside
/// `row_to_local_session` — edit both together.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1483
/// Map one result row (in `LOCAL_SESSION_COLUMNS` order) to a `LocalSessionRow`.
///
/// Indices below are positional and must match that column list exactly.
/// The stored tool name is re-normalized against the source path so
/// mislabeled rows read back with the corrected tool.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    let source_path: Option<String> = row.get(1)?;
    let tool: String = row.get(7)?;
    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());

    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: normalized_tool,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Tolerate legacy rows lacking these columns: default to no errors
        // and a single agent instead of failing the whole read.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1525
1526fn default_db_path() -> Result<PathBuf> {
1527 let home = std::env::var("HOME")
1528 .or_else(|_| std::env::var("USERPROFILE"))
1529 .context("Could not determine home directory")?;
1530 Ok(PathBuf::from(home)
1531 .join(".local")
1532 .join("share")
1533 .join("opensession")
1534 .join("local.db"))
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use super::*;
1540
1541 use std::collections::BTreeSet;
1542 use std::fs::{create_dir_all, write};
1543 use tempfile::tempdir;
1544
1545 fn test_db() -> LocalDb {
1546 let dir = tempdir().unwrap();
1547 let path = dir.keep().join("test.db");
1548 LocalDb::open_path(&path).unwrap()
1549 }
1550
    // Temp dir scoped to the test; deleted when the returned handle drops.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1554
    // Fixture: a minimal local-only LocalSessionRow — all counters zeroed,
    // title "test", id/tool/source_path supplied by the caller.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1592
    // Smoke test: opening a fresh db applies all migrations without error.
    #[test]
    fn test_open_and_schema() {
        let _db = test_db();
    }
1597
    // A row mislabeled as claude-code whose synced file lives under
    // ~/.codex must be repaired to tool = "codex" on the next open.
    #[test]
    fn test_open_repairs_codex_tool_hint_from_source_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair.db");

        // First open creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Inject the mislabeled session and its sync record directly via SQL.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
                params!["rollout-repair", "2026-02-20T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
            )
            .unwrap();
        }

        // Re-open runs the repair pass; the row should now read as codex.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-repair")
            .expect("repaired row");
        assert_eq!(row.tool, "codex");
    }
1629
    // Upserting a session whose agent metadata says claude-code but whose
    // file lives under ~/.codex must store the normalized tool name.
    #[test]
    fn test_upsert_local_session_normalizes_tool_from_source_path() {
        let db = test_db();
        let mut session = Session::new(
            "rollout-upsert".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;

        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
            &crate::git::GitContext::default(),
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-upsert")
            .expect("upserted row");
        assert_eq!(row.tool, "codex");
    }
1658
    // A bare legacy `sessions` table (only an id column) must be upgraded in
    // place by the column backfill — not rotated away.
    #[test]
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        // Re-open upgrades the schema; the old row survives with defaults.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }
1678
    // When `sessions` exists as a VIEW the schema cannot be migrated;
    // open_path must rotate the file to a .bak and start fresh.
    #[test]
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        // The incompatible file should now be broken.db.legacy-<timestamp>.bak.
        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1703
    // Child detection via the on-disk opencode JSON: a session file carrying
    // a parentID is a child; parents and non-opencode tools are not.
    #[test]
    fn test_is_opencode_child_session() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let parent = make_row(
            "ses_parent",
            "opencode",
            Some(parent_session.to_str().unwrap()),
        );
        let child = make_row(
            "ses_child",
            "opencode",
            Some(child_session.to_str().unwrap()),
        );
        // Same file, but a codex row must never be treated as an opencode child.
        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));

        assert!(!is_opencode_child_session(&parent));
        assert!(is_opencode_child_session(&child));
        assert!(!is_opencode_child_session(&codex));
    }
1738
1739 #[test]
1740 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1741 let child = LocalSessionRow {
1742 id: "sess_child".to_string(),
1743 source_path: None,
1744 sync_status: "local_only".to_string(),
1745 last_synced_at: None,
1746 user_id: None,
1747 nickname: None,
1748 team_id: None,
1749 tool: "opencode".to_string(),
1750 agent_provider: None,
1751 agent_model: None,
1752 title: None,
1753 description: None,
1754 tags: None,
1755 created_at: "2024-01-01T00:00:00Z".to_string(),
1756 uploaded_at: None,
1757 message_count: 1,
1758 user_message_count: 0,
1759 task_count: 4,
1760 event_count: 4,
1761 duration_seconds: 0,
1762 total_input_tokens: 0,
1763 total_output_tokens: 0,
1764 git_remote: None,
1765 git_branch: None,
1766 git_commit: None,
1767 git_repo_name: None,
1768 pr_number: None,
1769 pr_url: None,
1770 working_directory: None,
1771 files_modified: None,
1772 files_read: None,
1773 has_errors: false,
1774 max_active_agents: 1,
1775 };
1776
1777 let parent = LocalSessionRow {
1778 id: "sess_parent".to_string(),
1779 source_path: None,
1780 sync_status: "local_only".to_string(),
1781 last_synced_at: None,
1782 user_id: None,
1783 nickname: None,
1784 team_id: None,
1785 tool: "opencode".to_string(),
1786 agent_provider: None,
1787 agent_model: None,
1788 title: Some("regular".to_string()),
1789 description: None,
1790 tags: None,
1791 created_at: "2024-01-01T00:00:00Z".to_string(),
1792 uploaded_at: None,
1793 message_count: 1,
1794 user_message_count: 1,
1795 task_count: 2,
1796 event_count: 20,
1797 duration_seconds: 0,
1798 total_input_tokens: 0,
1799 total_output_tokens: 0,
1800 git_remote: None,
1801 git_branch: None,
1802 git_commit: None,
1803 git_repo_name: None,
1804 pr_number: None,
1805 pr_url: None,
1806 working_directory: None,
1807 files_modified: None,
1808 files_read: None,
1809 has_errors: false,
1810 max_active_agents: 1,
1811 };
1812
1813 assert!(is_opencode_child_session(&child));
1814 assert!(!is_opencode_child_session(&parent));
1815 }
1816
1817 #[test]
1818 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1819 let child = LocalSessionRow {
1820 id: "sess_child_2".to_string(),
1821 source_path: None,
1822 sync_status: "local_only".to_string(),
1823 last_synced_at: None,
1824 user_id: None,
1825 nickname: None,
1826 team_id: None,
1827 tool: "opencode".to_string(),
1828 agent_provider: None,
1829 agent_model: None,
1830 title: None,
1831 description: None,
1832 tags: None,
1833 created_at: "2024-01-01T00:00:00Z".to_string(),
1834 uploaded_at: None,
1835 message_count: 2,
1836 user_message_count: 0,
1837 task_count: 4,
1838 event_count: 4,
1839 duration_seconds: 0,
1840 total_input_tokens: 0,
1841 total_output_tokens: 0,
1842 git_remote: None,
1843 git_branch: None,
1844 git_commit: None,
1845 git_repo_name: None,
1846 pr_number: None,
1847 pr_url: None,
1848 working_directory: None,
1849 files_modified: None,
1850 files_read: None,
1851 has_errors: false,
1852 max_active_agents: 1,
1853 };
1854
1855 let parent = LocalSessionRow {
1856 id: "sess_parent".to_string(),
1857 source_path: None,
1858 sync_status: "local_only".to_string(),
1859 last_synced_at: None,
1860 user_id: None,
1861 nickname: None,
1862 team_id: None,
1863 tool: "opencode".to_string(),
1864 agent_provider: None,
1865 agent_model: None,
1866 title: Some("regular".to_string()),
1867 description: None,
1868 tags: None,
1869 created_at: "2024-01-01T00:00:00Z".to_string(),
1870 uploaded_at: None,
1871 message_count: 2,
1872 user_message_count: 1,
1873 task_count: 5,
1874 event_count: 20,
1875 duration_seconds: 0,
1876 total_input_tokens: 0,
1877 total_output_tokens: 0,
1878 git_remote: None,
1879 git_branch: None,
1880 git_commit: None,
1881 git_repo_name: None,
1882 pr_number: None,
1883 pr_url: None,
1884 working_directory: None,
1885 files_modified: None,
1886 files_read: None,
1887 has_errors: false,
1888 max_active_agents: 1,
1889 };
1890
1891 assert!(is_opencode_child_session(&child));
1892 assert!(!is_opencode_child_session(&parent));
1893 }
1894
1895 #[test]
1896 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1897 let child = LocalSessionRow {
1898 id: "sess_child_3".to_string(),
1899 source_path: None,
1900 sync_status: "local_only".to_string(),
1901 last_synced_at: None,
1902 user_id: None,
1903 nickname: None,
1904 team_id: None,
1905 tool: "opencode".to_string(),
1906 agent_provider: None,
1907 agent_model: None,
1908 title: None,
1909 description: None,
1910 tags: None,
1911 created_at: "2024-01-01T00:00:00Z".to_string(),
1912 uploaded_at: None,
1913 message_count: 3,
1914 user_message_count: 0,
1915 task_count: 2,
1916 event_count: 6,
1917 duration_seconds: 0,
1918 total_input_tokens: 0,
1919 total_output_tokens: 0,
1920 git_remote: None,
1921 git_branch: None,
1922 git_commit: None,
1923 git_repo_name: None,
1924 pr_number: None,
1925 pr_url: None,
1926 working_directory: None,
1927 files_modified: None,
1928 files_read: None,
1929 has_errors: false,
1930 max_active_agents: 1,
1931 };
1932
1933 assert!(is_opencode_child_session(&child));
1934 }
1935
    // The parent-id parser must also accept the "parentUUID" alias key.
    #[test]
    fn test_parse_opencode_parent_session_id_aliases() {
        let root = temp_root();
        let dir = root.path().join("session-aliases");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1952
    // The parent id may also appear nested under metadata objects
    // (here metadata.links.parentSessionId) and must still be found.
    #[test]
    fn test_parse_opencode_parent_session_id_nested_metadata() {
        let root = temp_root();
        let dir = root.path().join("session-nested");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1969
    // Claude subagent transcripts (path contains /subagents/) are detected
    // separately from opencode children but hidden by the same filter.
    #[test]
    fn test_is_claude_subagent_session() {
        let row = make_row(
            "ses_parent",
            "claude-code",
            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
        );
        assert!(!is_opencode_child_session(&row));
        assert!(is_claude_subagent_session(&row));
        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
    }
1981
    // The list filter drops only true children: rows whose JSON carries a
    // parentID. Parents, orphans and non-opencode rows pass through.
    #[test]
    fn test_hide_opencode_child_sessions() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        let child_session = dir.join("child.json");
        let orphan_session = dir.join("orphan.json");

        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &orphan_session,
            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let rows = vec![
            make_row(
                "ses_child",
                "opencode",
                Some(child_session.to_str().unwrap()),
            ),
            make_row(
                "ses_parent",
                "opencode",
                Some(parent_session.to_str().unwrap()),
            ),
            {
                // Non-opencode row with a user message: must never be hidden.
                let mut row = make_row("ses_other", "codex", None);
                row.user_message_count = 1;
                row
            },
            make_row(
                "ses_orphan",
                "opencode",
                Some(orphan_session.to_str().unwrap()),
            ),
        ];

        let filtered = hide_opencode_child_sessions(rows);
        assert_eq!(filtered.len(), 3);
        assert!(filtered.iter().all(|r| r.id != "ses_child"));
    }
2034
    // Sync cursor: absent at first, then always reflects the latest set value.
    #[test]
    fn test_sync_cursor() {
        let db = test_db();
        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-01-01T00:00:00Z".to_string())
        );
        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-06-01T00:00:00Z".to_string())
        );
    }
2051
    // Body cache round-trip: miss before insert, exact bytes back after.
    #[test]
    fn test_body_cache() {
        let db = test_db();
        assert_eq!(db.get_cached_body("s1").unwrap(), None);
        db.cache_body("s1", b"hello world").unwrap();
        assert_eq!(
            db.get_cached_body("s1").unwrap(),
            Some(b"hello world".to_vec())
        );
    }
2062
2063 #[test]
2064 fn test_local_migration_files_match_api_local_migrations() {
2065 fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2066 std::fs::read_dir(dir)
2067 .expect("read migrations directory")
2068 .filter_map(Result::ok)
2069 .map(|entry| entry.file_name().to_string_lossy().to_string())
2070 .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2071 .collect()
2072 }
2073
2074 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2075 let local_files = collect_local_sql(manifest_dir.join("migrations"));
2076 let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2077
2078 assert_eq!(
2079 local_files, api_files,
2080 "local-db local migrations must stay in parity with api local migrations"
2081 );
2082 }
2083
2084 #[test]
2085 fn test_upsert_remote_session() {
2086 let db = test_db();
2087 let summary = RemoteSessionSummary {
2088 id: "remote-1".to_string(),
2089 user_id: Some("u1".to_string()),
2090 nickname: Some("alice".to_string()),
2091 team_id: "t1".to_string(),
2092 tool: "claude-code".to_string(),
2093 agent_provider: None,
2094 agent_model: None,
2095 title: Some("Test session".to_string()),
2096 description: None,
2097 tags: None,
2098 created_at: "2024-01-01T00:00:00Z".to_string(),
2099 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2100 message_count: 10,
2101 task_count: 2,
2102 event_count: 20,
2103 duration_seconds: 300,
2104 total_input_tokens: 1000,
2105 total_output_tokens: 500,
2106 git_remote: None,
2107 git_branch: None,
2108 git_commit: None,
2109 git_repo_name: None,
2110 pr_number: None,
2111 pr_url: None,
2112 working_directory: None,
2113 files_modified: None,
2114 files_read: None,
2115 has_errors: false,
2116 max_active_agents: 1,
2117 };
2118 db.upsert_remote_session(&summary).unwrap();
2119
2120 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2121 assert_eq!(sessions.len(), 1);
2122 assert_eq!(sessions[0].id, "remote-1");
2123 assert_eq!(sessions[0].sync_status, "remote_only");
2124 assert_eq!(sessions[0].nickname, None); }
2126
2127 #[test]
2128 fn test_list_filter_by_repo() {
2129 let db = test_db();
2130 let summary1 = RemoteSessionSummary {
2132 id: "s1".to_string(),
2133 user_id: None,
2134 nickname: None,
2135 team_id: "t1".to_string(),
2136 tool: "claude-code".to_string(),
2137 agent_provider: None,
2138 agent_model: None,
2139 title: Some("Session 1".to_string()),
2140 description: None,
2141 tags: None,
2142 created_at: "2024-01-01T00:00:00Z".to_string(),
2143 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2144 message_count: 5,
2145 task_count: 0,
2146 event_count: 10,
2147 duration_seconds: 60,
2148 total_input_tokens: 100,
2149 total_output_tokens: 50,
2150 git_remote: None,
2151 git_branch: None,
2152 git_commit: None,
2153 git_repo_name: None,
2154 pr_number: None,
2155 pr_url: None,
2156 working_directory: None,
2157 files_modified: None,
2158 files_read: None,
2159 has_errors: false,
2160 max_active_agents: 1,
2161 };
2162 db.upsert_remote_session(&summary1).unwrap();
2163
2164 let filter = LocalSessionFilter {
2166 team_id: Some("t1".to_string()),
2167 ..Default::default()
2168 };
2169 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2170
2171 let filter = LocalSessionFilter {
2172 team_id: Some("t999".to_string()),
2173 ..Default::default()
2174 };
2175 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2176 }
2177
    // Fixture: a RemoteSessionSummary for team t1 with fixed counters and an
    // anthropic/claude model; id, tool, title and timestamp vary per caller.
    // uploaded_at mirrors created_at.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
2213
2214 fn seed_sessions(db: &LocalDb) {
2215 db.upsert_remote_session(&make_summary(
2217 "s1",
2218 "claude-code",
2219 "First session",
2220 "2024-01-01T00:00:00Z",
2221 ))
2222 .unwrap();
2223 db.upsert_remote_session(&make_summary(
2224 "s2",
2225 "claude-code",
2226 "JWT auth work",
2227 "2024-01-02T00:00:00Z",
2228 ))
2229 .unwrap();
2230 db.upsert_remote_session(&make_summary(
2231 "s3",
2232 "gemini",
2233 "Gemini test",
2234 "2024-01-03T00:00:00Z",
2235 ))
2236 .unwrap();
2237 db.upsert_remote_session(&make_summary(
2238 "s4",
2239 "claude-code",
2240 "Error handling",
2241 "2024-01-04T00:00:00Z",
2242 ))
2243 .unwrap();
2244 db.upsert_remote_session(&make_summary(
2245 "s5",
2246 "claude-code",
2247 "Final polish",
2248 "2024-01-05T00:00:00Z",
2249 ))
2250 .unwrap();
2251 }
2252
2253 #[test]
2256 fn test_log_no_filters() {
2257 let db = test_db();
2258 seed_sessions(&db);
2259 let filter = LogFilter::default();
2260 let results = db.list_sessions_log(&filter).unwrap();
2261 assert_eq!(results.len(), 5);
2262 assert_eq!(results[0].id, "s5");
2264 assert_eq!(results[4].id, "s1");
2265 }
2266
2267 #[test]
2268 fn test_log_filter_by_tool() {
2269 let db = test_db();
2270 seed_sessions(&db);
2271 let filter = LogFilter {
2272 tool: Some("claude-code".to_string()),
2273 ..Default::default()
2274 };
2275 let results = db.list_sessions_log(&filter).unwrap();
2276 assert_eq!(results.len(), 4);
2277 assert!(results.iter().all(|s| s.tool == "claude-code"));
2278 }
2279
2280 #[test]
2281 fn test_log_filter_by_model_wildcard() {
2282 let db = test_db();
2283 seed_sessions(&db);
2284 let filter = LogFilter {
2285 model: Some("claude*".to_string()),
2286 ..Default::default()
2287 };
2288 let results = db.list_sessions_log(&filter).unwrap();
2289 assert_eq!(results.len(), 5); }
2291
2292 #[test]
2293 fn test_log_filter_since() {
2294 let db = test_db();
2295 seed_sessions(&db);
2296 let filter = LogFilter {
2297 since: Some("2024-01-03T00:00:00Z".to_string()),
2298 ..Default::default()
2299 };
2300 let results = db.list_sessions_log(&filter).unwrap();
2301 assert_eq!(results.len(), 3); }
2303
2304 #[test]
2305 fn test_log_filter_before() {
2306 let db = test_db();
2307 seed_sessions(&db);
2308 let filter = LogFilter {
2309 before: Some("2024-01-03T00:00:00Z".to_string()),
2310 ..Default::default()
2311 };
2312 let results = db.list_sessions_log(&filter).unwrap();
2313 assert_eq!(results.len(), 2); }
2315
2316 #[test]
2317 fn test_log_filter_since_and_before() {
2318 let db = test_db();
2319 seed_sessions(&db);
2320 let filter = LogFilter {
2321 since: Some("2024-01-02T00:00:00Z".to_string()),
2322 before: Some("2024-01-04T00:00:00Z".to_string()),
2323 ..Default::default()
2324 };
2325 let results = db.list_sessions_log(&filter).unwrap();
2326 assert_eq!(results.len(), 2); }
2328
2329 #[test]
2330 fn test_log_filter_grep() {
2331 let db = test_db();
2332 seed_sessions(&db);
2333 let filter = LogFilter {
2334 grep: Some("JWT".to_string()),
2335 ..Default::default()
2336 };
2337 let results = db.list_sessions_log(&filter).unwrap();
2338 assert_eq!(results.len(), 1);
2339 assert_eq!(results[0].id, "s2");
2340 }
2341
2342 #[test]
2343 fn test_log_limit_and_offset() {
2344 let db = test_db();
2345 seed_sessions(&db);
2346 let filter = LogFilter {
2347 limit: Some(2),
2348 offset: Some(1),
2349 ..Default::default()
2350 };
2351 let results = db.list_sessions_log(&filter).unwrap();
2352 assert_eq!(results.len(), 2);
2353 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2355 }
2356
2357 #[test]
2358 fn test_log_limit_only() {
2359 let db = test_db();
2360 seed_sessions(&db);
2361 let filter = LogFilter {
2362 limit: Some(3),
2363 ..Default::default()
2364 };
2365 let results = db.list_sessions_log(&filter).unwrap();
2366 assert_eq!(results.len(), 3);
2367 }
2368
2369 #[test]
2370 fn test_list_sessions_limit_offset() {
2371 let db = test_db();
2372 seed_sessions(&db);
2373 let filter = LocalSessionFilter {
2374 limit: Some(2),
2375 offset: Some(1),
2376 ..Default::default()
2377 };
2378 let results = db.list_sessions(&filter).unwrap();
2379 assert_eq!(results.len(), 2);
2380 assert_eq!(results[0].id, "s4");
2381 assert_eq!(results[1].id, "s3");
2382 }
2383
2384 #[test]
2385 fn test_count_sessions_filtered() {
2386 let db = test_db();
2387 seed_sessions(&db);
2388 let count = db
2389 .count_sessions_filtered(&LocalSessionFilter::default())
2390 .unwrap();
2391 assert_eq!(count, 5);
2392 }
2393
2394 #[test]
2395 fn test_list_working_directories_distinct_non_empty() {
2396 let db = test_db();
2397
2398 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2399 a.working_directory = Some("/tmp/repo-a".to_string());
2400 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2401 b.working_directory = Some("/tmp/repo-a".to_string());
2402 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2403 c.working_directory = Some("/tmp/repo-b".to_string());
2404 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2405 d.working_directory = Some("".to_string());
2406
2407 db.upsert_remote_session(&a).unwrap();
2408 db.upsert_remote_session(&b).unwrap();
2409 db.upsert_remote_session(&c).unwrap();
2410 db.upsert_remote_session(&d).unwrap();
2411
2412 let dirs = db.list_working_directories().unwrap();
2413 assert_eq!(
2414 dirs,
2415 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2416 );
2417 }
2418
2419 #[test]
2420 fn test_list_session_tools() {
2421 let db = test_db();
2422 seed_sessions(&db);
2423 let tools = db
2424 .list_session_tools(&LocalSessionFilter::default())
2425 .unwrap();
2426 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2427 }
2428
2429 #[test]
2430 fn test_log_combined_filters() {
2431 let db = test_db();
2432 seed_sessions(&db);
2433 let filter = LogFilter {
2434 tool: Some("claude-code".to_string()),
2435 since: Some("2024-01-03T00:00:00Z".to_string()),
2436 limit: Some(1),
2437 ..Default::default()
2438 };
2439 let results = db.list_sessions_log(&filter).unwrap();
2440 assert_eq!(results.len(), 1);
2441 assert_eq!(results[0].id, "s5"); }
2443
2444 #[test]
2447 fn test_get_session_by_offset() {
2448 let db = test_db();
2449 seed_sessions(&db);
2450 let row = db.get_session_by_offset(0).unwrap().unwrap();
2451 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2453 assert_eq!(row.id, "s3");
2454 assert!(db.get_session_by_offset(10).unwrap().is_none());
2455 }
2456
2457 #[test]
2458 fn test_get_session_by_tool_offset() {
2459 let db = test_db();
2460 seed_sessions(&db);
2461 let row = db
2462 .get_session_by_tool_offset("claude-code", 0)
2463 .unwrap()
2464 .unwrap();
2465 assert_eq!(row.id, "s5");
2466 let row = db
2467 .get_session_by_tool_offset("claude-code", 1)
2468 .unwrap()
2469 .unwrap();
2470 assert_eq!(row.id, "s4");
2471 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2472 assert_eq!(row.id, "s3");
2473 assert!(db
2474 .get_session_by_tool_offset("gemini", 1)
2475 .unwrap()
2476 .is_none());
2477 }
2478
2479 #[test]
2480 fn test_get_sessions_latest() {
2481 let db = test_db();
2482 seed_sessions(&db);
2483 let rows = db.get_sessions_latest(3).unwrap();
2484 assert_eq!(rows.len(), 3);
2485 assert_eq!(rows[0].id, "s5");
2486 assert_eq!(rows[1].id, "s4");
2487 assert_eq!(rows[2].id, "s3");
2488 }
2489
2490 #[test]
2491 fn test_get_sessions_by_tool_latest() {
2492 let db = test_db();
2493 seed_sessions(&db);
2494 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2495 assert_eq!(rows.len(), 2);
2496 assert_eq!(rows[0].id, "s5");
2497 assert_eq!(rows[1].id, "s4");
2498 }
2499
2500 #[test]
2501 fn test_get_sessions_latest_more_than_available() {
2502 let db = test_db();
2503 seed_sessions(&db);
2504 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2505 assert_eq!(rows.len(), 1); }
2507
2508 #[test]
2509 fn test_session_count() {
2510 let db = test_db();
2511 assert_eq!(db.session_count().unwrap(), 0);
2512 seed_sessions(&db);
2513 assert_eq!(db.session_count().unwrap(), 5);
2514 }
2515
2516 #[test]
2519 fn test_link_commit_session() {
2520 let db = test_db();
2521 seed_sessions(&db);
2522 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2523 .unwrap();
2524
2525 let commits = db.get_commits_by_session("s1").unwrap();
2526 assert_eq!(commits.len(), 1);
2527 assert_eq!(commits[0].commit_hash, "abc123");
2528 assert_eq!(commits[0].session_id, "s1");
2529 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2530 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2531
2532 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2533 assert_eq!(sessions.len(), 1);
2534 assert_eq!(sessions[0].id, "s1");
2535 }
2536
2537 #[test]
2538 fn test_get_sessions_by_commit() {
2539 let db = test_db();
2540 seed_sessions(&db);
2541 db.link_commit_session("abc123", "s1", None, None).unwrap();
2543 db.link_commit_session("abc123", "s2", None, None).unwrap();
2544 db.link_commit_session("abc123", "s3", None, None).unwrap();
2545
2546 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2547 assert_eq!(sessions.len(), 3);
2548 assert_eq!(sessions[0].id, "s3");
2550 assert_eq!(sessions[1].id, "s2");
2551 assert_eq!(sessions[2].id, "s1");
2552 }
2553
2554 #[test]
2555 fn test_get_commits_by_session() {
2556 let db = test_db();
2557 seed_sessions(&db);
2558 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2560 .unwrap();
2561 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2562 .unwrap();
2563 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2564 .unwrap();
2565
2566 let commits = db.get_commits_by_session("s1").unwrap();
2567 assert_eq!(commits.len(), 3);
2568 assert!(commits.iter().all(|c| c.session_id == "s1"));
2570 }
2571
2572 #[test]
2573 fn test_duplicate_link_ignored() {
2574 let db = test_db();
2575 seed_sessions(&db);
2576 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2577 .unwrap();
2578 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2580 .unwrap();
2581
2582 let commits = db.get_commits_by_session("s1").unwrap();
2583 assert_eq!(commits.len(), 1);
2584 }
2585
2586 #[test]
2587 fn test_log_filter_by_commit() {
2588 let db = test_db();
2589 seed_sessions(&db);
2590 db.link_commit_session("abc123", "s2", None, None).unwrap();
2591 db.link_commit_session("abc123", "s4", None, None).unwrap();
2592
2593 let filter = LogFilter {
2594 commit: Some("abc123".to_string()),
2595 ..Default::default()
2596 };
2597 let results = db.list_sessions_log(&filter).unwrap();
2598 assert_eq!(results.len(), 2);
2599 assert_eq!(results[0].id, "s4");
2600 assert_eq!(results[1].id, "s2");
2601
2602 let filter = LogFilter {
2604 commit: Some("nonexistent".to_string()),
2605 ..Default::default()
2606 };
2607 let results = db.list_sessions_log(&filter).unwrap();
2608 assert_eq!(results.len(), 0);
2609 }
2610}