1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
14type Migration = (&'static str, &'static str);
15
16const REMOTE_MIGRATIONS: &[Migration] = &[
18 ("0001_schema", include_str!("../migrations/0001_schema.sql")),
19 (
20 "0002_team_invite_keys",
21 include_str!("../migrations/0002_team_invite_keys.sql"),
22 ),
23 (
24 "0003_max_active_agents",
25 include_str!("../migrations/0003_max_active_agents.sql"),
26 ),
27 (
28 "0004_oauth_states_provider",
29 include_str!("../migrations/0004_oauth_states_provider.sql"),
30 ),
31 (
32 "0005_sessions_body_url_backfill",
33 include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
34 ),
35 (
36 "0006_sessions_remove_fk_constraints",
37 include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
38 ),
39 (
40 "0007_sessions_list_perf_indexes",
41 include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
42 ),
43 (
44 "0008_teams_force_public",
45 include_str!("../migrations/0008_teams_force_public.sql"),
46 ),
47];
48
49const LOCAL_MIGRATIONS: &[Migration] = &[
50 (
51 "local_0001_schema",
52 include_str!("../migrations/local_0001_schema.sql"),
53 ),
54 (
55 "local_0002_drop_unused_local_sessions",
56 include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
57 ),
58 (
59 "local_0003_timeline_summary_cache",
60 include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
61 ),
62];
63
64#[derive(Debug, Clone)]
66pub struct LocalSessionRow {
67 pub id: String,
68 pub source_path: Option<String>,
69 pub sync_status: String,
70 pub last_synced_at: Option<String>,
71 pub user_id: Option<String>,
72 pub nickname: Option<String>,
73 pub team_id: Option<String>,
74 pub tool: String,
75 pub agent_provider: Option<String>,
76 pub agent_model: Option<String>,
77 pub title: Option<String>,
78 pub description: Option<String>,
79 pub tags: Option<String>,
80 pub created_at: String,
81 pub uploaded_at: Option<String>,
82 pub message_count: i64,
83 pub user_message_count: i64,
84 pub task_count: i64,
85 pub event_count: i64,
86 pub duration_seconds: i64,
87 pub total_input_tokens: i64,
88 pub total_output_tokens: i64,
89 pub git_remote: Option<String>,
90 pub git_branch: Option<String>,
91 pub git_commit: Option<String>,
92 pub git_repo_name: Option<String>,
93 pub pr_number: Option<i64>,
94 pub pr_url: Option<String>,
95 pub working_directory: Option<String>,
96 pub files_modified: Option<String>,
97 pub files_read: Option<String>,
98 pub has_errors: bool,
99 pub max_active_agents: i64,
100}
101
102#[derive(Debug, Clone)]
104pub struct CommitLink {
105 pub commit_hash: String,
106 pub session_id: String,
107 pub repo_path: Option<String>,
108 pub branch: Option<String>,
109 pub created_at: String,
110}
111
112#[derive(Debug, Clone)]
114pub struct TimelineSummaryCacheRow {
115 pub lookup_key: String,
116 pub namespace: String,
117 pub compact: String,
118 pub payload: String,
119 pub raw: String,
120 pub cached_at: String,
121}
122
123pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
125 if row.tool != "opencode" {
126 return false;
127 }
128
129 if row.user_message_count <= 0
133 && row.message_count <= 4
134 && row.task_count <= 4
135 && row.event_count > 0
136 && row.event_count <= 16
137 {
138 return true;
139 }
140
141 if is_opencode_subagent_source(row.source_path.as_deref()) {
142 return true;
143 }
144
145 let source_path = match row.source_path.as_deref() {
146 Some(path) if !path.trim().is_empty() => path,
147 _ => return false,
148 };
149
150 parse_opencode_parent_session_id(source_path)
151 .is_some_and(|parent_id| !parent_id.trim().is_empty())
152}
153
154pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
156 let text = fs::read_to_string(source_path).ok()?;
157 let json: Value = serde_json::from_str(&text).ok()?;
158 lookup_parent_session_id(&json)
159}
160
161fn lookup_parent_session_id(value: &Value) -> Option<String> {
162 match value {
163 Value::Object(obj) => {
164 for (key, value) in obj {
165 if is_parent_id_key(key) {
166 if let Some(parent_id) = value.as_str() {
167 let parent_id = parent_id.trim();
168 if !parent_id.is_empty() {
169 return Some(parent_id.to_string());
170 }
171 }
172 }
173 if let Some(parent_id) = lookup_parent_session_id(value) {
174 return Some(parent_id);
175 }
176 }
177 None
178 }
179 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
180 _ => None,
181 }
182}
183
184fn is_parent_id_key(key: &str) -> bool {
185 let flat = key
186 .chars()
187 .filter(|c| c.is_ascii_alphanumeric())
188 .map(|c| c.to_ascii_lowercase())
189 .collect::<String>();
190
191 flat == "parentid"
192 || flat == "parentuuid"
193 || flat == "parentsessionid"
194 || flat == "parentsessionuuid"
195 || flat.ends_with("parentsessionid")
196 || (flat.contains("parent") && flat.ends_with("id"))
197 || (flat.contains("parent") && flat.ends_with("uuid"))
198}
199
200pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
202 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
203 rows
204}
205
206fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
207 is_subagent_source(source_path)
208}
209
210fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
211 if row.tool != "claude-code" {
212 return false;
213 }
214
215 is_subagent_source(row.source_path.as_deref())
216}
217
218fn is_subagent_source(source_path: Option<&str>) -> bool {
219 let Some(source_path) = source_path.map(|path| path.to_ascii_lowercase()) else {
220 return false;
221 };
222
223 if source_path.contains("/subagents/") || source_path.contains("\\subagents\\") {
224 return true;
225 }
226
227 let filename = match std::path::Path::new(&source_path).file_name() {
228 Some(name) => name.to_string_lossy(),
229 None => return false,
230 };
231
232 filename.starts_with("agent-")
233 || filename.starts_with("agent_")
234 || filename.starts_with("subagent-")
235 || filename.starts_with("subagent_")
236}
237
238#[derive(Debug, Clone)]
240pub struct LocalSessionFilter {
241 pub team_id: Option<String>,
242 pub sync_status: Option<String>,
243 pub git_repo_name: Option<String>,
244 pub search: Option<String>,
245 pub tool: Option<String>,
246 pub sort: LocalSortOrder,
247 pub time_range: LocalTimeRange,
248 pub limit: Option<u32>,
249 pub offset: Option<u32>,
250}
251
252impl Default for LocalSessionFilter {
253 fn default() -> Self {
254 Self {
255 team_id: None,
256 sync_status: None,
257 git_repo_name: None,
258 search: None,
259 tool: None,
260 sort: LocalSortOrder::Recent,
261 time_range: LocalTimeRange::All,
262 limit: None,
263 offset: None,
264 }
265 }
266}
267
268#[derive(Debug, Clone, Default, PartialEq, Eq)]
270pub enum LocalSortOrder {
271 #[default]
272 Recent,
273 Popular,
274 Longest,
275}
276
277#[derive(Debug, Clone, Default, PartialEq, Eq)]
279pub enum LocalTimeRange {
280 Hours24,
281 Days7,
282 Days30,
283 #[default]
284 All,
285}
286
287#[derive(Debug, Clone)]
289pub struct RemoteSessionSummary {
290 pub id: String,
291 pub user_id: Option<String>,
292 pub nickname: Option<String>,
293 pub team_id: String,
294 pub tool: String,
295 pub agent_provider: Option<String>,
296 pub agent_model: Option<String>,
297 pub title: Option<String>,
298 pub description: Option<String>,
299 pub tags: Option<String>,
300 pub created_at: String,
301 pub uploaded_at: String,
302 pub message_count: i64,
303 pub task_count: i64,
304 pub event_count: i64,
305 pub duration_seconds: i64,
306 pub total_input_tokens: i64,
307 pub total_output_tokens: i64,
308 pub git_remote: Option<String>,
309 pub git_branch: Option<String>,
310 pub git_commit: Option<String>,
311 pub git_repo_name: Option<String>,
312 pub pr_number: Option<i64>,
313 pub pr_url: Option<String>,
314 pub working_directory: Option<String>,
315 pub files_modified: Option<String>,
316 pub files_read: Option<String>,
317 pub has_errors: bool,
318 pub max_active_agents: i64,
319}
320
321#[derive(Debug, Default)]
323pub struct LogFilter {
324 pub tool: Option<String>,
326 pub model: Option<String>,
328 pub since: Option<String>,
330 pub before: Option<String>,
332 pub touches: Option<String>,
334 pub grep: Option<String>,
336 pub has_errors: Option<bool>,
338 pub working_directory: Option<String>,
340 pub git_repo_name: Option<String>,
342 pub commit: Option<String>,
344 pub limit: Option<u32>,
346 pub offset: Option<u32>,
348}
349
350const FROM_CLAUSE: &str = "\
352FROM sessions s \
353LEFT JOIN session_sync ss ON ss.session_id = s.id \
354LEFT JOIN users u ON u.id = s.user_id";
355
356pub struct LocalDb {
359 conn: Mutex<Connection>,
360}
361
362impl LocalDb {
363 pub fn open() -> Result<Self> {
366 let path = default_db_path()?;
367 Self::open_path(&path)
368 }
369
370 pub fn open_path(path: &PathBuf) -> Result<Self> {
372 if let Some(parent) = path.parent() {
373 std::fs::create_dir_all(parent)
374 .with_context(|| format!("create dir for {}", path.display()))?;
375 }
376 match open_connection_with_latest_schema(path) {
377 Ok(conn) => Ok(Self {
378 conn: Mutex::new(conn),
379 }),
380 Err(err) => {
381 if !is_schema_compat_error(&err) {
382 return Err(err);
383 }
384
385 rotate_legacy_db(path)?;
388
389 let conn = open_connection_with_latest_schema(path)
390 .with_context(|| format!("recreate db {}", path.display()))?;
391 Ok(Self {
392 conn: Mutex::new(conn),
393 })
394 }
395 }
396 }
397
398 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
399 self.conn.lock().expect("local db mutex poisoned")
400 }
401
402 pub fn upsert_local_session(
405 &self,
406 session: &Session,
407 source_path: &str,
408 git: &GitContext,
409 ) -> Result<()> {
410 let title = session.context.title.as_deref();
411 let description = session.context.description.as_deref();
412 let tags = if session.context.tags.is_empty() {
413 None
414 } else {
415 Some(session.context.tags.join(","))
416 };
417 let created_at = session.context.created_at.to_rfc3339();
418 let cwd = session
419 .context
420 .attributes
421 .get("cwd")
422 .or_else(|| session.context.attributes.get("working_directory"))
423 .and_then(|v| v.as_str().map(String::from));
424
425 let (files_modified, files_read, has_errors) =
427 opensession_core::extract::extract_file_metadata(session);
428 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
429
430 let conn = self.conn();
431 conn.execute(
432 "INSERT INTO sessions \
433 (id, team_id, tool, agent_provider, agent_model, \
434 title, description, tags, created_at, \
435 message_count, user_message_count, task_count, event_count, duration_seconds, \
436 total_input_tokens, total_output_tokens, body_storage_key, \
437 git_remote, git_branch, git_commit, git_repo_name, working_directory, \
438 files_modified, files_read, has_errors, max_active_agents) \
439 VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
440 ON CONFLICT(id) DO UPDATE SET \
441 tool=excluded.tool, agent_provider=excluded.agent_provider, \
442 agent_model=excluded.agent_model, \
443 title=excluded.title, description=excluded.description, \
444 tags=excluded.tags, \
445 message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
446 task_count=excluded.task_count, \
447 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
448 total_input_tokens=excluded.total_input_tokens, \
449 total_output_tokens=excluded.total_output_tokens, \
450 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
451 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
452 working_directory=excluded.working_directory, \
453 files_modified=excluded.files_modified, files_read=excluded.files_read, \
454 has_errors=excluded.has_errors, \
455 max_active_agents=excluded.max_active_agents",
456 params![
457 &session.session_id,
458 &session.agent.tool,
459 &session.agent.provider,
460 &session.agent.model,
461 title,
462 description,
463 &tags,
464 &created_at,
465 session.stats.message_count as i64,
466 session.stats.user_message_count as i64,
467 session.stats.task_count as i64,
468 session.stats.event_count as i64,
469 session.stats.duration_seconds as i64,
470 session.stats.total_input_tokens as i64,
471 session.stats.total_output_tokens as i64,
472 &git.remote,
473 &git.branch,
474 &git.commit,
475 &git.repo_name,
476 &cwd,
477 &files_modified,
478 &files_read,
479 has_errors,
480 max_active_agents,
481 ],
482 )?;
483
484 conn.execute(
485 "INSERT INTO session_sync (session_id, source_path, sync_status) \
486 VALUES (?1, ?2, 'local_only') \
487 ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
488 params![&session.session_id, source_path],
489 )?;
490 Ok(())
491 }
492
493 pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
496 let conn = self.conn();
497 conn.execute(
498 "INSERT INTO sessions \
499 (id, user_id, team_id, tool, agent_provider, agent_model, \
500 title, description, tags, created_at, uploaded_at, \
501 message_count, task_count, event_count, duration_seconds, \
502 total_input_tokens, total_output_tokens, body_storage_key, \
503 git_remote, git_branch, git_commit, git_repo_name, \
504 pr_number, pr_url, working_directory, \
505 files_modified, files_read, has_errors, max_active_agents) \
506 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
507 ON CONFLICT(id) DO UPDATE SET \
508 title=excluded.title, description=excluded.description, \
509 tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
510 message_count=excluded.message_count, task_count=excluded.task_count, \
511 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
512 total_input_tokens=excluded.total_input_tokens, \
513 total_output_tokens=excluded.total_output_tokens, \
514 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
515 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
516 pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
517 working_directory=excluded.working_directory, \
518 files_modified=excluded.files_modified, files_read=excluded.files_read, \
519 has_errors=excluded.has_errors, \
520 max_active_agents=excluded.max_active_agents",
521 params![
522 &summary.id,
523 &summary.user_id,
524 &summary.team_id,
525 &summary.tool,
526 &summary.agent_provider,
527 &summary.agent_model,
528 &summary.title,
529 &summary.description,
530 &summary.tags,
531 &summary.created_at,
532 &summary.uploaded_at,
533 summary.message_count,
534 summary.task_count,
535 summary.event_count,
536 summary.duration_seconds,
537 summary.total_input_tokens,
538 summary.total_output_tokens,
539 &summary.git_remote,
540 &summary.git_branch,
541 &summary.git_commit,
542 &summary.git_repo_name,
543 summary.pr_number,
544 &summary.pr_url,
545 &summary.working_directory,
546 &summary.files_modified,
547 &summary.files_read,
548 summary.has_errors,
549 summary.max_active_agents,
550 ],
551 )?;
552
553 conn.execute(
554 "INSERT INTO session_sync (session_id, sync_status) \
555 VALUES (?1, 'remote_only') \
556 ON CONFLICT(session_id) DO UPDATE SET \
557 sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
558 params![&summary.id],
559 )?;
560 Ok(())
561 }
562
563 fn build_local_session_where_clause(
566 filter: &LocalSessionFilter,
567 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
568 let mut where_clauses = vec![
569 "1=1".to_string(),
570 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
571 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
572 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
573 ];
574 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
575 let mut idx = 1u32;
576
577 if let Some(ref team_id) = filter.team_id {
578 where_clauses.push(format!("s.team_id = ?{idx}"));
579 param_values.push(Box::new(team_id.clone()));
580 idx += 1;
581 }
582
583 if let Some(ref sync_status) = filter.sync_status {
584 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
585 param_values.push(Box::new(sync_status.clone()));
586 idx += 1;
587 }
588
589 if let Some(ref repo) = filter.git_repo_name {
590 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
591 param_values.push(Box::new(repo.clone()));
592 idx += 1;
593 }
594
595 if let Some(ref tool) = filter.tool {
596 where_clauses.push(format!("s.tool = ?{idx}"));
597 param_values.push(Box::new(tool.clone()));
598 idx += 1;
599 }
600
601 if let Some(ref search) = filter.search {
602 let like = format!("%{search}%");
603 where_clauses.push(format!(
604 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
605 i1 = idx,
606 i2 = idx + 1,
607 i3 = idx + 2,
608 ));
609 param_values.push(Box::new(like.clone()));
610 param_values.push(Box::new(like.clone()));
611 param_values.push(Box::new(like));
612 idx += 3;
613 }
614
615 let interval = match filter.time_range {
616 LocalTimeRange::Hours24 => Some("-1 day"),
617 LocalTimeRange::Days7 => Some("-7 days"),
618 LocalTimeRange::Days30 => Some("-30 days"),
619 LocalTimeRange::All => None,
620 };
621 if let Some(interval) = interval {
622 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
623 param_values.push(Box::new(interval.to_string()));
624 }
625
626 (where_clauses.join(" AND "), param_values)
627 }
628
629 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
630 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
631 let order_clause = match filter.sort {
632 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
633 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
634 LocalSortOrder::Recent => "s.created_at DESC",
635 };
636
637 let mut sql = format!(
638 "SELECT {LOCAL_SESSION_COLUMNS} \
639 {FROM_CLAUSE} WHERE {where_str} \
640 ORDER BY {order_clause}"
641 );
642
643 if let Some(limit) = filter.limit {
644 sql.push_str(" LIMIT ?");
645 param_values.push(Box::new(limit));
646 if let Some(offset) = filter.offset {
647 sql.push_str(" OFFSET ?");
648 param_values.push(Box::new(offset));
649 }
650 }
651
652 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
653 param_values.iter().map(|p| p.as_ref()).collect();
654 let conn = self.conn();
655 let mut stmt = conn.prepare(&sql)?;
656 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
657
658 let mut result = Vec::new();
659 for row in rows {
660 result.push(row?);
661 }
662
663 Ok(hide_opencode_child_sessions(result))
664 }
665
666 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
668 let mut count_filter = filter.clone();
669 count_filter.limit = None;
670 count_filter.offset = None;
671 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
672 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
673 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
674 param_values.iter().map(|p| p.as_ref()).collect();
675 let conn = self.conn();
676 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
677 Ok(count)
678 }
679
680 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
682 let mut tool_filter = filter.clone();
683 tool_filter.tool = None;
684 tool_filter.limit = None;
685 tool_filter.offset = None;
686 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
687 let sql = format!(
688 "SELECT DISTINCT s.tool \
689 {FROM_CLAUSE} WHERE {where_str} \
690 ORDER BY s.tool ASC"
691 );
692 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
693 param_values.iter().map(|p| p.as_ref()).collect();
694 let conn = self.conn();
695 let mut stmt = conn.prepare(&sql)?;
696 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
697
698 let mut tools = Vec::new();
699 for row in rows {
700 let tool = row?;
701 if !tool.trim().is_empty() {
702 tools.push(tool);
703 }
704 }
705 Ok(tools)
706 }
707
708 pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
712 let mut where_clauses = vec!["1=1".to_string()];
713 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
714 let mut idx = 1u32;
715
716 if let Some(ref tool) = filter.tool {
717 where_clauses.push(format!("s.tool = ?{idx}"));
718 param_values.push(Box::new(tool.clone()));
719 idx += 1;
720 }
721
722 if let Some(ref model) = filter.model {
723 let like = model.replace('*', "%");
724 where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
725 param_values.push(Box::new(like));
726 idx += 1;
727 }
728
729 if let Some(ref since) = filter.since {
730 where_clauses.push(format!("s.created_at >= ?{idx}"));
731 param_values.push(Box::new(since.clone()));
732 idx += 1;
733 }
734
735 if let Some(ref before) = filter.before {
736 where_clauses.push(format!("s.created_at < ?{idx}"));
737 param_values.push(Box::new(before.clone()));
738 idx += 1;
739 }
740
741 if let Some(ref touches) = filter.touches {
742 let like = format!("%\"{touches}\"%");
743 where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
744 param_values.push(Box::new(like));
745 idx += 1;
746 }
747
748 if let Some(ref grep) = filter.grep {
749 let like = format!("%{grep}%");
750 where_clauses.push(format!(
751 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
752 i1 = idx,
753 i2 = idx + 1,
754 i3 = idx + 2,
755 ));
756 param_values.push(Box::new(like.clone()));
757 param_values.push(Box::new(like.clone()));
758 param_values.push(Box::new(like));
759 idx += 3;
760 }
761
762 if let Some(true) = filter.has_errors {
763 where_clauses.push("s.has_errors = 1".to_string());
764 }
765
766 if let Some(ref wd) = filter.working_directory {
767 where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
768 param_values.push(Box::new(format!("{wd}%")));
769 idx += 1;
770 }
771
772 if let Some(ref repo) = filter.git_repo_name {
773 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
774 param_values.push(Box::new(repo.clone()));
775 idx += 1;
776 }
777
778 let mut extra_join = String::new();
780 if let Some(ref commit) = filter.commit {
781 extra_join =
782 " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
783 where_clauses.push(format!("csl.commit_hash = ?{idx}"));
784 param_values.push(Box::new(commit.clone()));
785 idx += 1;
786 }
787
788 let _ = idx; let where_str = where_clauses.join(" AND ");
791 let mut sql = format!(
792 "SELECT {LOCAL_SESSION_COLUMNS} \
793 {FROM_CLAUSE}{extra_join} WHERE {where_str} \
794 ORDER BY s.created_at DESC"
795 );
796
797 if let Some(limit) = filter.limit {
798 sql.push_str(" LIMIT ?");
799 param_values.push(Box::new(limit));
800 if let Some(offset) = filter.offset {
801 sql.push_str(" OFFSET ?");
802 param_values.push(Box::new(offset));
803 }
804 }
805
806 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
807 param_values.iter().map(|p| p.as_ref()).collect();
808 let conn = self.conn();
809 let mut stmt = conn.prepare(&sql)?;
810 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
811
812 let mut result = Vec::new();
813 for row in rows {
814 result.push(row?);
815 }
816 Ok(hide_opencode_child_sessions(result))
817 }
818
819 pub fn get_sessions_by_tool_latest(
821 &self,
822 tool: &str,
823 count: u32,
824 ) -> Result<Vec<LocalSessionRow>> {
825 let sql = format!(
826 "SELECT {LOCAL_SESSION_COLUMNS} \
827 {FROM_CLAUSE} WHERE s.tool = ?1 \
828 ORDER BY s.created_at DESC"
829 );
830 let conn = self.conn();
831 let mut stmt = conn.prepare(&sql)?;
832 let rows = stmt.query_map(params![tool], row_to_local_session)?;
833 let mut result = Vec::new();
834 for row in rows {
835 result.push(row?);
836 }
837
838 let mut filtered = hide_opencode_child_sessions(result);
839 filtered.truncate(count as usize);
840 Ok(filtered)
841 }
842
843 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
845 let sql = format!(
846 "SELECT {LOCAL_SESSION_COLUMNS} \
847 {FROM_CLAUSE} \
848 ORDER BY s.created_at DESC"
849 );
850 let conn = self.conn();
851 let mut stmt = conn.prepare(&sql)?;
852 let rows = stmt.query_map([], row_to_local_session)?;
853 let mut result = Vec::new();
854 for row in rows {
855 result.push(row?);
856 }
857
858 let mut filtered = hide_opencode_child_sessions(result);
859 filtered.truncate(count as usize);
860 Ok(filtered)
861 }
862
863 pub fn get_session_by_tool_offset(
865 &self,
866 tool: &str,
867 offset: u32,
868 ) -> Result<Option<LocalSessionRow>> {
869 let sql = format!(
870 "SELECT {LOCAL_SESSION_COLUMNS} \
871 {FROM_CLAUSE} WHERE s.tool = ?1 \
872 ORDER BY s.created_at DESC"
873 );
874 let conn = self.conn();
875 let mut stmt = conn.prepare(&sql)?;
876 let rows = stmt.query_map(params![tool], row_to_local_session)?;
877 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
878 Ok(filtered.into_iter().nth(offset as usize))
879 }
880
881 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
883 let sql = format!(
884 "SELECT {LOCAL_SESSION_COLUMNS} \
885 {FROM_CLAUSE} \
886 ORDER BY s.created_at DESC"
887 );
888 let conn = self.conn();
889 let mut stmt = conn.prepare(&sql)?;
890 let rows = stmt.query_map([], row_to_local_session)?;
891 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
892 Ok(filtered.into_iter().nth(offset as usize))
893 }
894
895 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
897 let conn = self.conn();
898 let result = conn
899 .query_row(
900 "SELECT source_path FROM session_sync WHERE session_id = ?1",
901 params![session_id],
902 |row| row.get(0),
903 )
904 .optional()?;
905
906 Ok(result)
907 }
908
909 pub fn session_count(&self) -> Result<i64> {
911 let count = self
912 .conn()
913 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
914 Ok(count)
915 }
916
917 pub fn delete_session(&self, session_id: &str) -> Result<()> {
920 let conn = self.conn();
921 conn.execute(
922 "DELETE FROM body_cache WHERE session_id = ?1",
923 params![session_id],
924 )?;
925 conn.execute(
926 "DELETE FROM session_sync WHERE session_id = ?1",
927 params![session_id],
928 )?;
929 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
930 Ok(())
931 }
932
933 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
936 let cursor = self
937 .conn()
938 .query_row(
939 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
940 params![team_id],
941 |row| row.get(0),
942 )
943 .optional()?;
944 Ok(cursor)
945 }
946
947 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
948 self.conn().execute(
949 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
950 VALUES (?1, ?2, datetime('now')) \
951 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
952 params![team_id, cursor],
953 )?;
954 Ok(())
955 }
956
957 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
961 let sql = format!(
962 "SELECT {LOCAL_SESSION_COLUMNS} \
963 FROM sessions s \
964 INNER JOIN session_sync ss ON ss.session_id = s.id \
965 LEFT JOIN users u ON u.id = s.user_id \
966 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
967 ORDER BY s.created_at ASC"
968 );
969 let conn = self.conn();
970 let mut stmt = conn.prepare(&sql)?;
971 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
972 let mut result = Vec::new();
973 for row in rows {
974 result.push(row?);
975 }
976 Ok(result)
977 }
978
979 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
980 self.conn().execute(
981 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
982 WHERE session_id = ?1",
983 params![session_id],
984 )?;
985 Ok(())
986 }
987
988 pub fn was_uploaded_after(
990 &self,
991 source_path: &str,
992 modified: &chrono::DateTime<chrono::Utc>,
993 ) -> Result<bool> {
994 let result: Option<String> = self
995 .conn()
996 .query_row(
997 "SELECT last_synced_at FROM session_sync \
998 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
999 params![source_path],
1000 |row| row.get(0),
1001 )
1002 .optional()?;
1003
1004 if let Some(synced_at) = result {
1005 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1006 return Ok(dt >= *modified);
1007 }
1008 }
1009 Ok(false)
1010 }
1011
1012 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1015 self.conn().execute(
1016 "INSERT INTO body_cache (session_id, body, cached_at) \
1017 VALUES (?1, ?2, datetime('now')) \
1018 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1019 params![session_id, body],
1020 )?;
1021 Ok(())
1022 }
1023
1024 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1025 let body = self
1026 .conn()
1027 .query_row(
1028 "SELECT body FROM body_cache WHERE session_id = ?1",
1029 params![session_id],
1030 |row| row.get(0),
1031 )
1032 .optional()?;
1033 Ok(body)
1034 }
1035
1036 pub fn upsert_timeline_summary_cache(
1039 &self,
1040 lookup_key: &str,
1041 namespace: &str,
1042 compact: &str,
1043 payload: &str,
1044 raw: &str,
1045 ) -> Result<()> {
1046 self.conn().execute(
1047 "INSERT INTO timeline_summary_cache \
1048 (lookup_key, namespace, compact, payload, raw, cached_at) \
1049 VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1050 ON CONFLICT(lookup_key) DO UPDATE SET \
1051 namespace = excluded.namespace, \
1052 compact = excluded.compact, \
1053 payload = excluded.payload, \
1054 raw = excluded.raw, \
1055 cached_at = datetime('now')",
1056 params![lookup_key, namespace, compact, payload, raw],
1057 )?;
1058 Ok(())
1059 }
1060
1061 pub fn list_timeline_summary_cache_by_namespace(
1062 &self,
1063 namespace: &str,
1064 ) -> Result<Vec<TimelineSummaryCacheRow>> {
1065 let conn = self.conn();
1066 let mut stmt = conn.prepare(
1067 "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1068 FROM timeline_summary_cache \
1069 WHERE namespace = ?1 \
1070 ORDER BY cached_at DESC",
1071 )?;
1072 let rows = stmt.query_map(params![namespace], |row| {
1073 Ok(TimelineSummaryCacheRow {
1074 lookup_key: row.get(0)?,
1075 namespace: row.get(1)?,
1076 compact: row.get(2)?,
1077 payload: row.get(3)?,
1078 raw: row.get(4)?,
1079 cached_at: row.get(5)?,
1080 })
1081 })?;
1082
1083 let mut out = Vec::new();
1084 for row in rows {
1085 out.push(row?);
1086 }
1087 Ok(out)
1088 }
1089
1090 pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1091 let affected = self
1092 .conn()
1093 .execute("DELETE FROM timeline_summary_cache", [])?;
1094 Ok(affected)
1095 }
1096
1097 pub fn migrate_from_state_json(
1102 &self,
1103 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1104 ) -> Result<usize> {
1105 let mut count = 0;
1106 for (path, uploaded_at) in uploaded {
1107 let exists: bool = self
1108 .conn()
1109 .query_row(
1110 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1111 params![path],
1112 |row| row.get(0),
1113 )
1114 .unwrap_or(false);
1115
1116 if exists {
1117 self.conn().execute(
1118 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1119 WHERE source_path = ?2 AND sync_status = 'local_only'",
1120 params![uploaded_at.to_rfc3339(), path],
1121 )?;
1122 count += 1;
1123 }
1124 }
1125 Ok(count)
1126 }
1127
1128 pub fn link_commit_session(
1132 &self,
1133 commit_hash: &str,
1134 session_id: &str,
1135 repo_path: Option<&str>,
1136 branch: Option<&str>,
1137 ) -> Result<()> {
1138 self.conn().execute(
1139 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1140 VALUES (?1, ?2, ?3, ?4) \
1141 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1142 params![commit_hash, session_id, repo_path, branch],
1143 )?;
1144 Ok(())
1145 }
1146
1147 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1149 let sql = format!(
1150 "SELECT {LOCAL_SESSION_COLUMNS} \
1151 {FROM_CLAUSE} \
1152 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1153 WHERE csl.commit_hash = ?1 \
1154 ORDER BY s.created_at DESC"
1155 );
1156 let conn = self.conn();
1157 let mut stmt = conn.prepare(&sql)?;
1158 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1159 let mut result = Vec::new();
1160 for row in rows {
1161 result.push(row?);
1162 }
1163 Ok(result)
1164 }
1165
1166 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1168 let conn = self.conn();
1169 let mut stmt = conn.prepare(
1170 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1171 FROM commit_session_links WHERE session_id = ?1 \
1172 ORDER BY created_at DESC",
1173 )?;
1174 let rows = stmt.query_map(params![session_id], |row| {
1175 Ok(CommitLink {
1176 commit_hash: row.get(0)?,
1177 session_id: row.get(1)?,
1178 repo_path: row.get(2)?,
1179 branch: row.get(3)?,
1180 created_at: row.get(4)?,
1181 })
1182 })?;
1183 let mut result = Vec::new();
1184 for row in rows {
1185 result.push(row?);
1186 }
1187 Ok(result)
1188 }
1189
1190 pub fn find_active_session_for_repo(
1194 &self,
1195 repo_path: &str,
1196 since_minutes: u32,
1197 ) -> Result<Option<LocalSessionRow>> {
1198 let sql = format!(
1199 "SELECT {LOCAL_SESSION_COLUMNS} \
1200 {FROM_CLAUSE} \
1201 WHERE s.working_directory LIKE ?1 \
1202 AND s.created_at >= datetime('now', ?2) \
1203 ORDER BY s.created_at DESC LIMIT 1"
1204 );
1205 let since = format!("-{since_minutes} minutes");
1206 let like = format!("{repo_path}%");
1207 let conn = self.conn();
1208 let mut stmt = conn.prepare(&sql)?;
1209 let row = stmt
1210 .query_map(params![like, since], row_to_local_session)?
1211 .next()
1212 .transpose()?;
1213 Ok(row)
1214 }
1215
1216 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1218 let conn = self.conn();
1219 let mut stmt = conn
1220 .prepare("SELECT id FROM sessions")
1221 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1222 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1223 let mut set = std::collections::HashSet::new();
1224 if let Ok(rows) = rows {
1225 for row in rows.flatten() {
1226 set.insert(row);
1227 }
1228 }
1229 set
1230 }
1231
1232 pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1234 let title = session.context.title.as_deref();
1235 let description = session.context.description.as_deref();
1236 let (files_modified, files_read, has_errors) =
1237 opensession_core::extract::extract_file_metadata(session);
1238 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1239
1240 self.conn().execute(
1241 "UPDATE sessions SET \
1242 title=?2, description=?3, \
1243 message_count=?4, user_message_count=?5, task_count=?6, \
1244 event_count=?7, duration_seconds=?8, \
1245 total_input_tokens=?9, total_output_tokens=?10, \
1246 files_modified=?11, files_read=?12, has_errors=?13, \
1247 max_active_agents=?14 \
1248 WHERE id=?1",
1249 params![
1250 &session.session_id,
1251 title,
1252 description,
1253 session.stats.message_count as i64,
1254 session.stats.user_message_count as i64,
1255 session.stats.task_count as i64,
1256 session.stats.event_count as i64,
1257 session.stats.duration_seconds as i64,
1258 session.stats.total_input_tokens as i64,
1259 session.stats.total_output_tokens as i64,
1260 &files_modified,
1261 &files_read,
1262 has_errors,
1263 max_active_agents,
1264 ],
1265 )?;
1266 Ok(())
1267 }
1268
1269 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1271 self.conn().execute(
1272 "INSERT INTO session_sync (session_id, source_path) \
1273 VALUES (?1, ?2) \
1274 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1275 params![session_id, source_path],
1276 )?;
1277 Ok(())
1278 }
1279
1280 pub fn list_repos(&self) -> Result<Vec<String>> {
1282 let conn = self.conn();
1283 let mut stmt = conn.prepare(
1284 "SELECT DISTINCT git_repo_name FROM sessions \
1285 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1286 )?;
1287 let rows = stmt.query_map([], |row| row.get(0))?;
1288 let mut result = Vec::new();
1289 for row in rows {
1290 result.push(row?);
1291 }
1292 Ok(result)
1293 }
1294
1295 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1297 let conn = self.conn();
1298 let mut stmt = conn.prepare(
1299 "SELECT DISTINCT working_directory FROM sessions \
1300 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1301 ORDER BY working_directory ASC",
1302 )?;
1303 let rows = stmt.query_map([], |row| row.get(0))?;
1304 let mut result = Vec::new();
1305 for row in rows {
1306 result.push(row?);
1307 }
1308 Ok(result)
1309 }
1310}
1311
1312fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1315 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1316 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1317
1318 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1320
1321 apply_local_migrations(&conn)?;
1322
1323 ensure_sessions_columns(&conn)?;
1326 validate_local_schema(&conn)?;
1327
1328 Ok(conn)
1329}
1330
1331fn apply_local_migrations(conn: &Connection) -> Result<()> {
1332 conn.execute_batch(
1333 "CREATE TABLE IF NOT EXISTS _migrations (
1334 id INTEGER PRIMARY KEY,
1335 name TEXT NOT NULL UNIQUE,
1336 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1337 );",
1338 )
1339 .context("create _migrations table for local db")?;
1340
1341 for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1342 let already_applied: bool = conn
1343 .query_row(
1344 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1345 [name],
1346 |row| row.get(0),
1347 )
1348 .unwrap_or(false);
1349
1350 if already_applied {
1351 continue;
1352 }
1353
1354 if let Err(e) = conn.execute_batch(sql) {
1355 let msg = e.to_string().to_ascii_lowercase();
1356 if !is_local_migration_compat_error(&msg) {
1357 return Err(e).with_context(|| format!("apply local migration {name}"));
1358 }
1359 }
1360
1361 conn.execute(
1362 "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1363 [name],
1364 )
1365 .with_context(|| format!("record local migration {name}"))?;
1366 }
1367
1368 Ok(())
1369}
1370
1371fn is_local_migration_compat_error(msg: &str) -> bool {
1372 msg.contains("duplicate column name")
1373 || msg.contains("no such column")
1374 || msg.contains("already exists")
1375}
1376
1377fn validate_local_schema(conn: &Connection) -> Result<()> {
1378 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1379 conn.prepare(&sql)
1380 .map(|_| ())
1381 .context("validate local session schema")
1382}
1383
1384fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1385 let msg = format!("{err:#}").to_ascii_lowercase();
1386 msg.contains("no such column")
1387 || msg.contains("no such table")
1388 || msg.contains("cannot add a column")
1389 || msg.contains("already exists")
1390 || msg.contains("views may not be indexed")
1391 || msg.contains("malformed database schema")
1392 || msg.contains("duplicate column name")
1393}
1394
1395fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1396 if !path.exists() {
1397 return Ok(());
1398 }
1399
1400 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1401 let backup_name = format!(
1402 "{}.legacy-{}.bak",
1403 path.file_name()
1404 .and_then(|n| n.to_str())
1405 .unwrap_or("local.db"),
1406 ts
1407 );
1408 let backup_path = path.with_file_name(backup_name);
1409 std::fs::rename(path, &backup_path).with_context(|| {
1410 format!(
1411 "rotate legacy db {} -> {}",
1412 path.display(),
1413 backup_path.display()
1414 )
1415 })?;
1416
1417 let wal = PathBuf::from(format!("{}-wal", path.display()));
1418 let shm = PathBuf::from(format!("{}-shm", path.display()));
1419 let _ = std::fs::remove_file(wal);
1420 let _ = std::fs::remove_file(shm);
1421 Ok(())
1422}
1423
1424const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
1425 ("user_id", "TEXT"),
1426 ("team_id", "TEXT DEFAULT 'personal'"),
1427 ("tool", "TEXT DEFAULT ''"),
1428 ("agent_provider", "TEXT"),
1429 ("agent_model", "TEXT"),
1430 ("title", "TEXT"),
1431 ("description", "TEXT"),
1432 ("tags", "TEXT"),
1433 ("created_at", "TEXT DEFAULT ''"),
1434 ("uploaded_at", "TEXT DEFAULT ''"),
1435 ("message_count", "INTEGER DEFAULT 0"),
1436 ("user_message_count", "INTEGER DEFAULT 0"),
1437 ("task_count", "INTEGER DEFAULT 0"),
1438 ("event_count", "INTEGER DEFAULT 0"),
1439 ("duration_seconds", "INTEGER DEFAULT 0"),
1440 ("total_input_tokens", "INTEGER DEFAULT 0"),
1441 ("total_output_tokens", "INTEGER DEFAULT 0"),
1442 ("body_storage_key", "TEXT DEFAULT ''"),
1443 ("body_url", "TEXT"),
1444 ("git_remote", "TEXT"),
1445 ("git_branch", "TEXT"),
1446 ("git_commit", "TEXT"),
1447 ("git_repo_name", "TEXT"),
1448 ("pr_number", "INTEGER"),
1449 ("pr_url", "TEXT"),
1450 ("working_directory", "TEXT"),
1451 ("files_modified", "TEXT"),
1452 ("files_read", "TEXT"),
1453 ("has_errors", "BOOLEAN DEFAULT 0"),
1454 ("max_active_agents", "INTEGER DEFAULT 1"),
1455];
1456
1457fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1458 let mut existing = HashSet::new();
1459 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1460 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1461 for row in rows {
1462 existing.insert(row?);
1463 }
1464
1465 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1466 if existing.contains(*name) {
1467 continue;
1468 }
1469 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1470 conn.execute_batch(&sql)
1471 .with_context(|| format!("add legacy sessions column '{name}'"))?;
1472 }
1473
1474 Ok(())
1475}
1476
1477pub const LOCAL_SESSION_COLUMNS: &str = "\
1479s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
1480s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
1481s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
1482s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
1483s.total_input_tokens, s.total_output_tokens, \
1484s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
1485s.pr_number, s.pr_url, s.working_directory, \
1486s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1487
1488fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1489 Ok(LocalSessionRow {
1490 id: row.get(0)?,
1491 source_path: row.get(1)?,
1492 sync_status: row.get(2)?,
1493 last_synced_at: row.get(3)?,
1494 user_id: row.get(4)?,
1495 nickname: row.get(5)?,
1496 team_id: row.get(6)?,
1497 tool: row.get(7)?,
1498 agent_provider: row.get(8)?,
1499 agent_model: row.get(9)?,
1500 title: row.get(10)?,
1501 description: row.get(11)?,
1502 tags: row.get(12)?,
1503 created_at: row.get(13)?,
1504 uploaded_at: row.get(14)?,
1505 message_count: row.get(15)?,
1506 user_message_count: row.get(16)?,
1507 task_count: row.get(17)?,
1508 event_count: row.get(18)?,
1509 duration_seconds: row.get(19)?,
1510 total_input_tokens: row.get(20)?,
1511 total_output_tokens: row.get(21)?,
1512 git_remote: row.get(22)?,
1513 git_branch: row.get(23)?,
1514 git_commit: row.get(24)?,
1515 git_repo_name: row.get(25)?,
1516 pr_number: row.get(26)?,
1517 pr_url: row.get(27)?,
1518 working_directory: row.get(28)?,
1519 files_modified: row.get(29)?,
1520 files_read: row.get(30)?,
1521 has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1522 max_active_agents: row.get(32).unwrap_or(1),
1523 })
1524}
1525
1526fn default_db_path() -> Result<PathBuf> {
1527 let home = std::env::var("HOME")
1528 .or_else(|_| std::env::var("USERPROFILE"))
1529 .context("Could not determine home directory")?;
1530 Ok(PathBuf::from(home)
1531 .join(".local")
1532 .join("share")
1533 .join("opensession")
1534 .join("local.db"))
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use super::*;
1540
1541 use std::collections::BTreeSet;
1542 use std::fs::{create_dir_all, write};
1543 use tempfile::tempdir;
1544
1545 fn test_db() -> LocalDb {
1546 let dir = tempdir().unwrap();
1547 let path = dir.keep().join("test.db");
1548 LocalDb::open_path(&path).unwrap()
1549 }
1550
1551 fn temp_root() -> tempfile::TempDir {
1552 tempdir().unwrap()
1553 }
1554
1555 fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1556 LocalSessionRow {
1557 id: id.to_string(),
1558 source_path: source_path.map(String::from),
1559 sync_status: "local_only".to_string(),
1560 last_synced_at: None,
1561 user_id: None,
1562 nickname: None,
1563 team_id: None,
1564 tool: tool.to_string(),
1565 agent_provider: None,
1566 agent_model: None,
1567 title: Some("test".to_string()),
1568 description: None,
1569 tags: None,
1570 created_at: "2024-01-01T00:00:00Z".to_string(),
1571 uploaded_at: None,
1572 message_count: 0,
1573 user_message_count: 0,
1574 task_count: 0,
1575 event_count: 0,
1576 duration_seconds: 0,
1577 total_input_tokens: 0,
1578 total_output_tokens: 0,
1579 git_remote: None,
1580 git_branch: None,
1581 git_commit: None,
1582 git_repo_name: None,
1583 pr_number: None,
1584 pr_url: None,
1585 working_directory: None,
1586 files_modified: None,
1587 files_read: None,
1588 has_errors: false,
1589 max_active_agents: 1,
1590 }
1591 }
1592
1593 #[test]
1594 fn test_open_and_schema() {
1595 let _db = test_db();
1596 }
1597
1598 #[test]
1599 fn test_open_backfills_legacy_sessions_columns() {
1600 let dir = tempfile::tempdir().unwrap();
1601 let path = dir.path().join("legacy.db");
1602 {
1603 let conn = Connection::open(&path).unwrap();
1604 conn.execute_batch(
1605 "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1606 INSERT INTO sessions (id) VALUES ('legacy-1');",
1607 )
1608 .unwrap();
1609 }
1610
1611 let db = LocalDb::open_path(&path).unwrap();
1612 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1613 assert_eq!(rows.len(), 1);
1614 assert_eq!(rows[0].id, "legacy-1");
1615 assert_eq!(rows[0].user_message_count, 0);
1616 }
1617
1618 #[test]
1619 fn test_open_rotates_incompatible_legacy_schema() {
1620 let dir = tempfile::tempdir().unwrap();
1621 let path = dir.path().join("broken.db");
1622 {
1623 let conn = Connection::open(&path).unwrap();
1624 conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1625 .unwrap();
1626 }
1627
1628 let db = LocalDb::open_path(&path).unwrap();
1629 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1630 assert!(rows.is_empty());
1631
1632 let rotated = std::fs::read_dir(dir.path())
1633 .unwrap()
1634 .filter_map(Result::ok)
1635 .any(|entry| {
1636 let name = entry.file_name();
1637 let name = name.to_string_lossy();
1638 name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1639 });
1640 assert!(rotated, "expected rotated legacy backup file");
1641 }
1642
1643 #[test]
1644 fn test_is_opencode_child_session() {
1645 let root = temp_root();
1646 let dir = root.path().join("sessions");
1647 create_dir_all(&dir).unwrap();
1648 let parent_session = dir.join("parent.json");
1649 write(
1650 &parent_session,
1651 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1652 )
1653 .unwrap();
1654 let child_session = dir.join("child.json");
1655 write(
1656 &child_session,
1657 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1658 )
1659 .unwrap();
1660
1661 let parent = make_row(
1662 "ses_parent",
1663 "opencode",
1664 Some(parent_session.to_str().unwrap()),
1665 );
1666 let child = make_row(
1667 "ses_child",
1668 "opencode",
1669 Some(child_session.to_str().unwrap()),
1670 );
1671 let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1672
1673 assert!(!is_opencode_child_session(&parent));
1674 assert!(is_opencode_child_session(&child));
1675 assert!(!is_opencode_child_session(&codex));
1676 }
1677
1678 #[test]
1679 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1680 let child = LocalSessionRow {
1681 id: "sess_child".to_string(),
1682 source_path: None,
1683 sync_status: "local_only".to_string(),
1684 last_synced_at: None,
1685 user_id: None,
1686 nickname: None,
1687 team_id: None,
1688 tool: "opencode".to_string(),
1689 agent_provider: None,
1690 agent_model: None,
1691 title: None,
1692 description: None,
1693 tags: None,
1694 created_at: "2024-01-01T00:00:00Z".to_string(),
1695 uploaded_at: None,
1696 message_count: 1,
1697 user_message_count: 0,
1698 task_count: 4,
1699 event_count: 4,
1700 duration_seconds: 0,
1701 total_input_tokens: 0,
1702 total_output_tokens: 0,
1703 git_remote: None,
1704 git_branch: None,
1705 git_commit: None,
1706 git_repo_name: None,
1707 pr_number: None,
1708 pr_url: None,
1709 working_directory: None,
1710 files_modified: None,
1711 files_read: None,
1712 has_errors: false,
1713 max_active_agents: 1,
1714 };
1715
1716 let parent = LocalSessionRow {
1717 id: "sess_parent".to_string(),
1718 source_path: None,
1719 sync_status: "local_only".to_string(),
1720 last_synced_at: None,
1721 user_id: None,
1722 nickname: None,
1723 team_id: None,
1724 tool: "opencode".to_string(),
1725 agent_provider: None,
1726 agent_model: None,
1727 title: Some("regular".to_string()),
1728 description: None,
1729 tags: None,
1730 created_at: "2024-01-01T00:00:00Z".to_string(),
1731 uploaded_at: None,
1732 message_count: 1,
1733 user_message_count: 1,
1734 task_count: 2,
1735 event_count: 20,
1736 duration_seconds: 0,
1737 total_input_tokens: 0,
1738 total_output_tokens: 0,
1739 git_remote: None,
1740 git_branch: None,
1741 git_commit: None,
1742 git_repo_name: None,
1743 pr_number: None,
1744 pr_url: None,
1745 working_directory: None,
1746 files_modified: None,
1747 files_read: None,
1748 has_errors: false,
1749 max_active_agents: 1,
1750 };
1751
1752 assert!(is_opencode_child_session(&child));
1753 assert!(!is_opencode_child_session(&parent));
1754 }
1755
1756 #[test]
1757 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1758 let child = LocalSessionRow {
1759 id: "sess_child_2".to_string(),
1760 source_path: None,
1761 sync_status: "local_only".to_string(),
1762 last_synced_at: None,
1763 user_id: None,
1764 nickname: None,
1765 team_id: None,
1766 tool: "opencode".to_string(),
1767 agent_provider: None,
1768 agent_model: None,
1769 title: None,
1770 description: None,
1771 tags: None,
1772 created_at: "2024-01-01T00:00:00Z".to_string(),
1773 uploaded_at: None,
1774 message_count: 2,
1775 user_message_count: 0,
1776 task_count: 4,
1777 event_count: 4,
1778 duration_seconds: 0,
1779 total_input_tokens: 0,
1780 total_output_tokens: 0,
1781 git_remote: None,
1782 git_branch: None,
1783 git_commit: None,
1784 git_repo_name: None,
1785 pr_number: None,
1786 pr_url: None,
1787 working_directory: None,
1788 files_modified: None,
1789 files_read: None,
1790 has_errors: false,
1791 max_active_agents: 1,
1792 };
1793
1794 let parent = LocalSessionRow {
1795 id: "sess_parent".to_string(),
1796 source_path: None,
1797 sync_status: "local_only".to_string(),
1798 last_synced_at: None,
1799 user_id: None,
1800 nickname: None,
1801 team_id: None,
1802 tool: "opencode".to_string(),
1803 agent_provider: None,
1804 agent_model: None,
1805 title: Some("regular".to_string()),
1806 description: None,
1807 tags: None,
1808 created_at: "2024-01-01T00:00:00Z".to_string(),
1809 uploaded_at: None,
1810 message_count: 2,
1811 user_message_count: 1,
1812 task_count: 5,
1813 event_count: 20,
1814 duration_seconds: 0,
1815 total_input_tokens: 0,
1816 total_output_tokens: 0,
1817 git_remote: None,
1818 git_branch: None,
1819 git_commit: None,
1820 git_repo_name: None,
1821 pr_number: None,
1822 pr_url: None,
1823 working_directory: None,
1824 files_modified: None,
1825 files_read: None,
1826 has_errors: false,
1827 max_active_agents: 1,
1828 };
1829
1830 assert!(is_opencode_child_session(&child));
1831 assert!(!is_opencode_child_session(&parent));
1832 }
1833
1834 #[test]
1835 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1836 let child = LocalSessionRow {
1837 id: "sess_child_3".to_string(),
1838 source_path: None,
1839 sync_status: "local_only".to_string(),
1840 last_synced_at: None,
1841 user_id: None,
1842 nickname: None,
1843 team_id: None,
1844 tool: "opencode".to_string(),
1845 agent_provider: None,
1846 agent_model: None,
1847 title: None,
1848 description: None,
1849 tags: None,
1850 created_at: "2024-01-01T00:00:00Z".to_string(),
1851 uploaded_at: None,
1852 message_count: 3,
1853 user_message_count: 0,
1854 task_count: 2,
1855 event_count: 6,
1856 duration_seconds: 0,
1857 total_input_tokens: 0,
1858 total_output_tokens: 0,
1859 git_remote: None,
1860 git_branch: None,
1861 git_commit: None,
1862 git_repo_name: None,
1863 pr_number: None,
1864 pr_url: None,
1865 working_directory: None,
1866 files_modified: None,
1867 files_read: None,
1868 has_errors: false,
1869 max_active_agents: 1,
1870 };
1871
1872 assert!(is_opencode_child_session(&child));
1873 }
1874
1875 #[test]
1876 fn test_parse_opencode_parent_session_id_aliases() {
1877 let root = temp_root();
1878 let dir = root.path().join("session-aliases");
1879 create_dir_all(&dir).unwrap();
1880 let child_session = dir.join("child.json");
1881 write(
1882 &child_session,
1883 r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1884 )
1885 .unwrap();
1886 assert_eq!(
1887 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1888 Some("ses_parent")
1889 );
1890 }
1891
1892 #[test]
1893 fn test_parse_opencode_parent_session_id_nested_metadata() {
1894 let root = temp_root();
1895 let dir = root.path().join("session-nested");
1896 create_dir_all(&dir).unwrap();
1897 let child_session = dir.join("child.json");
1898 write(
1899 &child_session,
1900 r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1901 )
1902 .unwrap();
1903 assert_eq!(
1904 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1905 Some("ses_parent")
1906 );
1907 }
1908
1909 #[test]
1910 fn test_is_claude_subagent_session() {
1911 let row = make_row(
1912 "ses_parent",
1913 "claude-code",
1914 Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1915 );
1916 assert!(!is_opencode_child_session(&row));
1917 assert!(is_claude_subagent_session(&row));
1918 assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1919 }
1920
1921 #[test]
1922 fn test_hide_opencode_child_sessions() {
1923 let root = temp_root();
1924 let dir = root.path().join("sessions");
1925 create_dir_all(&dir).unwrap();
1926 let parent_session = dir.join("parent.json");
1927 let child_session = dir.join("child.json");
1928 let orphan_session = dir.join("orphan.json");
1929
1930 write(
1931 &parent_session,
1932 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1933 )
1934 .unwrap();
1935 write(
1936 &child_session,
1937 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1938 )
1939 .unwrap();
1940 write(
1941 &orphan_session,
1942 r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1943 )
1944 .unwrap();
1945
1946 let rows = vec![
1947 make_row(
1948 "ses_child",
1949 "opencode",
1950 Some(child_session.to_str().unwrap()),
1951 ),
1952 make_row(
1953 "ses_parent",
1954 "opencode",
1955 Some(parent_session.to_str().unwrap()),
1956 ),
1957 {
1958 let mut row = make_row("ses_other", "codex", None);
1959 row.user_message_count = 1;
1960 row
1961 },
1962 make_row(
1963 "ses_orphan",
1964 "opencode",
1965 Some(orphan_session.to_str().unwrap()),
1966 ),
1967 ];
1968
1969 let filtered = hide_opencode_child_sessions(rows);
1970 assert_eq!(filtered.len(), 3);
1971 assert!(filtered.iter().all(|r| r.id != "ses_child"));
1972 }
1973
1974 #[test]
1975 fn test_sync_cursor() {
1976 let db = test_db();
1977 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1978 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1979 assert_eq!(
1980 db.get_sync_cursor("team1").unwrap(),
1981 Some("2024-01-01T00:00:00Z".to_string())
1982 );
1983 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1985 assert_eq!(
1986 db.get_sync_cursor("team1").unwrap(),
1987 Some("2024-06-01T00:00:00Z".to_string())
1988 );
1989 }
1990
1991 #[test]
1992 fn test_body_cache() {
1993 let db = test_db();
1994 assert_eq!(db.get_cached_body("s1").unwrap(), None);
1995 db.cache_body("s1", b"hello world").unwrap();
1996 assert_eq!(
1997 db.get_cached_body("s1").unwrap(),
1998 Some(b"hello world".to_vec())
1999 );
2000 }
2001
2002 #[test]
2003 fn test_timeline_summary_cache_roundtrip() {
2004 let db = test_db();
2005 db.upsert_timeline_summary_cache(
2006 "k1",
2007 "timeline:v1",
2008 "compact text",
2009 "{\"kind\":\"turn-summary\"}",
2010 "raw text",
2011 )
2012 .unwrap();
2013
2014 let rows = db
2015 .list_timeline_summary_cache_by_namespace("timeline:v1")
2016 .unwrap();
2017 assert_eq!(rows.len(), 1);
2018 assert_eq!(rows[0].lookup_key, "k1");
2019 assert_eq!(rows[0].namespace, "timeline:v1");
2020 assert_eq!(rows[0].compact, "compact text");
2021 assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2022 assert_eq!(rows[0].raw, "raw text");
2023
2024 let cleared = db.clear_timeline_summary_cache().unwrap();
2025 assert_eq!(cleared, 1);
2026 let rows_after = db
2027 .list_timeline_summary_cache_by_namespace("timeline:v1")
2028 .unwrap();
2029 assert!(rows_after.is_empty());
2030 }
2031
2032 #[test]
2033 fn test_local_migrations_include_timeline_summary_cache() {
2034 let db = test_db();
2035 let conn = db.conn();
2036 let applied: bool = conn
2037 .query_row(
2038 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2039 params!["local_0003_timeline_summary_cache"],
2040 |row| row.get(0),
2041 )
2042 .unwrap();
2043 assert!(applied);
2044 }
2045
2046 #[test]
2047 fn test_local_migration_files_match_api_local_migrations() {
2048 fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2049 std::fs::read_dir(dir)
2050 .expect("read migrations directory")
2051 .filter_map(Result::ok)
2052 .map(|entry| entry.file_name().to_string_lossy().to_string())
2053 .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2054 .collect()
2055 }
2056
2057 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2058 let local_files = collect_local_sql(manifest_dir.join("migrations"));
2059 let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2060
2061 assert_eq!(
2062 local_files, api_files,
2063 "local-db local migrations must stay in parity with api local migrations"
2064 );
2065 }
2066
2067 #[test]
2068 fn test_upsert_remote_session() {
2069 let db = test_db();
2070 let summary = RemoteSessionSummary {
2071 id: "remote-1".to_string(),
2072 user_id: Some("u1".to_string()),
2073 nickname: Some("alice".to_string()),
2074 team_id: "t1".to_string(),
2075 tool: "claude-code".to_string(),
2076 agent_provider: None,
2077 agent_model: None,
2078 title: Some("Test session".to_string()),
2079 description: None,
2080 tags: None,
2081 created_at: "2024-01-01T00:00:00Z".to_string(),
2082 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2083 message_count: 10,
2084 task_count: 2,
2085 event_count: 20,
2086 duration_seconds: 300,
2087 total_input_tokens: 1000,
2088 total_output_tokens: 500,
2089 git_remote: None,
2090 git_branch: None,
2091 git_commit: None,
2092 git_repo_name: None,
2093 pr_number: None,
2094 pr_url: None,
2095 working_directory: None,
2096 files_modified: None,
2097 files_read: None,
2098 has_errors: false,
2099 max_active_agents: 1,
2100 };
2101 db.upsert_remote_session(&summary).unwrap();
2102
2103 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2104 assert_eq!(sessions.len(), 1);
2105 assert_eq!(sessions[0].id, "remote-1");
2106 assert_eq!(sessions[0].sync_status, "remote_only");
2107 assert_eq!(sessions[0].nickname, None); }
2109
2110 #[test]
2111 fn test_list_filter_by_repo() {
2112 let db = test_db();
2113 let summary1 = RemoteSessionSummary {
2115 id: "s1".to_string(),
2116 user_id: None,
2117 nickname: None,
2118 team_id: "t1".to_string(),
2119 tool: "claude-code".to_string(),
2120 agent_provider: None,
2121 agent_model: None,
2122 title: Some("Session 1".to_string()),
2123 description: None,
2124 tags: None,
2125 created_at: "2024-01-01T00:00:00Z".to_string(),
2126 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2127 message_count: 5,
2128 task_count: 0,
2129 event_count: 10,
2130 duration_seconds: 60,
2131 total_input_tokens: 100,
2132 total_output_tokens: 50,
2133 git_remote: None,
2134 git_branch: None,
2135 git_commit: None,
2136 git_repo_name: None,
2137 pr_number: None,
2138 pr_url: None,
2139 working_directory: None,
2140 files_modified: None,
2141 files_read: None,
2142 has_errors: false,
2143 max_active_agents: 1,
2144 };
2145 db.upsert_remote_session(&summary1).unwrap();
2146
2147 let filter = LocalSessionFilter {
2149 team_id: Some("t1".to_string()),
2150 ..Default::default()
2151 };
2152 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2153
2154 let filter = LocalSessionFilter {
2155 team_id: Some("t999".to_string()),
2156 ..Default::default()
2157 };
2158 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2159 }
2160
2161 fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2164 RemoteSessionSummary {
2165 id: id.to_string(),
2166 user_id: None,
2167 nickname: None,
2168 team_id: "t1".to_string(),
2169 tool: tool.to_string(),
2170 agent_provider: Some("anthropic".to_string()),
2171 agent_model: Some("claude-opus-4-6".to_string()),
2172 title: Some(title.to_string()),
2173 description: None,
2174 tags: None,
2175 created_at: created_at.to_string(),
2176 uploaded_at: created_at.to_string(),
2177 message_count: 5,
2178 task_count: 1,
2179 event_count: 10,
2180 duration_seconds: 300,
2181 total_input_tokens: 1000,
2182 total_output_tokens: 500,
2183 git_remote: None,
2184 git_branch: None,
2185 git_commit: None,
2186 git_repo_name: None,
2187 pr_number: None,
2188 pr_url: None,
2189 working_directory: None,
2190 files_modified: None,
2191 files_read: None,
2192 has_errors: false,
2193 max_active_agents: 1,
2194 }
2195 }
2196
2197 fn seed_sessions(db: &LocalDb) {
2198 db.upsert_remote_session(&make_summary(
2200 "s1",
2201 "claude-code",
2202 "First session",
2203 "2024-01-01T00:00:00Z",
2204 ))
2205 .unwrap();
2206 db.upsert_remote_session(&make_summary(
2207 "s2",
2208 "claude-code",
2209 "JWT auth work",
2210 "2024-01-02T00:00:00Z",
2211 ))
2212 .unwrap();
2213 db.upsert_remote_session(&make_summary(
2214 "s3",
2215 "gemini",
2216 "Gemini test",
2217 "2024-01-03T00:00:00Z",
2218 ))
2219 .unwrap();
2220 db.upsert_remote_session(&make_summary(
2221 "s4",
2222 "claude-code",
2223 "Error handling",
2224 "2024-01-04T00:00:00Z",
2225 ))
2226 .unwrap();
2227 db.upsert_remote_session(&make_summary(
2228 "s5",
2229 "claude-code",
2230 "Final polish",
2231 "2024-01-05T00:00:00Z",
2232 ))
2233 .unwrap();
2234 }
2235
2236 #[test]
2239 fn test_log_no_filters() {
2240 let db = test_db();
2241 seed_sessions(&db);
2242 let filter = LogFilter::default();
2243 let results = db.list_sessions_log(&filter).unwrap();
2244 assert_eq!(results.len(), 5);
2245 assert_eq!(results[0].id, "s5");
2247 assert_eq!(results[4].id, "s1");
2248 }
2249
2250 #[test]
2251 fn test_log_filter_by_tool() {
2252 let db = test_db();
2253 seed_sessions(&db);
2254 let filter = LogFilter {
2255 tool: Some("claude-code".to_string()),
2256 ..Default::default()
2257 };
2258 let results = db.list_sessions_log(&filter).unwrap();
2259 assert_eq!(results.len(), 4);
2260 assert!(results.iter().all(|s| s.tool == "claude-code"));
2261 }
2262
2263 #[test]
2264 fn test_log_filter_by_model_wildcard() {
2265 let db = test_db();
2266 seed_sessions(&db);
2267 let filter = LogFilter {
2268 model: Some("claude*".to_string()),
2269 ..Default::default()
2270 };
2271 let results = db.list_sessions_log(&filter).unwrap();
2272 assert_eq!(results.len(), 5); }
2274
2275 #[test]
2276 fn test_log_filter_since() {
2277 let db = test_db();
2278 seed_sessions(&db);
2279 let filter = LogFilter {
2280 since: Some("2024-01-03T00:00:00Z".to_string()),
2281 ..Default::default()
2282 };
2283 let results = db.list_sessions_log(&filter).unwrap();
2284 assert_eq!(results.len(), 3); }
2286
2287 #[test]
2288 fn test_log_filter_before() {
2289 let db = test_db();
2290 seed_sessions(&db);
2291 let filter = LogFilter {
2292 before: Some("2024-01-03T00:00:00Z".to_string()),
2293 ..Default::default()
2294 };
2295 let results = db.list_sessions_log(&filter).unwrap();
2296 assert_eq!(results.len(), 2); }
2298
2299 #[test]
2300 fn test_log_filter_since_and_before() {
2301 let db = test_db();
2302 seed_sessions(&db);
2303 let filter = LogFilter {
2304 since: Some("2024-01-02T00:00:00Z".to_string()),
2305 before: Some("2024-01-04T00:00:00Z".to_string()),
2306 ..Default::default()
2307 };
2308 let results = db.list_sessions_log(&filter).unwrap();
2309 assert_eq!(results.len(), 2); }
2311
2312 #[test]
2313 fn test_log_filter_grep() {
2314 let db = test_db();
2315 seed_sessions(&db);
2316 let filter = LogFilter {
2317 grep: Some("JWT".to_string()),
2318 ..Default::default()
2319 };
2320 let results = db.list_sessions_log(&filter).unwrap();
2321 assert_eq!(results.len(), 1);
2322 assert_eq!(results[0].id, "s2");
2323 }
2324
2325 #[test]
2326 fn test_log_limit_and_offset() {
2327 let db = test_db();
2328 seed_sessions(&db);
2329 let filter = LogFilter {
2330 limit: Some(2),
2331 offset: Some(1),
2332 ..Default::default()
2333 };
2334 let results = db.list_sessions_log(&filter).unwrap();
2335 assert_eq!(results.len(), 2);
2336 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2338 }
2339
2340 #[test]
2341 fn test_log_limit_only() {
2342 let db = test_db();
2343 seed_sessions(&db);
2344 let filter = LogFilter {
2345 limit: Some(3),
2346 ..Default::default()
2347 };
2348 let results = db.list_sessions_log(&filter).unwrap();
2349 assert_eq!(results.len(), 3);
2350 }
2351
2352 #[test]
2353 fn test_list_sessions_limit_offset() {
2354 let db = test_db();
2355 seed_sessions(&db);
2356 let filter = LocalSessionFilter {
2357 limit: Some(2),
2358 offset: Some(1),
2359 ..Default::default()
2360 };
2361 let results = db.list_sessions(&filter).unwrap();
2362 assert_eq!(results.len(), 2);
2363 assert_eq!(results[0].id, "s4");
2364 assert_eq!(results[1].id, "s3");
2365 }
2366
2367 #[test]
2368 fn test_count_sessions_filtered() {
2369 let db = test_db();
2370 seed_sessions(&db);
2371 let count = db
2372 .count_sessions_filtered(&LocalSessionFilter::default())
2373 .unwrap();
2374 assert_eq!(count, 5);
2375 }
2376
2377 #[test]
2378 fn test_list_working_directories_distinct_non_empty() {
2379 let db = test_db();
2380
2381 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2382 a.working_directory = Some("/tmp/repo-a".to_string());
2383 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2384 b.working_directory = Some("/tmp/repo-a".to_string());
2385 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2386 c.working_directory = Some("/tmp/repo-b".to_string());
2387 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2388 d.working_directory = Some("".to_string());
2389
2390 db.upsert_remote_session(&a).unwrap();
2391 db.upsert_remote_session(&b).unwrap();
2392 db.upsert_remote_session(&c).unwrap();
2393 db.upsert_remote_session(&d).unwrap();
2394
2395 let dirs = db.list_working_directories().unwrap();
2396 assert_eq!(
2397 dirs,
2398 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2399 );
2400 }
2401
2402 #[test]
2403 fn test_list_session_tools() {
2404 let db = test_db();
2405 seed_sessions(&db);
2406 let tools = db
2407 .list_session_tools(&LocalSessionFilter::default())
2408 .unwrap();
2409 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2410 }
2411
2412 #[test]
2413 fn test_log_combined_filters() {
2414 let db = test_db();
2415 seed_sessions(&db);
2416 let filter = LogFilter {
2417 tool: Some("claude-code".to_string()),
2418 since: Some("2024-01-03T00:00:00Z".to_string()),
2419 limit: Some(1),
2420 ..Default::default()
2421 };
2422 let results = db.list_sessions_log(&filter).unwrap();
2423 assert_eq!(results.len(), 1);
2424 assert_eq!(results[0].id, "s5"); }
2426
2427 #[test]
2430 fn test_get_session_by_offset() {
2431 let db = test_db();
2432 seed_sessions(&db);
2433 let row = db.get_session_by_offset(0).unwrap().unwrap();
2434 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2436 assert_eq!(row.id, "s3");
2437 assert!(db.get_session_by_offset(10).unwrap().is_none());
2438 }
2439
2440 #[test]
2441 fn test_get_session_by_tool_offset() {
2442 let db = test_db();
2443 seed_sessions(&db);
2444 let row = db
2445 .get_session_by_tool_offset("claude-code", 0)
2446 .unwrap()
2447 .unwrap();
2448 assert_eq!(row.id, "s5");
2449 let row = db
2450 .get_session_by_tool_offset("claude-code", 1)
2451 .unwrap()
2452 .unwrap();
2453 assert_eq!(row.id, "s4");
2454 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2455 assert_eq!(row.id, "s3");
2456 assert!(db
2457 .get_session_by_tool_offset("gemini", 1)
2458 .unwrap()
2459 .is_none());
2460 }
2461
2462 #[test]
2463 fn test_get_sessions_latest() {
2464 let db = test_db();
2465 seed_sessions(&db);
2466 let rows = db.get_sessions_latest(3).unwrap();
2467 assert_eq!(rows.len(), 3);
2468 assert_eq!(rows[0].id, "s5");
2469 assert_eq!(rows[1].id, "s4");
2470 assert_eq!(rows[2].id, "s3");
2471 }
2472
2473 #[test]
2474 fn test_get_sessions_by_tool_latest() {
2475 let db = test_db();
2476 seed_sessions(&db);
2477 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2478 assert_eq!(rows.len(), 2);
2479 assert_eq!(rows[0].id, "s5");
2480 assert_eq!(rows[1].id, "s4");
2481 }
2482
2483 #[test]
2484 fn test_get_sessions_latest_more_than_available() {
2485 let db = test_db();
2486 seed_sessions(&db);
2487 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2488 assert_eq!(rows.len(), 1); }
2490
2491 #[test]
2492 fn test_session_count() {
2493 let db = test_db();
2494 assert_eq!(db.session_count().unwrap(), 0);
2495 seed_sessions(&db);
2496 assert_eq!(db.session_count().unwrap(), 5);
2497 }
2498
2499 #[test]
2502 fn test_link_commit_session() {
2503 let db = test_db();
2504 seed_sessions(&db);
2505 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2506 .unwrap();
2507
2508 let commits = db.get_commits_by_session("s1").unwrap();
2509 assert_eq!(commits.len(), 1);
2510 assert_eq!(commits[0].commit_hash, "abc123");
2511 assert_eq!(commits[0].session_id, "s1");
2512 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2513 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2514
2515 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2516 assert_eq!(sessions.len(), 1);
2517 assert_eq!(sessions[0].id, "s1");
2518 }
2519
2520 #[test]
2521 fn test_get_sessions_by_commit() {
2522 let db = test_db();
2523 seed_sessions(&db);
2524 db.link_commit_session("abc123", "s1", None, None).unwrap();
2526 db.link_commit_session("abc123", "s2", None, None).unwrap();
2527 db.link_commit_session("abc123", "s3", None, None).unwrap();
2528
2529 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2530 assert_eq!(sessions.len(), 3);
2531 assert_eq!(sessions[0].id, "s3");
2533 assert_eq!(sessions[1].id, "s2");
2534 assert_eq!(sessions[2].id, "s1");
2535 }
2536
2537 #[test]
2538 fn test_get_commits_by_session() {
2539 let db = test_db();
2540 seed_sessions(&db);
2541 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2543 .unwrap();
2544 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2545 .unwrap();
2546 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2547 .unwrap();
2548
2549 let commits = db.get_commits_by_session("s1").unwrap();
2550 assert_eq!(commits.len(), 3);
2551 assert!(commits.iter().all(|c| c.session_id == "s1"));
2553 }
2554
2555 #[test]
2556 fn test_duplicate_link_ignored() {
2557 let db = test_db();
2558 seed_sessions(&db);
2559 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2560 .unwrap();
2561 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2563 .unwrap();
2564
2565 let commits = db.get_commits_by_session("s1").unwrap();
2566 assert_eq!(commits.len(), 1);
2567 }
2568
2569 #[test]
2570 fn test_log_filter_by_commit() {
2571 let db = test_db();
2572 seed_sessions(&db);
2573 db.link_commit_session("abc123", "s2", None, None).unwrap();
2574 db.link_commit_session("abc123", "s4", None, None).unwrap();
2575
2576 let filter = LogFilter {
2577 commit: Some("abc123".to_string()),
2578 ..Default::default()
2579 };
2580 let results = db.list_sessions_log(&filter).unwrap();
2581 assert_eq!(results.len(), 2);
2582 assert_eq!(results[0].id, "s4");
2583 assert_eq!(results[1].id, "s2");
2584
2585 let filter = LogFilter {
2587 commit: Some("nonexistent".to_string()),
2588 ..Default::default()
2589 };
2590 let results = db.list_sessions_log(&filter).unwrap();
2591 assert_eq!(results.len(), 0);
2592 }
2593}