1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
14type Migration = (&'static str, &'static str);
15
16const REMOTE_MIGRATIONS: &[Migration] = &[
18 ("0001_schema", include_str!("../migrations/0001_schema.sql")),
19 (
20 "0002_team_invite_keys",
21 include_str!("../migrations/0002_team_invite_keys.sql"),
22 ),
23 (
24 "0003_max_active_agents",
25 include_str!("../migrations/0003_max_active_agents.sql"),
26 ),
27 (
28 "0004_oauth_states_provider",
29 include_str!("../migrations/0004_oauth_states_provider.sql"),
30 ),
31 (
32 "0005_sessions_body_url_backfill",
33 include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
34 ),
35 (
36 "0006_sessions_remove_fk_constraints",
37 include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
38 ),
39 (
40 "0007_sessions_list_perf_indexes",
41 include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
42 ),
43 (
44 "0008_teams_force_public",
45 include_str!("../migrations/0008_teams_force_public.sql"),
46 ),
47];
48
49const LOCAL_MIGRATIONS: &[Migration] = &[
50 (
51 "local_0001_schema",
52 include_str!("../migrations/local_0001_schema.sql"),
53 ),
54 (
55 "local_0002_drop_unused_local_sessions",
56 include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
57 ),
58 (
59 "local_0003_timeline_summary_cache",
60 include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
61 ),
62];
63
64#[derive(Debug, Clone)]
66pub struct LocalSessionRow {
67 pub id: String,
68 pub source_path: Option<String>,
69 pub sync_status: String,
70 pub last_synced_at: Option<String>,
71 pub user_id: Option<String>,
72 pub nickname: Option<String>,
73 pub team_id: Option<String>,
74 pub tool: String,
75 pub agent_provider: Option<String>,
76 pub agent_model: Option<String>,
77 pub title: Option<String>,
78 pub description: Option<String>,
79 pub tags: Option<String>,
80 pub created_at: String,
81 pub uploaded_at: Option<String>,
82 pub message_count: i64,
83 pub user_message_count: i64,
84 pub task_count: i64,
85 pub event_count: i64,
86 pub duration_seconds: i64,
87 pub total_input_tokens: i64,
88 pub total_output_tokens: i64,
89 pub git_remote: Option<String>,
90 pub git_branch: Option<String>,
91 pub git_commit: Option<String>,
92 pub git_repo_name: Option<String>,
93 pub pr_number: Option<i64>,
94 pub pr_url: Option<String>,
95 pub working_directory: Option<String>,
96 pub files_modified: Option<String>,
97 pub files_read: Option<String>,
98 pub has_errors: bool,
99 pub max_active_agents: i64,
100}
101
102#[derive(Debug, Clone)]
104pub struct CommitLink {
105 pub commit_hash: String,
106 pub session_id: String,
107 pub repo_path: Option<String>,
108 pub branch: Option<String>,
109 pub created_at: String,
110}
111
112#[derive(Debug, Clone)]
114pub struct TimelineSummaryCacheRow {
115 pub lookup_key: String,
116 pub namespace: String,
117 pub compact: String,
118 pub payload: String,
119 pub raw: String,
120 pub cached_at: String,
121}
122
123pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
125 if row.tool != "opencode" {
126 return false;
127 }
128
129 if row.user_message_count <= 0
133 && row.message_count <= 4
134 && row.task_count <= 4
135 && row.event_count > 0
136 && row.event_count <= 16
137 {
138 return true;
139 }
140
141 if is_opencode_subagent_source(row.source_path.as_deref()) {
142 return true;
143 }
144
145 let source_path = match row.source_path.as_deref() {
146 Some(path) if !path.trim().is_empty() => path,
147 _ => return false,
148 };
149
150 parse_opencode_parent_session_id(source_path)
151 .is_some_and(|parent_id| !parent_id.trim().is_empty())
152}
153
154pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
156 let text = fs::read_to_string(source_path).ok()?;
157 let json: Value = serde_json::from_str(&text).ok()?;
158 lookup_parent_session_id(&json)
159}
160
161fn lookup_parent_session_id(value: &Value) -> Option<String> {
162 match value {
163 Value::Object(obj) => {
164 for (key, value) in obj {
165 if is_parent_id_key(key) {
166 if let Some(parent_id) = value.as_str() {
167 let parent_id = parent_id.trim();
168 if !parent_id.is_empty() {
169 return Some(parent_id.to_string());
170 }
171 }
172 }
173 if let Some(parent_id) = lookup_parent_session_id(value) {
174 return Some(parent_id);
175 }
176 }
177 None
178 }
179 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
180 _ => None,
181 }
182}
183
184fn is_parent_id_key(key: &str) -> bool {
185 let flat = key
186 .chars()
187 .filter(|c| c.is_ascii_alphanumeric())
188 .map(|c| c.to_ascii_lowercase())
189 .collect::<String>();
190
191 flat == "parentid"
192 || flat == "parentuuid"
193 || flat == "parentsessionid"
194 || flat == "parentsessionuuid"
195 || flat.ends_with("parentsessionid")
196 || (flat.contains("parent") && flat.ends_with("id"))
197 || (flat.contains("parent") && flat.ends_with("uuid"))
198}
199
200pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
202 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
203 rows
204}
205
206fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
207 is_subagent_source(source_path)
208}
209
210fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
211 if row.tool != "claude-code" {
212 return false;
213 }
214
215 is_subagent_source(row.source_path.as_deref())
216}
217
218fn is_subagent_source(source_path: Option<&str>) -> bool {
219 let Some(source_path) = source_path.map(|path| path.to_ascii_lowercase()) else {
220 return false;
221 };
222
223 if source_path.contains("/subagents/") || source_path.contains("\\subagents\\") {
224 return true;
225 }
226
227 let filename = match std::path::Path::new(&source_path).file_name() {
228 Some(name) => name.to_string_lossy(),
229 None => return false,
230 };
231
232 filename.starts_with("agent-")
233 || filename.starts_with("agent_")
234 || filename.starts_with("subagent-")
235 || filename.starts_with("subagent_")
236}
237
238#[derive(Debug, Clone)]
240pub struct LocalSessionFilter {
241 pub team_id: Option<String>,
242 pub sync_status: Option<String>,
243 pub git_repo_name: Option<String>,
244 pub search: Option<String>,
245 pub tool: Option<String>,
246 pub sort: LocalSortOrder,
247 pub time_range: LocalTimeRange,
248 pub limit: Option<u32>,
249 pub offset: Option<u32>,
250}
251
252impl Default for LocalSessionFilter {
253 fn default() -> Self {
254 Self {
255 team_id: None,
256 sync_status: None,
257 git_repo_name: None,
258 search: None,
259 tool: None,
260 sort: LocalSortOrder::Recent,
261 time_range: LocalTimeRange::All,
262 limit: None,
263 offset: None,
264 }
265 }
266}
267
268#[derive(Debug, Clone, Default, PartialEq, Eq)]
270pub enum LocalSortOrder {
271 #[default]
272 Recent,
273 Popular,
274 Longest,
275}
276
277#[derive(Debug, Clone, Default, PartialEq, Eq)]
279pub enum LocalTimeRange {
280 Hours24,
281 Days7,
282 Days30,
283 #[default]
284 All,
285}
286
287#[derive(Debug, Clone)]
289pub struct RemoteSessionSummary {
290 pub id: String,
291 pub user_id: Option<String>,
292 pub nickname: Option<String>,
293 pub team_id: String,
294 pub tool: String,
295 pub agent_provider: Option<String>,
296 pub agent_model: Option<String>,
297 pub title: Option<String>,
298 pub description: Option<String>,
299 pub tags: Option<String>,
300 pub created_at: String,
301 pub uploaded_at: String,
302 pub message_count: i64,
303 pub task_count: i64,
304 pub event_count: i64,
305 pub duration_seconds: i64,
306 pub total_input_tokens: i64,
307 pub total_output_tokens: i64,
308 pub git_remote: Option<String>,
309 pub git_branch: Option<String>,
310 pub git_commit: Option<String>,
311 pub git_repo_name: Option<String>,
312 pub pr_number: Option<i64>,
313 pub pr_url: Option<String>,
314 pub working_directory: Option<String>,
315 pub files_modified: Option<String>,
316 pub files_read: Option<String>,
317 pub has_errors: bool,
318 pub max_active_agents: i64,
319}
320
321#[derive(Debug, Default)]
323pub struct LogFilter {
324 pub tool: Option<String>,
326 pub model: Option<String>,
328 pub since: Option<String>,
330 pub before: Option<String>,
332 pub touches: Option<String>,
334 pub grep: Option<String>,
336 pub has_errors: Option<bool>,
338 pub working_directory: Option<String>,
340 pub git_repo_name: Option<String>,
342 pub commit: Option<String>,
344 pub limit: Option<u32>,
346 pub offset: Option<u32>,
348}
349
350const FROM_CLAUSE: &str = "\
352FROM sessions s \
353LEFT JOIN session_sync ss ON ss.session_id = s.id \
354LEFT JOIN users u ON u.id = s.user_id";
355
356pub struct LocalDb {
359 conn: Mutex<Connection>,
360}
361
362impl LocalDb {
363 pub fn open() -> Result<Self> {
366 let path = default_db_path()?;
367 Self::open_path(&path)
368 }
369
370 pub fn open_path(path: &PathBuf) -> Result<Self> {
372 if let Some(parent) = path.parent() {
373 std::fs::create_dir_all(parent)
374 .with_context(|| format!("create dir for {}", path.display()))?;
375 }
376 match open_connection_with_latest_schema(path) {
377 Ok(conn) => Ok(Self {
378 conn: Mutex::new(conn),
379 }),
380 Err(err) => {
381 if !is_schema_compat_error(&err) {
382 return Err(err);
383 }
384
385 rotate_legacy_db(path)?;
388
389 let conn = open_connection_with_latest_schema(path)
390 .with_context(|| format!("recreate db {}", path.display()))?;
391 Ok(Self {
392 conn: Mutex::new(conn),
393 })
394 }
395 }
396 }
397
398 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
399 self.conn.lock().expect("local db mutex poisoned")
400 }
401
402 pub fn upsert_local_session(
405 &self,
406 session: &Session,
407 source_path: &str,
408 git: &GitContext,
409 ) -> Result<()> {
410 let title = session.context.title.as_deref();
411 let description = session.context.description.as_deref();
412 let tags = if session.context.tags.is_empty() {
413 None
414 } else {
415 Some(session.context.tags.join(","))
416 };
417 let created_at = session.context.created_at.to_rfc3339();
418 let cwd = session
419 .context
420 .attributes
421 .get("cwd")
422 .or_else(|| session.context.attributes.get("working_directory"))
423 .and_then(|v| v.as_str().map(String::from));
424
425 let (files_modified, files_read, has_errors) =
427 opensession_core::extract::extract_file_metadata(session);
428 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
429
430 let conn = self.conn();
431 conn.execute(
432 "INSERT INTO sessions \
433 (id, team_id, tool, agent_provider, agent_model, \
434 title, description, tags, created_at, \
435 message_count, user_message_count, task_count, event_count, duration_seconds, \
436 total_input_tokens, total_output_tokens, body_storage_key, \
437 git_remote, git_branch, git_commit, git_repo_name, working_directory, \
438 files_modified, files_read, has_errors, max_active_agents) \
439 VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
440 ON CONFLICT(id) DO UPDATE SET \
441 tool=excluded.tool, agent_provider=excluded.agent_provider, \
442 agent_model=excluded.agent_model, \
443 title=excluded.title, description=excluded.description, \
444 tags=excluded.tags, \
445 message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
446 task_count=excluded.task_count, \
447 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
448 total_input_tokens=excluded.total_input_tokens, \
449 total_output_tokens=excluded.total_output_tokens, \
450 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
451 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
452 working_directory=excluded.working_directory, \
453 files_modified=excluded.files_modified, files_read=excluded.files_read, \
454 has_errors=excluded.has_errors, \
455 max_active_agents=excluded.max_active_agents",
456 params![
457 &session.session_id,
458 &session.agent.tool,
459 &session.agent.provider,
460 &session.agent.model,
461 title,
462 description,
463 &tags,
464 &created_at,
465 session.stats.message_count as i64,
466 session.stats.user_message_count as i64,
467 session.stats.task_count as i64,
468 session.stats.event_count as i64,
469 session.stats.duration_seconds as i64,
470 session.stats.total_input_tokens as i64,
471 session.stats.total_output_tokens as i64,
472 &git.remote,
473 &git.branch,
474 &git.commit,
475 &git.repo_name,
476 &cwd,
477 &files_modified,
478 &files_read,
479 has_errors,
480 max_active_agents,
481 ],
482 )?;
483
484 conn.execute(
485 "INSERT INTO session_sync (session_id, source_path, sync_status) \
486 VALUES (?1, ?2, 'local_only') \
487 ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
488 params![&session.session_id, source_path],
489 )?;
490 Ok(())
491 }
492
493 pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
496 let conn = self.conn();
497 conn.execute(
498 "INSERT INTO sessions \
499 (id, user_id, team_id, tool, agent_provider, agent_model, \
500 title, description, tags, created_at, uploaded_at, \
501 message_count, task_count, event_count, duration_seconds, \
502 total_input_tokens, total_output_tokens, body_storage_key, \
503 git_remote, git_branch, git_commit, git_repo_name, \
504 pr_number, pr_url, working_directory, \
505 files_modified, files_read, has_errors, max_active_agents) \
506 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
507 ON CONFLICT(id) DO UPDATE SET \
508 title=excluded.title, description=excluded.description, \
509 tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
510 message_count=excluded.message_count, task_count=excluded.task_count, \
511 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
512 total_input_tokens=excluded.total_input_tokens, \
513 total_output_tokens=excluded.total_output_tokens, \
514 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
515 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
516 pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
517 working_directory=excluded.working_directory, \
518 files_modified=excluded.files_modified, files_read=excluded.files_read, \
519 has_errors=excluded.has_errors, \
520 max_active_agents=excluded.max_active_agents",
521 params![
522 &summary.id,
523 &summary.user_id,
524 &summary.team_id,
525 &summary.tool,
526 &summary.agent_provider,
527 &summary.agent_model,
528 &summary.title,
529 &summary.description,
530 &summary.tags,
531 &summary.created_at,
532 &summary.uploaded_at,
533 summary.message_count,
534 summary.task_count,
535 summary.event_count,
536 summary.duration_seconds,
537 summary.total_input_tokens,
538 summary.total_output_tokens,
539 &summary.git_remote,
540 &summary.git_branch,
541 &summary.git_commit,
542 &summary.git_repo_name,
543 summary.pr_number,
544 &summary.pr_url,
545 &summary.working_directory,
546 &summary.files_modified,
547 &summary.files_read,
548 summary.has_errors,
549 summary.max_active_agents,
550 ],
551 )?;
552
553 conn.execute(
554 "INSERT INTO session_sync (session_id, sync_status) \
555 VALUES (?1, 'remote_only') \
556 ON CONFLICT(session_id) DO UPDATE SET \
557 sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
558 params![&summary.id],
559 )?;
560 Ok(())
561 }
562
563 fn build_local_session_where_clause(
566 filter: &LocalSessionFilter,
567 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
568 let mut where_clauses = vec![
569 "1=1".to_string(),
570 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
571 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
572 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
573 ];
574 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
575 let mut idx = 1u32;
576
577 if let Some(ref team_id) = filter.team_id {
578 where_clauses.push(format!("s.team_id = ?{idx}"));
579 param_values.push(Box::new(team_id.clone()));
580 idx += 1;
581 }
582
583 if let Some(ref sync_status) = filter.sync_status {
584 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
585 param_values.push(Box::new(sync_status.clone()));
586 idx += 1;
587 }
588
589 if let Some(ref repo) = filter.git_repo_name {
590 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
591 param_values.push(Box::new(repo.clone()));
592 idx += 1;
593 }
594
595 if let Some(ref tool) = filter.tool {
596 where_clauses.push(format!("s.tool = ?{idx}"));
597 param_values.push(Box::new(tool.clone()));
598 idx += 1;
599 }
600
601 if let Some(ref search) = filter.search {
602 let like = format!("%{search}%");
603 where_clauses.push(format!(
604 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
605 i1 = idx,
606 i2 = idx + 1,
607 i3 = idx + 2,
608 ));
609 param_values.push(Box::new(like.clone()));
610 param_values.push(Box::new(like.clone()));
611 param_values.push(Box::new(like));
612 idx += 3;
613 }
614
615 let interval = match filter.time_range {
616 LocalTimeRange::Hours24 => Some("-1 day"),
617 LocalTimeRange::Days7 => Some("-7 days"),
618 LocalTimeRange::Days30 => Some("-30 days"),
619 LocalTimeRange::All => None,
620 };
621 if let Some(interval) = interval {
622 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
623 param_values.push(Box::new(interval.to_string()));
624 }
625
626 (where_clauses.join(" AND "), param_values)
627 }
628
629 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
630 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
631 let order_clause = match filter.sort {
632 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
633 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
634 LocalSortOrder::Recent => "s.created_at DESC",
635 };
636
637 let mut sql = format!(
638 "SELECT {LOCAL_SESSION_COLUMNS} \
639 {FROM_CLAUSE} WHERE {where_str} \
640 ORDER BY {order_clause}"
641 );
642
643 if let Some(limit) = filter.limit {
644 sql.push_str(" LIMIT ?");
645 param_values.push(Box::new(limit));
646 if let Some(offset) = filter.offset {
647 sql.push_str(" OFFSET ?");
648 param_values.push(Box::new(offset));
649 }
650 }
651
652 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
653 param_values.iter().map(|p| p.as_ref()).collect();
654 let conn = self.conn();
655 let mut stmt = conn.prepare(&sql)?;
656 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
657
658 let mut result = Vec::new();
659 for row in rows {
660 result.push(row?);
661 }
662
663 Ok(hide_opencode_child_sessions(result))
664 }
665
666 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
668 let mut count_filter = filter.clone();
669 count_filter.limit = None;
670 count_filter.offset = None;
671 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
672 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
673 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
674 param_values.iter().map(|p| p.as_ref()).collect();
675 let conn = self.conn();
676 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
677 Ok(count)
678 }
679
680 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
682 let mut tool_filter = filter.clone();
683 tool_filter.tool = None;
684 tool_filter.limit = None;
685 tool_filter.offset = None;
686 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
687 let sql = format!(
688 "SELECT DISTINCT s.tool \
689 {FROM_CLAUSE} WHERE {where_str} \
690 ORDER BY s.tool ASC"
691 );
692 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
693 param_values.iter().map(|p| p.as_ref()).collect();
694 let conn = self.conn();
695 let mut stmt = conn.prepare(&sql)?;
696 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
697
698 let mut tools = Vec::new();
699 for row in rows {
700 let tool = row?;
701 if !tool.trim().is_empty() {
702 tools.push(tool);
703 }
704 }
705 Ok(tools)
706 }
707
708 pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
712 let mut where_clauses = vec!["1=1".to_string()];
713 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
714 let mut idx = 1u32;
715
716 if let Some(ref tool) = filter.tool {
717 where_clauses.push(format!("s.tool = ?{idx}"));
718 param_values.push(Box::new(tool.clone()));
719 idx += 1;
720 }
721
722 if let Some(ref model) = filter.model {
723 let like = model.replace('*', "%");
724 where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
725 param_values.push(Box::new(like));
726 idx += 1;
727 }
728
729 if let Some(ref since) = filter.since {
730 where_clauses.push(format!("s.created_at >= ?{idx}"));
731 param_values.push(Box::new(since.clone()));
732 idx += 1;
733 }
734
735 if let Some(ref before) = filter.before {
736 where_clauses.push(format!("s.created_at < ?{idx}"));
737 param_values.push(Box::new(before.clone()));
738 idx += 1;
739 }
740
741 if let Some(ref touches) = filter.touches {
742 let like = format!("%\"{touches}\"%");
743 where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
744 param_values.push(Box::new(like));
745 idx += 1;
746 }
747
748 if let Some(ref grep) = filter.grep {
749 let like = format!("%{grep}%");
750 where_clauses.push(format!(
751 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
752 i1 = idx,
753 i2 = idx + 1,
754 i3 = idx + 2,
755 ));
756 param_values.push(Box::new(like.clone()));
757 param_values.push(Box::new(like.clone()));
758 param_values.push(Box::new(like));
759 idx += 3;
760 }
761
762 if let Some(true) = filter.has_errors {
763 where_clauses.push("s.has_errors = 1".to_string());
764 }
765
766 if let Some(ref wd) = filter.working_directory {
767 where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
768 param_values.push(Box::new(format!("{wd}%")));
769 idx += 1;
770 }
771
772 if let Some(ref repo) = filter.git_repo_name {
773 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
774 param_values.push(Box::new(repo.clone()));
775 idx += 1;
776 }
777
778 let mut extra_join = String::new();
780 if let Some(ref commit) = filter.commit {
781 extra_join =
782 " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
783 where_clauses.push(format!("csl.commit_hash = ?{idx}"));
784 param_values.push(Box::new(commit.clone()));
785 idx += 1;
786 }
787
788 let _ = idx; let where_str = where_clauses.join(" AND ");
791 let mut sql = format!(
792 "SELECT {LOCAL_SESSION_COLUMNS} \
793 {FROM_CLAUSE}{extra_join} WHERE {where_str} \
794 ORDER BY s.created_at DESC"
795 );
796
797 if let Some(limit) = filter.limit {
798 sql.push_str(" LIMIT ?");
799 param_values.push(Box::new(limit));
800 if let Some(offset) = filter.offset {
801 sql.push_str(" OFFSET ?");
802 param_values.push(Box::new(offset));
803 }
804 }
805
806 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
807 param_values.iter().map(|p| p.as_ref()).collect();
808 let conn = self.conn();
809 let mut stmt = conn.prepare(&sql)?;
810 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
811
812 let mut result = Vec::new();
813 for row in rows {
814 result.push(row?);
815 }
816 Ok(hide_opencode_child_sessions(result))
817 }
818
819 pub fn get_sessions_by_tool_latest(
821 &self,
822 tool: &str,
823 count: u32,
824 ) -> Result<Vec<LocalSessionRow>> {
825 let sql = format!(
826 "SELECT {LOCAL_SESSION_COLUMNS} \
827 {FROM_CLAUSE} WHERE s.tool = ?1 \
828 ORDER BY s.created_at DESC"
829 );
830 let conn = self.conn();
831 let mut stmt = conn.prepare(&sql)?;
832 let rows = stmt.query_map(params![tool], row_to_local_session)?;
833 let mut result = Vec::new();
834 for row in rows {
835 result.push(row?);
836 }
837
838 let mut filtered = hide_opencode_child_sessions(result);
839 filtered.truncate(count as usize);
840 Ok(filtered)
841 }
842
843 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
845 let sql = format!(
846 "SELECT {LOCAL_SESSION_COLUMNS} \
847 {FROM_CLAUSE} \
848 ORDER BY s.created_at DESC"
849 );
850 let conn = self.conn();
851 let mut stmt = conn.prepare(&sql)?;
852 let rows = stmt.query_map([], row_to_local_session)?;
853 let mut result = Vec::new();
854 for row in rows {
855 result.push(row?);
856 }
857
858 let mut filtered = hide_opencode_child_sessions(result);
859 filtered.truncate(count as usize);
860 Ok(filtered)
861 }
862
863 pub fn get_session_by_tool_offset(
865 &self,
866 tool: &str,
867 offset: u32,
868 ) -> Result<Option<LocalSessionRow>> {
869 let sql = format!(
870 "SELECT {LOCAL_SESSION_COLUMNS} \
871 {FROM_CLAUSE} WHERE s.tool = ?1 \
872 ORDER BY s.created_at DESC"
873 );
874 let conn = self.conn();
875 let mut stmt = conn.prepare(&sql)?;
876 let rows = stmt.query_map(params![tool], row_to_local_session)?;
877 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
878 Ok(filtered.into_iter().nth(offset as usize))
879 }
880
881 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
883 let sql = format!(
884 "SELECT {LOCAL_SESSION_COLUMNS} \
885 {FROM_CLAUSE} \
886 ORDER BY s.created_at DESC"
887 );
888 let conn = self.conn();
889 let mut stmt = conn.prepare(&sql)?;
890 let rows = stmt.query_map([], row_to_local_session)?;
891 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
892 Ok(filtered.into_iter().nth(offset as usize))
893 }
894
895 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
897 let conn = self.conn();
898 let result = conn
899 .query_row(
900 "SELECT source_path FROM session_sync WHERE session_id = ?1",
901 params![session_id],
902 |row| row.get(0),
903 )
904 .optional()?;
905
906 Ok(result)
907 }
908
909 pub fn session_count(&self) -> Result<i64> {
911 let count = self
912 .conn()
913 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
914 Ok(count)
915 }
916
917 pub fn delete_session(&self, session_id: &str) -> Result<()> {
920 let conn = self.conn();
921 conn.execute(
922 "DELETE FROM body_cache WHERE session_id = ?1",
923 params![session_id],
924 )?;
925 conn.execute(
926 "DELETE FROM session_sync WHERE session_id = ?1",
927 params![session_id],
928 )?;
929 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
930 Ok(())
931 }
932
933 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
936 let cursor = self
937 .conn()
938 .query_row(
939 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
940 params![team_id],
941 |row| row.get(0),
942 )
943 .optional()?;
944 Ok(cursor)
945 }
946
947 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
948 self.conn().execute(
949 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
950 VALUES (?1, ?2, datetime('now')) \
951 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
952 params![team_id, cursor],
953 )?;
954 Ok(())
955 }
956
957 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
961 let sql = format!(
962 "SELECT {LOCAL_SESSION_COLUMNS} \
963 FROM sessions s \
964 INNER JOIN session_sync ss ON ss.session_id = s.id \
965 LEFT JOIN users u ON u.id = s.user_id \
966 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
967 ORDER BY s.created_at ASC"
968 );
969 let conn = self.conn();
970 let mut stmt = conn.prepare(&sql)?;
971 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
972 let mut result = Vec::new();
973 for row in rows {
974 result.push(row?);
975 }
976 Ok(result)
977 }
978
979 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
980 self.conn().execute(
981 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
982 WHERE session_id = ?1",
983 params![session_id],
984 )?;
985 Ok(())
986 }
987
988 pub fn was_uploaded_after(
990 &self,
991 source_path: &str,
992 modified: &chrono::DateTime<chrono::Utc>,
993 ) -> Result<bool> {
994 let result: Option<String> = self
995 .conn()
996 .query_row(
997 "SELECT last_synced_at FROM session_sync \
998 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
999 params![source_path],
1000 |row| row.get(0),
1001 )
1002 .optional()?;
1003
1004 if let Some(synced_at) = result {
1005 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1006 return Ok(dt >= *modified);
1007 }
1008 }
1009 Ok(false)
1010 }
1011
1012 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1015 self.conn().execute(
1016 "INSERT INTO body_cache (session_id, body, cached_at) \
1017 VALUES (?1, ?2, datetime('now')) \
1018 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1019 params![session_id, body],
1020 )?;
1021 Ok(())
1022 }
1023
1024 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1025 let body = self
1026 .conn()
1027 .query_row(
1028 "SELECT body FROM body_cache WHERE session_id = ?1",
1029 params![session_id],
1030 |row| row.get(0),
1031 )
1032 .optional()?;
1033 Ok(body)
1034 }
1035
1036 pub fn upsert_timeline_summary_cache(
1039 &self,
1040 lookup_key: &str,
1041 namespace: &str,
1042 compact: &str,
1043 payload: &str,
1044 raw: &str,
1045 ) -> Result<()> {
1046 self.conn().execute(
1047 "INSERT INTO timeline_summary_cache \
1048 (lookup_key, namespace, compact, payload, raw, cached_at) \
1049 VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1050 ON CONFLICT(lookup_key) DO UPDATE SET \
1051 namespace = excluded.namespace, \
1052 compact = excluded.compact, \
1053 payload = excluded.payload, \
1054 raw = excluded.raw, \
1055 cached_at = datetime('now')",
1056 params![lookup_key, namespace, compact, payload, raw],
1057 )?;
1058 Ok(())
1059 }
1060
1061 pub fn list_timeline_summary_cache_by_namespace(
1062 &self,
1063 namespace: &str,
1064 ) -> Result<Vec<TimelineSummaryCacheRow>> {
1065 let conn = self.conn();
1066 let mut stmt = conn.prepare(
1067 "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1068 FROM timeline_summary_cache \
1069 WHERE namespace = ?1 \
1070 ORDER BY cached_at DESC",
1071 )?;
1072 let rows = stmt.query_map(params![namespace], |row| {
1073 Ok(TimelineSummaryCacheRow {
1074 lookup_key: row.get(0)?,
1075 namespace: row.get(1)?,
1076 compact: row.get(2)?,
1077 payload: row.get(3)?,
1078 raw: row.get(4)?,
1079 cached_at: row.get(5)?,
1080 })
1081 })?;
1082
1083 let mut out = Vec::new();
1084 for row in rows {
1085 out.push(row?);
1086 }
1087 Ok(out)
1088 }
1089
1090 pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1091 let affected = self
1092 .conn()
1093 .execute("DELETE FROM timeline_summary_cache", [])?;
1094 Ok(affected)
1095 }
1096
1097 pub fn migrate_from_state_json(
1102 &self,
1103 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1104 ) -> Result<usize> {
1105 let mut count = 0;
1106 for (path, uploaded_at) in uploaded {
1107 let exists: bool = self
1108 .conn()
1109 .query_row(
1110 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1111 params![path],
1112 |row| row.get(0),
1113 )
1114 .unwrap_or(false);
1115
1116 if exists {
1117 self.conn().execute(
1118 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1119 WHERE source_path = ?2 AND sync_status = 'local_only'",
1120 params![uploaded_at.to_rfc3339(), path],
1121 )?;
1122 count += 1;
1123 }
1124 }
1125 Ok(count)
1126 }
1127
1128 pub fn link_commit_session(
1132 &self,
1133 commit_hash: &str,
1134 session_id: &str,
1135 repo_path: Option<&str>,
1136 branch: Option<&str>,
1137 ) -> Result<()> {
1138 self.conn().execute(
1139 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1140 VALUES (?1, ?2, ?3, ?4) \
1141 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1142 params![commit_hash, session_id, repo_path, branch],
1143 )?;
1144 Ok(())
1145 }
1146
1147 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1149 let sql = format!(
1150 "SELECT {LOCAL_SESSION_COLUMNS} \
1151 {FROM_CLAUSE} \
1152 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1153 WHERE csl.commit_hash = ?1 \
1154 ORDER BY s.created_at DESC"
1155 );
1156 let conn = self.conn();
1157 let mut stmt = conn.prepare(&sql)?;
1158 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1159 let mut result = Vec::new();
1160 for row in rows {
1161 result.push(row?);
1162 }
1163 Ok(result)
1164 }
1165
1166 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1168 let conn = self.conn();
1169 let mut stmt = conn.prepare(
1170 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1171 FROM commit_session_links WHERE session_id = ?1 \
1172 ORDER BY created_at DESC",
1173 )?;
1174 let rows = stmt.query_map(params![session_id], |row| {
1175 Ok(CommitLink {
1176 commit_hash: row.get(0)?,
1177 session_id: row.get(1)?,
1178 repo_path: row.get(2)?,
1179 branch: row.get(3)?,
1180 created_at: row.get(4)?,
1181 })
1182 })?;
1183 let mut result = Vec::new();
1184 for row in rows {
1185 result.push(row?);
1186 }
1187 Ok(result)
1188 }
1189
1190 pub fn find_active_session_for_repo(
1194 &self,
1195 repo_path: &str,
1196 since_minutes: u32,
1197 ) -> Result<Option<LocalSessionRow>> {
1198 let sql = format!(
1199 "SELECT {LOCAL_SESSION_COLUMNS} \
1200 {FROM_CLAUSE} \
1201 WHERE s.working_directory LIKE ?1 \
1202 AND s.created_at >= datetime('now', ?2) \
1203 ORDER BY s.created_at DESC LIMIT 1"
1204 );
1205 let since = format!("-{since_minutes} minutes");
1206 let like = format!("{repo_path}%");
1207 let conn = self.conn();
1208 let mut stmt = conn.prepare(&sql)?;
1209 let row = stmt
1210 .query_map(params![like, since], row_to_local_session)?
1211 .next()
1212 .transpose()?;
1213 Ok(row)
1214 }
1215
1216 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1218 let conn = self.conn();
1219 let mut stmt = conn
1220 .prepare("SELECT id FROM sessions")
1221 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1222 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1223 let mut set = std::collections::HashSet::new();
1224 if let Ok(rows) = rows {
1225 for row in rows.flatten() {
1226 set.insert(row);
1227 }
1228 }
1229 set
1230 }
1231
1232 pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1234 let title = session.context.title.as_deref();
1235 let description = session.context.description.as_deref();
1236 let (files_modified, files_read, has_errors) =
1237 opensession_core::extract::extract_file_metadata(session);
1238 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1239
1240 self.conn().execute(
1241 "UPDATE sessions SET \
1242 title=?2, description=?3, \
1243 message_count=?4, user_message_count=?5, task_count=?6, \
1244 event_count=?7, duration_seconds=?8, \
1245 total_input_tokens=?9, total_output_tokens=?10, \
1246 files_modified=?11, files_read=?12, has_errors=?13, \
1247 max_active_agents=?14 \
1248 WHERE id=?1",
1249 params![
1250 &session.session_id,
1251 title,
1252 description,
1253 session.stats.message_count as i64,
1254 session.stats.user_message_count as i64,
1255 session.stats.task_count as i64,
1256 session.stats.event_count as i64,
1257 session.stats.duration_seconds as i64,
1258 session.stats.total_input_tokens as i64,
1259 session.stats.total_output_tokens as i64,
1260 &files_modified,
1261 &files_read,
1262 has_errors,
1263 max_active_agents,
1264 ],
1265 )?;
1266 Ok(())
1267 }
1268
1269 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1271 self.conn().execute(
1272 "INSERT INTO session_sync (session_id, source_path) \
1273 VALUES (?1, ?2) \
1274 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1275 params![session_id, source_path],
1276 )?;
1277 Ok(())
1278 }
1279
1280 pub fn list_repos(&self) -> Result<Vec<String>> {
1282 let conn = self.conn();
1283 let mut stmt = conn.prepare(
1284 "SELECT DISTINCT git_repo_name FROM sessions \
1285 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1286 )?;
1287 let rows = stmt.query_map([], |row| row.get(0))?;
1288 let mut result = Vec::new();
1289 for row in rows {
1290 result.push(row?);
1291 }
1292 Ok(result)
1293 }
1294}
1295
1296fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1299 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1300 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1301
1302 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1304
1305 apply_local_migrations(&conn)?;
1306
1307 ensure_sessions_columns(&conn)?;
1310 validate_local_schema(&conn)?;
1311
1312 Ok(conn)
1313}
1314
1315fn apply_local_migrations(conn: &Connection) -> Result<()> {
1316 conn.execute_batch(
1317 "CREATE TABLE IF NOT EXISTS _migrations (
1318 id INTEGER PRIMARY KEY,
1319 name TEXT NOT NULL UNIQUE,
1320 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1321 );",
1322 )
1323 .context("create _migrations table for local db")?;
1324
1325 for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1326 let already_applied: bool = conn
1327 .query_row(
1328 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1329 [name],
1330 |row| row.get(0),
1331 )
1332 .unwrap_or(false);
1333
1334 if already_applied {
1335 continue;
1336 }
1337
1338 if let Err(e) = conn.execute_batch(sql) {
1339 let msg = e.to_string().to_ascii_lowercase();
1340 if !is_local_migration_compat_error(&msg) {
1341 return Err(e).with_context(|| format!("apply local migration {name}"));
1342 }
1343 }
1344
1345 conn.execute(
1346 "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1347 [name],
1348 )
1349 .with_context(|| format!("record local migration {name}"))?;
1350 }
1351
1352 Ok(())
1353}
1354
1355fn is_local_migration_compat_error(msg: &str) -> bool {
1356 msg.contains("duplicate column name")
1357 || msg.contains("no such column")
1358 || msg.contains("already exists")
1359}
1360
1361fn validate_local_schema(conn: &Connection) -> Result<()> {
1362 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1363 conn.prepare(&sql)
1364 .map(|_| ())
1365 .context("validate local session schema")
1366}
1367
1368fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1369 let msg = format!("{err:#}").to_ascii_lowercase();
1370 msg.contains("no such column")
1371 || msg.contains("no such table")
1372 || msg.contains("cannot add a column")
1373 || msg.contains("already exists")
1374 || msg.contains("views may not be indexed")
1375 || msg.contains("malformed database schema")
1376 || msg.contains("duplicate column name")
1377}
1378
1379fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1380 if !path.exists() {
1381 return Ok(());
1382 }
1383
1384 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1385 let backup_name = format!(
1386 "{}.legacy-{}.bak",
1387 path.file_name()
1388 .and_then(|n| n.to_str())
1389 .unwrap_or("local.db"),
1390 ts
1391 );
1392 let backup_path = path.with_file_name(backup_name);
1393 std::fs::rename(path, &backup_path).with_context(|| {
1394 format!(
1395 "rotate legacy db {} -> {}",
1396 path.display(),
1397 backup_path.display()
1398 )
1399 })?;
1400
1401 let wal = PathBuf::from(format!("{}-wal", path.display()));
1402 let shm = PathBuf::from(format!("{}-shm", path.display()));
1403 let _ = std::fs::remove_file(wal);
1404 let _ = std::fs::remove_file(shm);
1405 Ok(())
1406}
1407
1408const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
1409 ("user_id", "TEXT"),
1410 ("team_id", "TEXT DEFAULT 'personal'"),
1411 ("tool", "TEXT DEFAULT ''"),
1412 ("agent_provider", "TEXT"),
1413 ("agent_model", "TEXT"),
1414 ("title", "TEXT"),
1415 ("description", "TEXT"),
1416 ("tags", "TEXT"),
1417 ("created_at", "TEXT DEFAULT ''"),
1418 ("uploaded_at", "TEXT DEFAULT ''"),
1419 ("message_count", "INTEGER DEFAULT 0"),
1420 ("user_message_count", "INTEGER DEFAULT 0"),
1421 ("task_count", "INTEGER DEFAULT 0"),
1422 ("event_count", "INTEGER DEFAULT 0"),
1423 ("duration_seconds", "INTEGER DEFAULT 0"),
1424 ("total_input_tokens", "INTEGER DEFAULT 0"),
1425 ("total_output_tokens", "INTEGER DEFAULT 0"),
1426 ("body_storage_key", "TEXT DEFAULT ''"),
1427 ("body_url", "TEXT"),
1428 ("git_remote", "TEXT"),
1429 ("git_branch", "TEXT"),
1430 ("git_commit", "TEXT"),
1431 ("git_repo_name", "TEXT"),
1432 ("pr_number", "INTEGER"),
1433 ("pr_url", "TEXT"),
1434 ("working_directory", "TEXT"),
1435 ("files_modified", "TEXT"),
1436 ("files_read", "TEXT"),
1437 ("has_errors", "BOOLEAN DEFAULT 0"),
1438 ("max_active_agents", "INTEGER DEFAULT 1"),
1439];
1440
1441fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1442 let mut existing = HashSet::new();
1443 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1444 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1445 for row in rows {
1446 existing.insert(row?);
1447 }
1448
1449 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1450 if existing.contains(*name) {
1451 continue;
1452 }
1453 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1454 conn.execute_batch(&sql)
1455 .with_context(|| format!("add legacy sessions column '{name}'"))?;
1456 }
1457
1458 Ok(())
1459}
1460
1461pub const LOCAL_SESSION_COLUMNS: &str = "\
1463s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
1464s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
1465s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
1466s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
1467s.total_input_tokens, s.total_output_tokens, \
1468s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
1469s.pr_number, s.pr_url, s.working_directory, \
1470s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1471
1472fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1473 Ok(LocalSessionRow {
1474 id: row.get(0)?,
1475 source_path: row.get(1)?,
1476 sync_status: row.get(2)?,
1477 last_synced_at: row.get(3)?,
1478 user_id: row.get(4)?,
1479 nickname: row.get(5)?,
1480 team_id: row.get(6)?,
1481 tool: row.get(7)?,
1482 agent_provider: row.get(8)?,
1483 agent_model: row.get(9)?,
1484 title: row.get(10)?,
1485 description: row.get(11)?,
1486 tags: row.get(12)?,
1487 created_at: row.get(13)?,
1488 uploaded_at: row.get(14)?,
1489 message_count: row.get(15)?,
1490 user_message_count: row.get(16)?,
1491 task_count: row.get(17)?,
1492 event_count: row.get(18)?,
1493 duration_seconds: row.get(19)?,
1494 total_input_tokens: row.get(20)?,
1495 total_output_tokens: row.get(21)?,
1496 git_remote: row.get(22)?,
1497 git_branch: row.get(23)?,
1498 git_commit: row.get(24)?,
1499 git_repo_name: row.get(25)?,
1500 pr_number: row.get(26)?,
1501 pr_url: row.get(27)?,
1502 working_directory: row.get(28)?,
1503 files_modified: row.get(29)?,
1504 files_read: row.get(30)?,
1505 has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1506 max_active_agents: row.get(32).unwrap_or(1),
1507 })
1508}
1509
1510fn default_db_path() -> Result<PathBuf> {
1511 let home = std::env::var("HOME")
1512 .or_else(|_| std::env::var("USERPROFILE"))
1513 .context("Could not determine home directory")?;
1514 Ok(PathBuf::from(home)
1515 .join(".local")
1516 .join("share")
1517 .join("opensession")
1518 .join("local.db"))
1519}
1520
1521#[cfg(test)]
1522mod tests {
1523 use super::*;
1524
1525 use std::collections::BTreeSet;
1526 use std::fs::{create_dir_all, write};
1527 use tempfile::tempdir;
1528
1529 fn test_db() -> LocalDb {
1530 let dir = tempdir().unwrap();
1531 let path = dir.keep().join("test.db");
1532 LocalDb::open_path(&path).unwrap()
1533 }
1534
1535 fn temp_root() -> tempfile::TempDir {
1536 tempdir().unwrap()
1537 }
1538
1539 fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1540 LocalSessionRow {
1541 id: id.to_string(),
1542 source_path: source_path.map(String::from),
1543 sync_status: "local_only".to_string(),
1544 last_synced_at: None,
1545 user_id: None,
1546 nickname: None,
1547 team_id: None,
1548 tool: tool.to_string(),
1549 agent_provider: None,
1550 agent_model: None,
1551 title: Some("test".to_string()),
1552 description: None,
1553 tags: None,
1554 created_at: "2024-01-01T00:00:00Z".to_string(),
1555 uploaded_at: None,
1556 message_count: 0,
1557 user_message_count: 0,
1558 task_count: 0,
1559 event_count: 0,
1560 duration_seconds: 0,
1561 total_input_tokens: 0,
1562 total_output_tokens: 0,
1563 git_remote: None,
1564 git_branch: None,
1565 git_commit: None,
1566 git_repo_name: None,
1567 pr_number: None,
1568 pr_url: None,
1569 working_directory: None,
1570 files_modified: None,
1571 files_read: None,
1572 has_errors: false,
1573 max_active_agents: 1,
1574 }
1575 }
1576
1577 #[test]
1578 fn test_open_and_schema() {
1579 let _db = test_db();
1580 }
1581
1582 #[test]
1583 fn test_open_backfills_legacy_sessions_columns() {
1584 let dir = tempfile::tempdir().unwrap();
1585 let path = dir.path().join("legacy.db");
1586 {
1587 let conn = Connection::open(&path).unwrap();
1588 conn.execute_batch(
1589 "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1590 INSERT INTO sessions (id) VALUES ('legacy-1');",
1591 )
1592 .unwrap();
1593 }
1594
1595 let db = LocalDb::open_path(&path).unwrap();
1596 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1597 assert_eq!(rows.len(), 1);
1598 assert_eq!(rows[0].id, "legacy-1");
1599 assert_eq!(rows[0].user_message_count, 0);
1600 }
1601
1602 #[test]
1603 fn test_open_rotates_incompatible_legacy_schema() {
1604 let dir = tempfile::tempdir().unwrap();
1605 let path = dir.path().join("broken.db");
1606 {
1607 let conn = Connection::open(&path).unwrap();
1608 conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1609 .unwrap();
1610 }
1611
1612 let db = LocalDb::open_path(&path).unwrap();
1613 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1614 assert!(rows.is_empty());
1615
1616 let rotated = std::fs::read_dir(dir.path())
1617 .unwrap()
1618 .filter_map(Result::ok)
1619 .any(|entry| {
1620 let name = entry.file_name();
1621 let name = name.to_string_lossy();
1622 name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1623 });
1624 assert!(rotated, "expected rotated legacy backup file");
1625 }
1626
1627 #[test]
1628 fn test_is_opencode_child_session() {
1629 let root = temp_root();
1630 let dir = root.path().join("sessions");
1631 create_dir_all(&dir).unwrap();
1632 let parent_session = dir.join("parent.json");
1633 write(
1634 &parent_session,
1635 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1636 )
1637 .unwrap();
1638 let child_session = dir.join("child.json");
1639 write(
1640 &child_session,
1641 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1642 )
1643 .unwrap();
1644
1645 let parent = make_row(
1646 "ses_parent",
1647 "opencode",
1648 Some(parent_session.to_str().unwrap()),
1649 );
1650 let child = make_row(
1651 "ses_child",
1652 "opencode",
1653 Some(child_session.to_str().unwrap()),
1654 );
1655 let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1656
1657 assert!(!is_opencode_child_session(&parent));
1658 assert!(is_opencode_child_session(&child));
1659 assert!(!is_opencode_child_session(&codex));
1660 }
1661
1662 #[test]
1663 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1664 let child = LocalSessionRow {
1665 id: "sess_child".to_string(),
1666 source_path: None,
1667 sync_status: "local_only".to_string(),
1668 last_synced_at: None,
1669 user_id: None,
1670 nickname: None,
1671 team_id: None,
1672 tool: "opencode".to_string(),
1673 agent_provider: None,
1674 agent_model: None,
1675 title: None,
1676 description: None,
1677 tags: None,
1678 created_at: "2024-01-01T00:00:00Z".to_string(),
1679 uploaded_at: None,
1680 message_count: 1,
1681 user_message_count: 0,
1682 task_count: 4,
1683 event_count: 4,
1684 duration_seconds: 0,
1685 total_input_tokens: 0,
1686 total_output_tokens: 0,
1687 git_remote: None,
1688 git_branch: None,
1689 git_commit: None,
1690 git_repo_name: None,
1691 pr_number: None,
1692 pr_url: None,
1693 working_directory: None,
1694 files_modified: None,
1695 files_read: None,
1696 has_errors: false,
1697 max_active_agents: 1,
1698 };
1699
1700 let parent = LocalSessionRow {
1701 id: "sess_parent".to_string(),
1702 source_path: None,
1703 sync_status: "local_only".to_string(),
1704 last_synced_at: None,
1705 user_id: None,
1706 nickname: None,
1707 team_id: None,
1708 tool: "opencode".to_string(),
1709 agent_provider: None,
1710 agent_model: None,
1711 title: Some("regular".to_string()),
1712 description: None,
1713 tags: None,
1714 created_at: "2024-01-01T00:00:00Z".to_string(),
1715 uploaded_at: None,
1716 message_count: 1,
1717 user_message_count: 1,
1718 task_count: 2,
1719 event_count: 20,
1720 duration_seconds: 0,
1721 total_input_tokens: 0,
1722 total_output_tokens: 0,
1723 git_remote: None,
1724 git_branch: None,
1725 git_commit: None,
1726 git_repo_name: None,
1727 pr_number: None,
1728 pr_url: None,
1729 working_directory: None,
1730 files_modified: None,
1731 files_read: None,
1732 has_errors: false,
1733 max_active_agents: 1,
1734 };
1735
1736 assert!(is_opencode_child_session(&child));
1737 assert!(!is_opencode_child_session(&parent));
1738 }
1739
1740 #[test]
1741 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1742 let child = LocalSessionRow {
1743 id: "sess_child_2".to_string(),
1744 source_path: None,
1745 sync_status: "local_only".to_string(),
1746 last_synced_at: None,
1747 user_id: None,
1748 nickname: None,
1749 team_id: None,
1750 tool: "opencode".to_string(),
1751 agent_provider: None,
1752 agent_model: None,
1753 title: None,
1754 description: None,
1755 tags: None,
1756 created_at: "2024-01-01T00:00:00Z".to_string(),
1757 uploaded_at: None,
1758 message_count: 2,
1759 user_message_count: 0,
1760 task_count: 4,
1761 event_count: 4,
1762 duration_seconds: 0,
1763 total_input_tokens: 0,
1764 total_output_tokens: 0,
1765 git_remote: None,
1766 git_branch: None,
1767 git_commit: None,
1768 git_repo_name: None,
1769 pr_number: None,
1770 pr_url: None,
1771 working_directory: None,
1772 files_modified: None,
1773 files_read: None,
1774 has_errors: false,
1775 max_active_agents: 1,
1776 };
1777
1778 let parent = LocalSessionRow {
1779 id: "sess_parent".to_string(),
1780 source_path: None,
1781 sync_status: "local_only".to_string(),
1782 last_synced_at: None,
1783 user_id: None,
1784 nickname: None,
1785 team_id: None,
1786 tool: "opencode".to_string(),
1787 agent_provider: None,
1788 agent_model: None,
1789 title: Some("regular".to_string()),
1790 description: None,
1791 tags: None,
1792 created_at: "2024-01-01T00:00:00Z".to_string(),
1793 uploaded_at: None,
1794 message_count: 2,
1795 user_message_count: 1,
1796 task_count: 5,
1797 event_count: 20,
1798 duration_seconds: 0,
1799 total_input_tokens: 0,
1800 total_output_tokens: 0,
1801 git_remote: None,
1802 git_branch: None,
1803 git_commit: None,
1804 git_repo_name: None,
1805 pr_number: None,
1806 pr_url: None,
1807 working_directory: None,
1808 files_modified: None,
1809 files_read: None,
1810 has_errors: false,
1811 max_active_agents: 1,
1812 };
1813
1814 assert!(is_opencode_child_session(&child));
1815 assert!(!is_opencode_child_session(&parent));
1816 }
1817
1818 #[test]
1819 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1820 let child = LocalSessionRow {
1821 id: "sess_child_3".to_string(),
1822 source_path: None,
1823 sync_status: "local_only".to_string(),
1824 last_synced_at: None,
1825 user_id: None,
1826 nickname: None,
1827 team_id: None,
1828 tool: "opencode".to_string(),
1829 agent_provider: None,
1830 agent_model: None,
1831 title: None,
1832 description: None,
1833 tags: None,
1834 created_at: "2024-01-01T00:00:00Z".to_string(),
1835 uploaded_at: None,
1836 message_count: 3,
1837 user_message_count: 0,
1838 task_count: 2,
1839 event_count: 6,
1840 duration_seconds: 0,
1841 total_input_tokens: 0,
1842 total_output_tokens: 0,
1843 git_remote: None,
1844 git_branch: None,
1845 git_commit: None,
1846 git_repo_name: None,
1847 pr_number: None,
1848 pr_url: None,
1849 working_directory: None,
1850 files_modified: None,
1851 files_read: None,
1852 has_errors: false,
1853 max_active_agents: 1,
1854 };
1855
1856 assert!(is_opencode_child_session(&child));
1857 }
1858
1859 #[test]
1860 fn test_parse_opencode_parent_session_id_aliases() {
1861 let root = temp_root();
1862 let dir = root.path().join("session-aliases");
1863 create_dir_all(&dir).unwrap();
1864 let child_session = dir.join("child.json");
1865 write(
1866 &child_session,
1867 r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1868 )
1869 .unwrap();
1870 assert_eq!(
1871 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1872 Some("ses_parent")
1873 );
1874 }
1875
1876 #[test]
1877 fn test_parse_opencode_parent_session_id_nested_metadata() {
1878 let root = temp_root();
1879 let dir = root.path().join("session-nested");
1880 create_dir_all(&dir).unwrap();
1881 let child_session = dir.join("child.json");
1882 write(
1883 &child_session,
1884 r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1885 )
1886 .unwrap();
1887 assert_eq!(
1888 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1889 Some("ses_parent")
1890 );
1891 }
1892
1893 #[test]
1894 fn test_is_claude_subagent_session() {
1895 let row = make_row(
1896 "ses_parent",
1897 "claude-code",
1898 Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1899 );
1900 assert!(!is_opencode_child_session(&row));
1901 assert!(is_claude_subagent_session(&row));
1902 assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1903 }
1904
1905 #[test]
1906 fn test_hide_opencode_child_sessions() {
1907 let root = temp_root();
1908 let dir = root.path().join("sessions");
1909 create_dir_all(&dir).unwrap();
1910 let parent_session = dir.join("parent.json");
1911 let child_session = dir.join("child.json");
1912 let orphan_session = dir.join("orphan.json");
1913
1914 write(
1915 &parent_session,
1916 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1917 )
1918 .unwrap();
1919 write(
1920 &child_session,
1921 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1922 )
1923 .unwrap();
1924 write(
1925 &orphan_session,
1926 r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1927 )
1928 .unwrap();
1929
1930 let rows = vec![
1931 make_row(
1932 "ses_child",
1933 "opencode",
1934 Some(child_session.to_str().unwrap()),
1935 ),
1936 make_row(
1937 "ses_parent",
1938 "opencode",
1939 Some(parent_session.to_str().unwrap()),
1940 ),
1941 {
1942 let mut row = make_row("ses_other", "codex", None);
1943 row.user_message_count = 1;
1944 row
1945 },
1946 make_row(
1947 "ses_orphan",
1948 "opencode",
1949 Some(orphan_session.to_str().unwrap()),
1950 ),
1951 ];
1952
1953 let filtered = hide_opencode_child_sessions(rows);
1954 assert_eq!(filtered.len(), 3);
1955 assert!(filtered.iter().all(|r| r.id != "ses_child"));
1956 }
1957
1958 #[test]
1959 fn test_sync_cursor() {
1960 let db = test_db();
1961 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1962 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1963 assert_eq!(
1964 db.get_sync_cursor("team1").unwrap(),
1965 Some("2024-01-01T00:00:00Z".to_string())
1966 );
1967 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1969 assert_eq!(
1970 db.get_sync_cursor("team1").unwrap(),
1971 Some("2024-06-01T00:00:00Z".to_string())
1972 );
1973 }
1974
1975 #[test]
1976 fn test_body_cache() {
1977 let db = test_db();
1978 assert_eq!(db.get_cached_body("s1").unwrap(), None);
1979 db.cache_body("s1", b"hello world").unwrap();
1980 assert_eq!(
1981 db.get_cached_body("s1").unwrap(),
1982 Some(b"hello world".to_vec())
1983 );
1984 }
1985
1986 #[test]
1987 fn test_timeline_summary_cache_roundtrip() {
1988 let db = test_db();
1989 db.upsert_timeline_summary_cache(
1990 "k1",
1991 "timeline:v1",
1992 "compact text",
1993 "{\"kind\":\"turn-summary\"}",
1994 "raw text",
1995 )
1996 .unwrap();
1997
1998 let rows = db
1999 .list_timeline_summary_cache_by_namespace("timeline:v1")
2000 .unwrap();
2001 assert_eq!(rows.len(), 1);
2002 assert_eq!(rows[0].lookup_key, "k1");
2003 assert_eq!(rows[0].namespace, "timeline:v1");
2004 assert_eq!(rows[0].compact, "compact text");
2005 assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2006 assert_eq!(rows[0].raw, "raw text");
2007
2008 let cleared = db.clear_timeline_summary_cache().unwrap();
2009 assert_eq!(cleared, 1);
2010 let rows_after = db
2011 .list_timeline_summary_cache_by_namespace("timeline:v1")
2012 .unwrap();
2013 assert!(rows_after.is_empty());
2014 }
2015
2016 #[test]
2017 fn test_local_migrations_include_timeline_summary_cache() {
2018 let db = test_db();
2019 let conn = db.conn();
2020 let applied: bool = conn
2021 .query_row(
2022 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2023 params!["local_0003_timeline_summary_cache"],
2024 |row| row.get(0),
2025 )
2026 .unwrap();
2027 assert!(applied);
2028 }
2029
2030 #[test]
2031 fn test_local_migration_files_match_api_local_migrations() {
2032 fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2033 std::fs::read_dir(dir)
2034 .expect("read migrations directory")
2035 .filter_map(Result::ok)
2036 .map(|entry| entry.file_name().to_string_lossy().to_string())
2037 .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2038 .collect()
2039 }
2040
2041 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2042 let local_files = collect_local_sql(manifest_dir.join("migrations"));
2043 let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2044
2045 assert_eq!(
2046 local_files, api_files,
2047 "local-db local migrations must stay in parity with api local migrations"
2048 );
2049 }
2050
2051 #[test]
2052 fn test_upsert_remote_session() {
2053 let db = test_db();
2054 let summary = RemoteSessionSummary {
2055 id: "remote-1".to_string(),
2056 user_id: Some("u1".to_string()),
2057 nickname: Some("alice".to_string()),
2058 team_id: "t1".to_string(),
2059 tool: "claude-code".to_string(),
2060 agent_provider: None,
2061 agent_model: None,
2062 title: Some("Test session".to_string()),
2063 description: None,
2064 tags: None,
2065 created_at: "2024-01-01T00:00:00Z".to_string(),
2066 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2067 message_count: 10,
2068 task_count: 2,
2069 event_count: 20,
2070 duration_seconds: 300,
2071 total_input_tokens: 1000,
2072 total_output_tokens: 500,
2073 git_remote: None,
2074 git_branch: None,
2075 git_commit: None,
2076 git_repo_name: None,
2077 pr_number: None,
2078 pr_url: None,
2079 working_directory: None,
2080 files_modified: None,
2081 files_read: None,
2082 has_errors: false,
2083 max_active_agents: 1,
2084 };
2085 db.upsert_remote_session(&summary).unwrap();
2086
2087 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2088 assert_eq!(sessions.len(), 1);
2089 assert_eq!(sessions[0].id, "remote-1");
2090 assert_eq!(sessions[0].sync_status, "remote_only");
2091 assert_eq!(sessions[0].nickname, None); }
2093
2094 #[test]
2095 fn test_list_filter_by_repo() {
2096 let db = test_db();
2097 let summary1 = RemoteSessionSummary {
2099 id: "s1".to_string(),
2100 user_id: None,
2101 nickname: None,
2102 team_id: "t1".to_string(),
2103 tool: "claude-code".to_string(),
2104 agent_provider: None,
2105 agent_model: None,
2106 title: Some("Session 1".to_string()),
2107 description: None,
2108 tags: None,
2109 created_at: "2024-01-01T00:00:00Z".to_string(),
2110 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2111 message_count: 5,
2112 task_count: 0,
2113 event_count: 10,
2114 duration_seconds: 60,
2115 total_input_tokens: 100,
2116 total_output_tokens: 50,
2117 git_remote: None,
2118 git_branch: None,
2119 git_commit: None,
2120 git_repo_name: None,
2121 pr_number: None,
2122 pr_url: None,
2123 working_directory: None,
2124 files_modified: None,
2125 files_read: None,
2126 has_errors: false,
2127 max_active_agents: 1,
2128 };
2129 db.upsert_remote_session(&summary1).unwrap();
2130
2131 let filter = LocalSessionFilter {
2133 team_id: Some("t1".to_string()),
2134 ..Default::default()
2135 };
2136 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2137
2138 let filter = LocalSessionFilter {
2139 team_id: Some("t999".to_string()),
2140 ..Default::default()
2141 };
2142 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2143 }
2144
2145 fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2148 RemoteSessionSummary {
2149 id: id.to_string(),
2150 user_id: None,
2151 nickname: None,
2152 team_id: "t1".to_string(),
2153 tool: tool.to_string(),
2154 agent_provider: Some("anthropic".to_string()),
2155 agent_model: Some("claude-opus-4-6".to_string()),
2156 title: Some(title.to_string()),
2157 description: None,
2158 tags: None,
2159 created_at: created_at.to_string(),
2160 uploaded_at: created_at.to_string(),
2161 message_count: 5,
2162 task_count: 1,
2163 event_count: 10,
2164 duration_seconds: 300,
2165 total_input_tokens: 1000,
2166 total_output_tokens: 500,
2167 git_remote: None,
2168 git_branch: None,
2169 git_commit: None,
2170 git_repo_name: None,
2171 pr_number: None,
2172 pr_url: None,
2173 working_directory: None,
2174 files_modified: None,
2175 files_read: None,
2176 has_errors: false,
2177 max_active_agents: 1,
2178 }
2179 }
2180
2181 fn seed_sessions(db: &LocalDb) {
2182 db.upsert_remote_session(&make_summary(
2184 "s1",
2185 "claude-code",
2186 "First session",
2187 "2024-01-01T00:00:00Z",
2188 ))
2189 .unwrap();
2190 db.upsert_remote_session(&make_summary(
2191 "s2",
2192 "claude-code",
2193 "JWT auth work",
2194 "2024-01-02T00:00:00Z",
2195 ))
2196 .unwrap();
2197 db.upsert_remote_session(&make_summary(
2198 "s3",
2199 "gemini",
2200 "Gemini test",
2201 "2024-01-03T00:00:00Z",
2202 ))
2203 .unwrap();
2204 db.upsert_remote_session(&make_summary(
2205 "s4",
2206 "claude-code",
2207 "Error handling",
2208 "2024-01-04T00:00:00Z",
2209 ))
2210 .unwrap();
2211 db.upsert_remote_session(&make_summary(
2212 "s5",
2213 "claude-code",
2214 "Final polish",
2215 "2024-01-05T00:00:00Z",
2216 ))
2217 .unwrap();
2218 }
2219
2220 #[test]
2223 fn test_log_no_filters() {
2224 let db = test_db();
2225 seed_sessions(&db);
2226 let filter = LogFilter::default();
2227 let results = db.list_sessions_log(&filter).unwrap();
2228 assert_eq!(results.len(), 5);
2229 assert_eq!(results[0].id, "s5");
2231 assert_eq!(results[4].id, "s1");
2232 }
2233
2234 #[test]
2235 fn test_log_filter_by_tool() {
2236 let db = test_db();
2237 seed_sessions(&db);
2238 let filter = LogFilter {
2239 tool: Some("claude-code".to_string()),
2240 ..Default::default()
2241 };
2242 let results = db.list_sessions_log(&filter).unwrap();
2243 assert_eq!(results.len(), 4);
2244 assert!(results.iter().all(|s| s.tool == "claude-code"));
2245 }
2246
2247 #[test]
2248 fn test_log_filter_by_model_wildcard() {
2249 let db = test_db();
2250 seed_sessions(&db);
2251 let filter = LogFilter {
2252 model: Some("claude*".to_string()),
2253 ..Default::default()
2254 };
2255 let results = db.list_sessions_log(&filter).unwrap();
2256 assert_eq!(results.len(), 5); }
2258
2259 #[test]
2260 fn test_log_filter_since() {
2261 let db = test_db();
2262 seed_sessions(&db);
2263 let filter = LogFilter {
2264 since: Some("2024-01-03T00:00:00Z".to_string()),
2265 ..Default::default()
2266 };
2267 let results = db.list_sessions_log(&filter).unwrap();
2268 assert_eq!(results.len(), 3); }
2270
2271 #[test]
2272 fn test_log_filter_before() {
2273 let db = test_db();
2274 seed_sessions(&db);
2275 let filter = LogFilter {
2276 before: Some("2024-01-03T00:00:00Z".to_string()),
2277 ..Default::default()
2278 };
2279 let results = db.list_sessions_log(&filter).unwrap();
2280 assert_eq!(results.len(), 2); }
2282
2283 #[test]
2284 fn test_log_filter_since_and_before() {
2285 let db = test_db();
2286 seed_sessions(&db);
2287 let filter = LogFilter {
2288 since: Some("2024-01-02T00:00:00Z".to_string()),
2289 before: Some("2024-01-04T00:00:00Z".to_string()),
2290 ..Default::default()
2291 };
2292 let results = db.list_sessions_log(&filter).unwrap();
2293 assert_eq!(results.len(), 2); }
2295
2296 #[test]
2297 fn test_log_filter_grep() {
2298 let db = test_db();
2299 seed_sessions(&db);
2300 let filter = LogFilter {
2301 grep: Some("JWT".to_string()),
2302 ..Default::default()
2303 };
2304 let results = db.list_sessions_log(&filter).unwrap();
2305 assert_eq!(results.len(), 1);
2306 assert_eq!(results[0].id, "s2");
2307 }
2308
2309 #[test]
2310 fn test_log_limit_and_offset() {
2311 let db = test_db();
2312 seed_sessions(&db);
2313 let filter = LogFilter {
2314 limit: Some(2),
2315 offset: Some(1),
2316 ..Default::default()
2317 };
2318 let results = db.list_sessions_log(&filter).unwrap();
2319 assert_eq!(results.len(), 2);
2320 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2322 }
2323
2324 #[test]
2325 fn test_log_limit_only() {
2326 let db = test_db();
2327 seed_sessions(&db);
2328 let filter = LogFilter {
2329 limit: Some(3),
2330 ..Default::default()
2331 };
2332 let results = db.list_sessions_log(&filter).unwrap();
2333 assert_eq!(results.len(), 3);
2334 }
2335
2336 #[test]
2337 fn test_list_sessions_limit_offset() {
2338 let db = test_db();
2339 seed_sessions(&db);
2340 let filter = LocalSessionFilter {
2341 limit: Some(2),
2342 offset: Some(1),
2343 ..Default::default()
2344 };
2345 let results = db.list_sessions(&filter).unwrap();
2346 assert_eq!(results.len(), 2);
2347 assert_eq!(results[0].id, "s4");
2348 assert_eq!(results[1].id, "s3");
2349 }
2350
2351 #[test]
2352 fn test_count_sessions_filtered() {
2353 let db = test_db();
2354 seed_sessions(&db);
2355 let count = db
2356 .count_sessions_filtered(&LocalSessionFilter::default())
2357 .unwrap();
2358 assert_eq!(count, 5);
2359 }
2360
2361 #[test]
2362 fn test_list_session_tools() {
2363 let db = test_db();
2364 seed_sessions(&db);
2365 let tools = db
2366 .list_session_tools(&LocalSessionFilter::default())
2367 .unwrap();
2368 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2369 }
2370
2371 #[test]
2372 fn test_log_combined_filters() {
2373 let db = test_db();
2374 seed_sessions(&db);
2375 let filter = LogFilter {
2376 tool: Some("claude-code".to_string()),
2377 since: Some("2024-01-03T00:00:00Z".to_string()),
2378 limit: Some(1),
2379 ..Default::default()
2380 };
2381 let results = db.list_sessions_log(&filter).unwrap();
2382 assert_eq!(results.len(), 1);
2383 assert_eq!(results[0].id, "s5"); }
2385
2386 #[test]
2389 fn test_get_session_by_offset() {
2390 let db = test_db();
2391 seed_sessions(&db);
2392 let row = db.get_session_by_offset(0).unwrap().unwrap();
2393 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2395 assert_eq!(row.id, "s3");
2396 assert!(db.get_session_by_offset(10).unwrap().is_none());
2397 }
2398
2399 #[test]
2400 fn test_get_session_by_tool_offset() {
2401 let db = test_db();
2402 seed_sessions(&db);
2403 let row = db
2404 .get_session_by_tool_offset("claude-code", 0)
2405 .unwrap()
2406 .unwrap();
2407 assert_eq!(row.id, "s5");
2408 let row = db
2409 .get_session_by_tool_offset("claude-code", 1)
2410 .unwrap()
2411 .unwrap();
2412 assert_eq!(row.id, "s4");
2413 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2414 assert_eq!(row.id, "s3");
2415 assert!(db
2416 .get_session_by_tool_offset("gemini", 1)
2417 .unwrap()
2418 .is_none());
2419 }
2420
2421 #[test]
2422 fn test_get_sessions_latest() {
2423 let db = test_db();
2424 seed_sessions(&db);
2425 let rows = db.get_sessions_latest(3).unwrap();
2426 assert_eq!(rows.len(), 3);
2427 assert_eq!(rows[0].id, "s5");
2428 assert_eq!(rows[1].id, "s4");
2429 assert_eq!(rows[2].id, "s3");
2430 }
2431
2432 #[test]
2433 fn test_get_sessions_by_tool_latest() {
2434 let db = test_db();
2435 seed_sessions(&db);
2436 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2437 assert_eq!(rows.len(), 2);
2438 assert_eq!(rows[0].id, "s5");
2439 assert_eq!(rows[1].id, "s4");
2440 }
2441
2442 #[test]
2443 fn test_get_sessions_latest_more_than_available() {
2444 let db = test_db();
2445 seed_sessions(&db);
2446 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2447 assert_eq!(rows.len(), 1); }
2449
2450 #[test]
2451 fn test_session_count() {
2452 let db = test_db();
2453 assert_eq!(db.session_count().unwrap(), 0);
2454 seed_sessions(&db);
2455 assert_eq!(db.session_count().unwrap(), 5);
2456 }
2457
2458 #[test]
2461 fn test_link_commit_session() {
2462 let db = test_db();
2463 seed_sessions(&db);
2464 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2465 .unwrap();
2466
2467 let commits = db.get_commits_by_session("s1").unwrap();
2468 assert_eq!(commits.len(), 1);
2469 assert_eq!(commits[0].commit_hash, "abc123");
2470 assert_eq!(commits[0].session_id, "s1");
2471 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2472 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2473
2474 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2475 assert_eq!(sessions.len(), 1);
2476 assert_eq!(sessions[0].id, "s1");
2477 }
2478
2479 #[test]
2480 fn test_get_sessions_by_commit() {
2481 let db = test_db();
2482 seed_sessions(&db);
2483 db.link_commit_session("abc123", "s1", None, None).unwrap();
2485 db.link_commit_session("abc123", "s2", None, None).unwrap();
2486 db.link_commit_session("abc123", "s3", None, None).unwrap();
2487
2488 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2489 assert_eq!(sessions.len(), 3);
2490 assert_eq!(sessions[0].id, "s3");
2492 assert_eq!(sessions[1].id, "s2");
2493 assert_eq!(sessions[2].id, "s1");
2494 }
2495
2496 #[test]
2497 fn test_get_commits_by_session() {
2498 let db = test_db();
2499 seed_sessions(&db);
2500 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2502 .unwrap();
2503 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2504 .unwrap();
2505 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2506 .unwrap();
2507
2508 let commits = db.get_commits_by_session("s1").unwrap();
2509 assert_eq!(commits.len(), 3);
2510 assert!(commits.iter().all(|c| c.session_id == "s1"));
2512 }
2513
2514 #[test]
2515 fn test_duplicate_link_ignored() {
2516 let db = test_db();
2517 seed_sessions(&db);
2518 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2519 .unwrap();
2520 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2522 .unwrap();
2523
2524 let commits = db.get_commits_by_session("s1").unwrap();
2525 assert_eq!(commits.len(), 1);
2526 }
2527
2528 #[test]
2529 fn test_log_filter_by_commit() {
2530 let db = test_db();
2531 seed_sessions(&db);
2532 db.link_commit_session("abc123", "s2", None, None).unwrap();
2533 db.link_commit_session("abc123", "s4", None, None).unwrap();
2534
2535 let filter = LogFilter {
2536 commit: Some("abc123".to_string()),
2537 ..Default::default()
2538 };
2539 let results = db.list_sessions_log(&filter).unwrap();
2540 assert_eq!(results.len(), 2);
2541 assert_eq!(results[0].id, "s4");
2542 assert_eq!(results[1].id, "s2");
2543
2544 let filter = LogFilter {
2546 commit: Some("nonexistent".to_string()),
2547 ..Default::default()
2548 };
2549 let results = db.list_sessions_log(&filter).unwrap();
2550 assert_eq!(results.len(), 0);
2551 }
2552}