1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// A named schema migration: (unique identifier, SQL text embedded at compile time).
type Migration = (&'static str, &'static str);
15
/// Migrations for the remote-facing schema, in the order they must be applied.
/// Each entry embeds its SQL file at compile time via `include_str!`.
/// NOTE(review): the runner that applies these is not in this file — presumably
/// it records applied names so each migration runs once; verify there.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
];
52
/// Migrations for local-only tables (sync bookkeeping, caches), applied in
/// declaration order; SQL is embedded at compile time.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
67
/// One row of the local `sessions` table joined with its `session_sync`
/// bookkeeping (`source_path`, `sync_status`, `last_synced_at`) and, when
/// available, the owning user's `nickname`.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// Path of the on-disk session file this row was parsed from, if known.
    pub source_path: Option<String>,
    /// One of `'local_only'`, `'remote_only'`, `'synced'` (or `'unknown'`
    /// when no sync row exists — see the COALESCE in the query builders).
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Producing tool, e.g. `"opencode"` or `"claude-code"`.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Comma-joined tag list (see `upsert_local_session`).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// Serialized file list — searched as quoted substrings in
    /// `list_sessions_log`; presumably JSON, confirm against the extractor.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
105
/// One row of `commit_session_links`: associates a git commit with a session.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    /// Repository path the commit was observed in, when recorded.
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
115
/// One row of the `timeline_summary_cache` table, keyed by `lookup_key` and
/// grouped under a `namespace` (see the upsert/list methods on `LocalDb`).
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    pub cached_at: String,
}
126
127pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
129 if row.tool != "opencode" {
130 return false;
131 }
132
133 if row.user_message_count <= 0
137 && row.message_count <= 4
138 && row.task_count <= 4
139 && row.event_count > 0
140 && row.event_count <= 16
141 {
142 return true;
143 }
144
145 if is_opencode_subagent_source(row.source_path.as_deref()) {
146 return true;
147 }
148
149 let source_path = match row.source_path.as_deref() {
150 Some(path) if !path.trim().is_empty() => path,
151 _ => return false,
152 };
153
154 parse_opencode_parent_session_id(source_path)
155 .is_some_and(|parent_id| !parent_id.trim().is_empty())
156}
157
158pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
160 let text = fs::read_to_string(source_path).ok()?;
161 let json: Value = serde_json::from_str(&text).ok()?;
162 lookup_parent_session_id(&json)
163}
164
165fn lookup_parent_session_id(value: &Value) -> Option<String> {
166 match value {
167 Value::Object(obj) => {
168 for (key, value) in obj {
169 if is_parent_id_key(key) {
170 if let Some(parent_id) = value.as_str() {
171 let parent_id = parent_id.trim();
172 if !parent_id.is_empty() {
173 return Some(parent_id.to_string());
174 }
175 }
176 }
177 if let Some(parent_id) = lookup_parent_session_id(value) {
178 return Some(parent_id);
179 }
180 }
181 None
182 }
183 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
184 _ => None,
185 }
186}
187
/// Returns true when `key` looks like a "parent session id" field.
///
/// The key is normalized by dropping every non-alphanumeric character and
/// lowercasing, so `parent_id`, `parentId`, `parent-session-uuid`, … all
/// compare equal. The previous explicit list (`parentid`, `parentuuid`,
/// `parentsessionid`, `parentsessionuuid`, `ends_with("parentsessionid")`,
/// `…ends_with("uuid")`) was fully subsumed by this single predicate:
/// every listed form contains `parent`, and `"uuid"` itself ends in `"id"`.
fn is_parent_id_key(key: &str) -> bool {
    let flat = key
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .collect::<String>();

    flat.contains("parent") && flat.ends_with("id")
}
203
204pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
206 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
207 rows
208}
209
/// Tool-named alias of `is_subagent_source`, used by the opencode
/// child-session heuristic.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
213
214fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
215 if row.tool != "claude-code" {
216 return false;
217 }
218
219 is_subagent_source(row.source_path.as_deref())
220}
221
/// True when a session's source path marks it as sub-agent output: either it
/// lives under a `subagents` directory (Unix or Windows separators), or its
/// filename starts with an `agent-`/`agent_`/`subagent-`/`subagent_` prefix.
/// Matching is case-insensitive; `None` is never a sub-agent source.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let Some(path) = source_path else {
        return false;
    };
    let path = path.to_ascii_lowercase();

    if ["/subagents/", "\\subagents\\"]
        .iter()
        .any(|sep| path.contains(sep))
    {
        return true;
    }

    std::path::Path::new(&path)
        .file_name()
        .map(|name| name.to_string_lossy().into_owned())
        .is_some_and(|file| {
            ["agent-", "agent_", "subagent-", "subagent_"]
                .iter()
                .any(|prefix| file.starts_with(prefix))
        })
}
241
/// Filtering, sorting and paging options for local session listings
/// (`list_sessions` and friends). `None` fields impose no constraint.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    /// Compared against `COALESCE(session_sync.sync_status, 'unknown')`.
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    /// Substring match against title, description and tags.
    pub search: Option<String>,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set (see `list_sessions`).
    pub offset: Option<u32>,
}
255
256impl Default for LocalSessionFilter {
257 fn default() -> Self {
258 Self {
259 team_id: None,
260 sync_status: None,
261 git_repo_name: None,
262 search: None,
263 tool: None,
264 sort: LocalSortOrder::Recent,
265 time_range: LocalTimeRange::All,
266 limit: None,
267 offset: None,
268 }
269 }
270}
271
/// Sort order for local session listings (mapped to ORDER BY clauses in
/// `list_sessions`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at DESC`).
    #[default]
    Recent,
    /// Highest message count first, then newest.
    Popular,
    /// Longest duration first, then newest.
    Longest,
}
280
/// Recency window for local session listings (mapped to SQLite `datetime`
/// offsets in `build_local_session_where_clause`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours (`-1 day`).
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time constraint.
    #[default]
    All,
}
290
/// Session metadata as received from the remote server, mirrored into the
/// local `sessions` table by `upsert_remote_session`. Unlike
/// `LocalSessionRow`, `team_id` and `uploaded_at` are always present and
/// there is no sync/source-path bookkeeping.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
324
/// Filters for log-style listing (`list_sessions_log`); every populated field
/// becomes an AND-ed predicate.
#[derive(Debug, Default)]
pub struct LogFilter {
    pub tool: Option<String>,
    /// Model pattern; `*` acts as a wildcard (translated to SQL `%`).
    pub model: Option<String>,
    /// Inclusive lower bound, compared against `created_at` as text.
    pub since: Option<String>,
    /// Exclusive upper bound, compared against `created_at` as text.
    pub before: Option<String>,
    /// File path that must appear (quoted) inside `files_modified`.
    pub touches: Option<String>,
    /// Substring match against title, description and tags.
    pub grep: Option<String>,
    /// `Some(true)` keeps only error sessions; `Some(false)`/`None` add no filter.
    pub has_errors: Option<bool>,
    /// Prefix match on the session's working directory.
    pub working_directory: Option<String>,
    pub git_repo_name: Option<String>,
    /// Restrict to sessions linked to this commit (via `commit_session_links`).
    pub commit: Option<String>,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set.
    pub offset: Option<u32>,
}
353
/// Shared FROM/JOIN fragment for session queries: the `sessions` table plus
/// optional sync bookkeeping (`session_sync ss`) and the owning user (`users u`).
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
359
/// Handle to the local SQLite database. The single connection is guarded by a
/// `Mutex`, so a `LocalDb` can be shared across threads.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
366
367impl LocalDb {
368 pub fn open() -> Result<Self> {
371 let path = default_db_path()?;
372 Self::open_path(&path)
373 }
374
375 pub fn open_path(path: &PathBuf) -> Result<Self> {
377 if let Some(parent) = path.parent() {
378 std::fs::create_dir_all(parent)
379 .with_context(|| format!("create dir for {}", path.display()))?;
380 }
381 match open_connection_with_latest_schema(path) {
382 Ok(conn) => Ok(Self {
383 conn: Mutex::new(conn),
384 }),
385 Err(err) => {
386 if !is_schema_compat_error(&err) {
387 return Err(err);
388 }
389
390 rotate_legacy_db(path)?;
393
394 let conn = open_connection_with_latest_schema(path)
395 .with_context(|| format!("recreate db {}", path.display()))?;
396 Ok(Self {
397 conn: Mutex::new(conn),
398 })
399 }
400 }
401 }
402
    /// Locks and returns the shared connection. Panics if the mutex is
    /// poisoned (a previous holder panicked mid-operation) — there is no
    /// meaningful recovery for a possibly half-mutated connection.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
406
    /// Inserts or refreshes a locally-parsed session.
    ///
    /// The row is keyed by `session.session_id`, written under the fixed team
    /// `'personal'` with an empty `body_storage_key`; on id conflict all
    /// derived metadata columns are overwritten while `team_id`, `created_at`
    /// and `body_storage_key` keep their stored values. A companion
    /// `session_sync` row is created with status `'local_only'`; if one
    /// already exists only its `source_path` is updated, so a `'synced'`
    /// status survives re-parsing.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as a single comma-joined column; empty list -> NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // Working directory may appear under either attribute key.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        // Placeholder order must match the params! list below exactly;
        // 'personal' and '' (body_storage_key) are inlined literals.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
               tool=excluded.tool, agent_provider=excluded.agent_provider, \
               agent_model=excluded.agent_model, \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, \
               message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
               task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
               git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
499
    /// Inserts or refreshes a session summary fetched from the remote server.
    ///
    /// On id conflict the mutable metadata columns are overwritten while the
    /// identity columns (`user_id`, `team_id`, `tool`, provider/model,
    /// `created_at`, `body_storage_key`) keep their stored values. The
    /// companion `session_sync` row is created as `'remote_only'`; an existing
    /// `'local_only'` status is promoted to `'synced'` and any other status is
    /// left untouched.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
               message_count=excluded.message_count, task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
               git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
               pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
               sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
571
572 fn build_local_session_where_clause(
575 filter: &LocalSessionFilter,
576 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
577 let mut where_clauses = vec![
578 "1=1".to_string(),
579 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
580 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
581 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
582 ];
583 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
584 let mut idx = 1u32;
585
586 if let Some(ref team_id) = filter.team_id {
587 where_clauses.push(format!("s.team_id = ?{idx}"));
588 param_values.push(Box::new(team_id.clone()));
589 idx += 1;
590 }
591
592 if let Some(ref sync_status) = filter.sync_status {
593 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
594 param_values.push(Box::new(sync_status.clone()));
595 idx += 1;
596 }
597
598 if let Some(ref repo) = filter.git_repo_name {
599 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
600 param_values.push(Box::new(repo.clone()));
601 idx += 1;
602 }
603
604 if let Some(ref tool) = filter.tool {
605 where_clauses.push(format!("s.tool = ?{idx}"));
606 param_values.push(Box::new(tool.clone()));
607 idx += 1;
608 }
609
610 if let Some(ref search) = filter.search {
611 let like = format!("%{search}%");
612 where_clauses.push(format!(
613 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
614 i1 = idx,
615 i2 = idx + 1,
616 i3 = idx + 2,
617 ));
618 param_values.push(Box::new(like.clone()));
619 param_values.push(Box::new(like.clone()));
620 param_values.push(Box::new(like));
621 idx += 3;
622 }
623
624 let interval = match filter.time_range {
625 LocalTimeRange::Hours24 => Some("-1 day"),
626 LocalTimeRange::Days7 => Some("-7 days"),
627 LocalTimeRange::Days30 => Some("-30 days"),
628 LocalTimeRange::All => None,
629 };
630 if let Some(interval) = interval {
631 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
632 param_values.push(Box::new(interval.to_string()));
633 }
634
635 (where_clauses.join(" AND "), param_values)
636 }
637
638 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
639 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
640 let order_clause = match filter.sort {
641 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
642 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
643 LocalSortOrder::Recent => "s.created_at DESC",
644 };
645
646 let mut sql = format!(
647 "SELECT {LOCAL_SESSION_COLUMNS} \
648 {FROM_CLAUSE} WHERE {where_str} \
649 ORDER BY {order_clause}"
650 );
651
652 if let Some(limit) = filter.limit {
653 sql.push_str(" LIMIT ?");
654 param_values.push(Box::new(limit));
655 if let Some(offset) = filter.offset {
656 sql.push_str(" OFFSET ?");
657 param_values.push(Box::new(offset));
658 }
659 }
660
661 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
662 param_values.iter().map(|p| p.as_ref()).collect();
663 let conn = self.conn();
664 let mut stmt = conn.prepare(&sql)?;
665 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
666
667 let mut result = Vec::new();
668 for row in rows {
669 result.push(row?);
670 }
671
672 Ok(hide_opencode_child_sessions(result))
673 }
674
675 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
677 let mut count_filter = filter.clone();
678 count_filter.limit = None;
679 count_filter.offset = None;
680 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
681 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
682 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
683 param_values.iter().map(|p| p.as_ref()).collect();
684 let conn = self.conn();
685 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
686 Ok(count)
687 }
688
689 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
691 let mut tool_filter = filter.clone();
692 tool_filter.tool = None;
693 tool_filter.limit = None;
694 tool_filter.offset = None;
695 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
696 let sql = format!(
697 "SELECT DISTINCT s.tool \
698 {FROM_CLAUSE} WHERE {where_str} \
699 ORDER BY s.tool ASC"
700 );
701 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
702 param_values.iter().map(|p| p.as_ref()).collect();
703 let conn = self.conn();
704 let mut stmt = conn.prepare(&sql)?;
705 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
706
707 let mut tools = Vec::new();
708 for row in rows {
709 let tool = row?;
710 if !tool.trim().is_empty() {
711 tools.push(tool);
712 }
713 }
714 Ok(tools)
715 }
716
    /// Lists sessions matching a `LogFilter`, newest first, with child and
    /// sub-agent sessions removed after the query. All populated filter
    /// fields are AND-ed; placeholders are numbered `?1…?N` in push order,
    /// and LIMIT/OFFSET use anonymous `?` placeholders appended last.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // User-facing `*` wildcard becomes SQL `%`.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // Match the path as a quoted substring of the serialized
            // files_modified column.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only an explicit `Some(true)` restricts; false/None add nothing.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            // Prefix match on the working directory.
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Commit filtering needs an extra join against commit_session_links.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // `idx` only numbers the placeholders above; discard the final value.
        let _ = idx; let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        // Anonymous `?` placeholders: SQLite numbers them after the highest
        // explicit `?N` used so far. OFFSET applies only alongside LIMIT.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
827
828 pub fn get_sessions_by_tool_latest(
830 &self,
831 tool: &str,
832 count: u32,
833 ) -> Result<Vec<LocalSessionRow>> {
834 let sql = format!(
835 "SELECT {LOCAL_SESSION_COLUMNS} \
836 {FROM_CLAUSE} WHERE s.tool = ?1 \
837 ORDER BY s.created_at DESC"
838 );
839 let conn = self.conn();
840 let mut stmt = conn.prepare(&sql)?;
841 let rows = stmt.query_map(params![tool], row_to_local_session)?;
842 let mut result = Vec::new();
843 for row in rows {
844 result.push(row?);
845 }
846
847 let mut filtered = hide_opencode_child_sessions(result);
848 filtered.truncate(count as usize);
849 Ok(filtered)
850 }
851
852 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
854 let sql = format!(
855 "SELECT {LOCAL_SESSION_COLUMNS} \
856 {FROM_CLAUSE} \
857 ORDER BY s.created_at DESC"
858 );
859 let conn = self.conn();
860 let mut stmt = conn.prepare(&sql)?;
861 let rows = stmt.query_map([], row_to_local_session)?;
862 let mut result = Vec::new();
863 for row in rows {
864 result.push(row?);
865 }
866
867 let mut filtered = hide_opencode_child_sessions(result);
868 filtered.truncate(count as usize);
869 Ok(filtered)
870 }
871
872 pub fn get_session_by_tool_offset(
874 &self,
875 tool: &str,
876 offset: u32,
877 ) -> Result<Option<LocalSessionRow>> {
878 let sql = format!(
879 "SELECT {LOCAL_SESSION_COLUMNS} \
880 {FROM_CLAUSE} WHERE s.tool = ?1 \
881 ORDER BY s.created_at DESC"
882 );
883 let conn = self.conn();
884 let mut stmt = conn.prepare(&sql)?;
885 let rows = stmt.query_map(params![tool], row_to_local_session)?;
886 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
887 Ok(filtered.into_iter().nth(offset as usize))
888 }
889
890 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
892 let sql = format!(
893 "SELECT {LOCAL_SESSION_COLUMNS} \
894 {FROM_CLAUSE} \
895 ORDER BY s.created_at DESC"
896 );
897 let conn = self.conn();
898 let mut stmt = conn.prepare(&sql)?;
899 let rows = stmt.query_map([], row_to_local_session)?;
900 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
901 Ok(filtered.into_iter().nth(offset as usize))
902 }
903
904 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
906 let conn = self.conn();
907 let result = conn
908 .query_row(
909 "SELECT source_path FROM session_sync WHERE session_id = ?1",
910 params![session_id],
911 |row| row.get(0),
912 )
913 .optional()?;
914
915 Ok(result)
916 }
917
918 pub fn session_count(&self) -> Result<i64> {
920 let count = self
921 .conn()
922 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
923 Ok(count)
924 }
925
926 pub fn delete_session(&self, session_id: &str) -> Result<()> {
929 let conn = self.conn();
930 conn.execute(
931 "DELETE FROM body_cache WHERE session_id = ?1",
932 params![session_id],
933 )?;
934 conn.execute(
935 "DELETE FROM session_sync WHERE session_id = ?1",
936 params![session_id],
937 )?;
938 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
939 Ok(())
940 }
941
942 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
945 let cursor = self
946 .conn()
947 .query_row(
948 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
949 params![team_id],
950 |row| row.get(0),
951 )
952 .optional()?;
953 Ok(cursor)
954 }
955
956 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
957 self.conn().execute(
958 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
959 VALUES (?1, ?2, datetime('now')) \
960 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
961 params![team_id, cursor],
962 )?;
963 Ok(())
964 }
965
966 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
970 let sql = format!(
971 "SELECT {LOCAL_SESSION_COLUMNS} \
972 FROM sessions s \
973 INNER JOIN session_sync ss ON ss.session_id = s.id \
974 LEFT JOIN users u ON u.id = s.user_id \
975 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
976 ORDER BY s.created_at ASC"
977 );
978 let conn = self.conn();
979 let mut stmt = conn.prepare(&sql)?;
980 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
981 let mut result = Vec::new();
982 for row in rows {
983 result.push(row?);
984 }
985 Ok(result)
986 }
987
988 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
989 self.conn().execute(
990 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
991 WHERE session_id = ?1",
992 params![session_id],
993 )?;
994 Ok(())
995 }
996
997 pub fn was_uploaded_after(
999 &self,
1000 source_path: &str,
1001 modified: &chrono::DateTime<chrono::Utc>,
1002 ) -> Result<bool> {
1003 let result: Option<String> = self
1004 .conn()
1005 .query_row(
1006 "SELECT last_synced_at FROM session_sync \
1007 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1008 params![source_path],
1009 |row| row.get(0),
1010 )
1011 .optional()?;
1012
1013 if let Some(synced_at) = result {
1014 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1015 return Ok(dt >= *modified);
1016 }
1017 }
1018 Ok(false)
1019 }
1020
1021 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1024 self.conn().execute(
1025 "INSERT INTO body_cache (session_id, body, cached_at) \
1026 VALUES (?1, ?2, datetime('now')) \
1027 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1028 params![session_id, body],
1029 )?;
1030 Ok(())
1031 }
1032
1033 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1034 let body = self
1035 .conn()
1036 .query_row(
1037 "SELECT body FROM body_cache WHERE session_id = ?1",
1038 params![session_id],
1039 |row| row.get(0),
1040 )
1041 .optional()?;
1042 Ok(body)
1043 }
1044
1045 pub fn upsert_timeline_summary_cache(
1048 &self,
1049 lookup_key: &str,
1050 namespace: &str,
1051 compact: &str,
1052 payload: &str,
1053 raw: &str,
1054 ) -> Result<()> {
1055 self.conn().execute(
1056 "INSERT INTO timeline_summary_cache \
1057 (lookup_key, namespace, compact, payload, raw, cached_at) \
1058 VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1059 ON CONFLICT(lookup_key) DO UPDATE SET \
1060 namespace = excluded.namespace, \
1061 compact = excluded.compact, \
1062 payload = excluded.payload, \
1063 raw = excluded.raw, \
1064 cached_at = datetime('now')",
1065 params![lookup_key, namespace, compact, payload, raw],
1066 )?;
1067 Ok(())
1068 }
1069
1070 pub fn list_timeline_summary_cache_by_namespace(
1071 &self,
1072 namespace: &str,
1073 ) -> Result<Vec<TimelineSummaryCacheRow>> {
1074 let conn = self.conn();
1075 let mut stmt = conn.prepare(
1076 "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1077 FROM timeline_summary_cache \
1078 WHERE namespace = ?1 \
1079 ORDER BY cached_at DESC",
1080 )?;
1081 let rows = stmt.query_map(params![namespace], |row| {
1082 Ok(TimelineSummaryCacheRow {
1083 lookup_key: row.get(0)?,
1084 namespace: row.get(1)?,
1085 compact: row.get(2)?,
1086 payload: row.get(3)?,
1087 raw: row.get(4)?,
1088 cached_at: row.get(5)?,
1089 })
1090 })?;
1091
1092 let mut out = Vec::new();
1093 for row in rows {
1094 out.push(row?);
1095 }
1096 Ok(out)
1097 }
1098
1099 pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1100 let affected = self
1101 .conn()
1102 .execute("DELETE FROM timeline_summary_cache", [])?;
1103 Ok(affected)
1104 }
1105
    /// One-time migration from a legacy upload ledger (`source_path ->
    /// uploaded_at` map): for each path that already has a `session_sync`
    /// row, promotes a still-`'local_only'` status to `'synced'` with the
    /// recorded timestamp. Returns the number of matched paths.
    ///
    /// NOTE(review): the count increments whenever the path exists, even if
    /// the UPDATE changed no rows (status was not 'local_only') — confirm
    /// callers only treat this as a rough progress number. Existence-check
    /// errors are deliberately swallowed (`unwrap_or(false)`) so one bad row
    /// cannot abort the whole migration.
    pub fn migrate_from_state_json(
        &self,
        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
    ) -> Result<usize> {
        let mut count = 0;
        for (path, uploaded_at) in uploaded {
            let exists: bool = self
                .conn()
                .query_row(
                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
                    params![path],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if exists {
                self.conn().execute(
                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
                     WHERE source_path = ?2 AND sync_status = 'local_only'",
                    params![uploaded_at.to_rfc3339(), path],
                )?;
                count += 1;
            }
        }
        Ok(count)
    }
1136
1137 pub fn link_commit_session(
1141 &self,
1142 commit_hash: &str,
1143 session_id: &str,
1144 repo_path: Option<&str>,
1145 branch: Option<&str>,
1146 ) -> Result<()> {
1147 self.conn().execute(
1148 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1149 VALUES (?1, ?2, ?3, ?4) \
1150 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1151 params![commit_hash, session_id, repo_path, branch],
1152 )?;
1153 Ok(())
1154 }
1155
1156 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1158 let sql = format!(
1159 "SELECT {LOCAL_SESSION_COLUMNS} \
1160 {FROM_CLAUSE} \
1161 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1162 WHERE csl.commit_hash = ?1 \
1163 ORDER BY s.created_at DESC"
1164 );
1165 let conn = self.conn();
1166 let mut stmt = conn.prepare(&sql)?;
1167 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1168 let mut result = Vec::new();
1169 for row in rows {
1170 result.push(row?);
1171 }
1172 Ok(result)
1173 }
1174
1175 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1177 let conn = self.conn();
1178 let mut stmt = conn.prepare(
1179 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1180 FROM commit_session_links WHERE session_id = ?1 \
1181 ORDER BY created_at DESC",
1182 )?;
1183 let rows = stmt.query_map(params![session_id], |row| {
1184 Ok(CommitLink {
1185 commit_hash: row.get(0)?,
1186 session_id: row.get(1)?,
1187 repo_path: row.get(2)?,
1188 branch: row.get(3)?,
1189 created_at: row.get(4)?,
1190 })
1191 })?;
1192 let mut result = Vec::new();
1193 for row in rows {
1194 result.push(row?);
1195 }
1196 Ok(result)
1197 }
1198
1199 pub fn find_active_session_for_repo(
1203 &self,
1204 repo_path: &str,
1205 since_minutes: u32,
1206 ) -> Result<Option<LocalSessionRow>> {
1207 let sql = format!(
1208 "SELECT {LOCAL_SESSION_COLUMNS} \
1209 {FROM_CLAUSE} \
1210 WHERE s.working_directory LIKE ?1 \
1211 AND s.created_at >= datetime('now', ?2) \
1212 ORDER BY s.created_at DESC LIMIT 1"
1213 );
1214 let since = format!("-{since_minutes} minutes");
1215 let like = format!("{repo_path}%");
1216 let conn = self.conn();
1217 let mut stmt = conn.prepare(&sql)?;
1218 let row = stmt
1219 .query_map(params![like, since], row_to_local_session)?
1220 .next()
1221 .transpose()?;
1222 Ok(row)
1223 }
1224
1225 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1227 let conn = self.conn();
1228 let mut stmt = conn
1229 .prepare("SELECT id FROM sessions")
1230 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1231 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1232 let mut set = std::collections::HashSet::new();
1233 if let Ok(rows) = rows {
1234 for row in rows.flatten() {
1235 set.insert(row);
1236 }
1237 }
1238 set
1239 }
1240
    /// Refresh the denormalized statistics columns of the `sessions` row whose
    /// id matches `session.session_id`, using a freshly parsed `Session`.
    ///
    /// No-op if the row does not exist (the UPDATE simply matches nothing).
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // File/error metadata is derived by scanning the session's events.
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;
        Ok(())
    }
1277
1278 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1280 self.conn().execute(
1281 "INSERT INTO session_sync (session_id, source_path) \
1282 VALUES (?1, ?2) \
1283 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1284 params![session_id, source_path],
1285 )?;
1286 Ok(())
1287 }
1288
1289 pub fn list_repos(&self) -> Result<Vec<String>> {
1291 let conn = self.conn();
1292 let mut stmt = conn.prepare(
1293 "SELECT DISTINCT git_repo_name FROM sessions \
1294 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1295 )?;
1296 let rows = stmt.query_map([], |row| row.get(0))?;
1297 let mut result = Vec::new();
1298 for row in rows {
1299 result.push(row?);
1300 }
1301 Ok(result)
1302 }
1303
1304 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1306 let conn = self.conn();
1307 let mut stmt = conn.prepare(
1308 "SELECT DISTINCT working_directory FROM sessions \
1309 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1310 ORDER BY working_directory ASC",
1311 )?;
1312 let rows = stmt.query_map([], |row| row.get(0))?;
1313 let mut result = Vec::new();
1314 for row in rows {
1315 result.push(row?);
1316 }
1317 Ok(result)
1318 }
1319}
1320
1321fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1324 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1325 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1326
1327 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1329
1330 apply_local_migrations(&conn)?;
1331
1332 ensure_sessions_columns(&conn)?;
1335 validate_local_schema(&conn)?;
1336
1337 Ok(conn)
1338}
1339
/// Apply all remote + local migrations to the local database, recording each
/// applied name in a `_migrations` table so every migration runs at most once.
///
/// SQL failures that look like "effect already present" artifacts (see
/// `is_local_migration_compat_error`) are tolerated so databases created by
/// older builds keep working; such migrations are still marked as applied.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    // Remote migrations run first so the local schema mirrors the server's;
    // local-only migrations then layer on top.
    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        // Probe failures fall back to "not applied" and re-run the migration,
        // which is safe because compat errors below are tolerated.
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        if let Err(e) = conn.execute_batch(sql) {
            // Compat errors mean the migration's effect already exists (e.g.
            // a column added by an older build); anything else is fatal.
            let msg = e.to_string().to_ascii_lowercase();
            if !is_local_migration_compat_error(&msg) {
                return Err(e).with_context(|| format!("apply local migration {name}"));
            }
        }

        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1379
/// True when a migration error message (already lowercased by the caller)
/// indicates the migration's effect is present from an earlier schema and the
/// failure can be safely ignored.
fn is_local_migration_compat_error(msg: &str) -> bool {
    ["duplicate column name", "no such column", "already exists"]
        .iter()
        .any(|needle| msg.contains(needle))
}
1385
1386fn validate_local_schema(conn: &Connection) -> Result<()> {
1387 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1388 conn.prepare(&sql)
1389 .map(|_| ())
1390 .context("validate local session schema")
1391}
1392
1393fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1394 let msg = format!("{err:#}").to_ascii_lowercase();
1395 msg.contains("no such column")
1396 || msg.contains("no such table")
1397 || msg.contains("cannot add a column")
1398 || msg.contains("already exists")
1399 || msg.contains("views may not be indexed")
1400 || msg.contains("malformed database schema")
1401 || msg.contains("duplicate column name")
1402}
1403
1404fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1405 if !path.exists() {
1406 return Ok(());
1407 }
1408
1409 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1410 let backup_name = format!(
1411 "{}.legacy-{}.bak",
1412 path.file_name()
1413 .and_then(|n| n.to_str())
1414 .unwrap_or("local.db"),
1415 ts
1416 );
1417 let backup_path = path.with_file_name(backup_name);
1418 std::fs::rename(path, &backup_path).with_context(|| {
1419 format!(
1420 "rotate legacy db {} -> {}",
1421 path.display(),
1422 backup_path.display()
1423 )
1424 })?;
1425
1426 let wal = PathBuf::from(format!("{}-wal", path.display()));
1427 let shm = PathBuf::from(format!("{}-shm", path.display()));
1428 let _ = std::fs::remove_file(wal);
1429 let _ = std::fs::remove_file(shm);
1430 Ok(())
1431}
1432
/// Columns (name, SQL type + default) that must exist on `sessions`.
///
/// `ensure_sessions_columns` adds any of these that a legacy database is
/// missing via `ALTER TABLE ... ADD COLUMN`, so the declarations here must be
/// valid for that statement (constant defaults only).
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1466
1467fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1468 let mut existing = HashSet::new();
1469 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1470 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1471 for row in rows {
1472 existing.insert(row?);
1473 }
1474
1475 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1476 if existing.contains(*name) {
1477 continue;
1478 }
1479 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1480 conn.execute_batch(&sql)
1481 .with_context(|| format!("add legacy sessions column '{name}'"))?;
1482 }
1483
1484 Ok(())
1485}
1486
/// Projection used by every local-session SELECT.
///
/// The column order here defines the positional indexes consumed by
/// `row_to_local_session` — the two must be kept in sync.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1497
/// Map one row of a `LOCAL_SESSION_COLUMNS` SELECT into a `LocalSessionRow`.
///
/// Indexes are positional and must match the column order declared in
/// `LOCAL_SESSION_COLUMNS`.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path: row.get(1)?,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: row.get(7)?,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Any read failure (e.g. NULL) falls back to "no errors".
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        // Any read failure falls back to a single agent.
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1535
1536fn default_db_path() -> Result<PathBuf> {
1537 let home = std::env::var("HOME")
1538 .or_else(|_| std::env::var("USERPROFILE"))
1539 .context("Could not determine home directory")?;
1540 Ok(PathBuf::from(home)
1541 .join(".local")
1542 .join("share")
1543 .join("opensession")
1544 .join("local.db"))
1545}
1546
1547#[cfg(test)]
1548mod tests {
1549 use super::*;
1550
1551 use std::collections::BTreeSet;
1552 use std::fs::{create_dir_all, write};
1553 use tempfile::tempdir;
1554
1555 fn test_db() -> LocalDb {
1556 let dir = tempdir().unwrap();
1557 let path = dir.keep().join("test.db");
1558 LocalDb::open_path(&path).unwrap()
1559 }
1560
    /// Temp directory for fixture files; deleted when the returned guard drops.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1564
    /// Build a minimal `local_only` LocalSessionRow fixture: all counters
    /// zeroed, title fixed to "test", created_at pinned to 2024-01-01.
    /// Tests tweak individual fields after construction as needed.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1602
    #[test]
    fn test_open_and_schema() {
        // Opening a fresh db must create, migrate, and validate the schema
        // without error.
        let _db = test_db();
    }
1607
1608 #[test]
1609 fn test_open_backfills_legacy_sessions_columns() {
1610 let dir = tempfile::tempdir().unwrap();
1611 let path = dir.path().join("legacy.db");
1612 {
1613 let conn = Connection::open(&path).unwrap();
1614 conn.execute_batch(
1615 "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1616 INSERT INTO sessions (id) VALUES ('legacy-1');",
1617 )
1618 .unwrap();
1619 }
1620
1621 let db = LocalDb::open_path(&path).unwrap();
1622 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1623 assert_eq!(rows.len(), 1);
1624 assert_eq!(rows[0].id, "legacy-1");
1625 assert_eq!(rows[0].user_message_count, 0);
1626 }
1627
1628 #[test]
1629 fn test_open_rotates_incompatible_legacy_schema() {
1630 let dir = tempfile::tempdir().unwrap();
1631 let path = dir.path().join("broken.db");
1632 {
1633 let conn = Connection::open(&path).unwrap();
1634 conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1635 .unwrap();
1636 }
1637
1638 let db = LocalDb::open_path(&path).unwrap();
1639 let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1640 assert!(rows.is_empty());
1641
1642 let rotated = std::fs::read_dir(dir.path())
1643 .unwrap()
1644 .filter_map(Result::ok)
1645 .any(|entry| {
1646 let name = entry.file_name();
1647 let name = name.to_string_lossy();
1648 name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1649 });
1650 assert!(rotated, "expected rotated legacy backup file");
1651 }
1652
1653 #[test]
1654 fn test_is_opencode_child_session() {
1655 let root = temp_root();
1656 let dir = root.path().join("sessions");
1657 create_dir_all(&dir).unwrap();
1658 let parent_session = dir.join("parent.json");
1659 write(
1660 &parent_session,
1661 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1662 )
1663 .unwrap();
1664 let child_session = dir.join("child.json");
1665 write(
1666 &child_session,
1667 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1668 )
1669 .unwrap();
1670
1671 let parent = make_row(
1672 "ses_parent",
1673 "opencode",
1674 Some(parent_session.to_str().unwrap()),
1675 );
1676 let child = make_row(
1677 "ses_child",
1678 "opencode",
1679 Some(child_session.to_str().unwrap()),
1680 );
1681 let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1682
1683 assert!(!is_opencode_child_session(&parent));
1684 assert!(is_opencode_child_session(&child));
1685 assert!(!is_opencode_child_session(&codex));
1686 }
1687
1688 #[test]
1689 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1690 let child = LocalSessionRow {
1691 id: "sess_child".to_string(),
1692 source_path: None,
1693 sync_status: "local_only".to_string(),
1694 last_synced_at: None,
1695 user_id: None,
1696 nickname: None,
1697 team_id: None,
1698 tool: "opencode".to_string(),
1699 agent_provider: None,
1700 agent_model: None,
1701 title: None,
1702 description: None,
1703 tags: None,
1704 created_at: "2024-01-01T00:00:00Z".to_string(),
1705 uploaded_at: None,
1706 message_count: 1,
1707 user_message_count: 0,
1708 task_count: 4,
1709 event_count: 4,
1710 duration_seconds: 0,
1711 total_input_tokens: 0,
1712 total_output_tokens: 0,
1713 git_remote: None,
1714 git_branch: None,
1715 git_commit: None,
1716 git_repo_name: None,
1717 pr_number: None,
1718 pr_url: None,
1719 working_directory: None,
1720 files_modified: None,
1721 files_read: None,
1722 has_errors: false,
1723 max_active_agents: 1,
1724 };
1725
1726 let parent = LocalSessionRow {
1727 id: "sess_parent".to_string(),
1728 source_path: None,
1729 sync_status: "local_only".to_string(),
1730 last_synced_at: None,
1731 user_id: None,
1732 nickname: None,
1733 team_id: None,
1734 tool: "opencode".to_string(),
1735 agent_provider: None,
1736 agent_model: None,
1737 title: Some("regular".to_string()),
1738 description: None,
1739 tags: None,
1740 created_at: "2024-01-01T00:00:00Z".to_string(),
1741 uploaded_at: None,
1742 message_count: 1,
1743 user_message_count: 1,
1744 task_count: 2,
1745 event_count: 20,
1746 duration_seconds: 0,
1747 total_input_tokens: 0,
1748 total_output_tokens: 0,
1749 git_remote: None,
1750 git_branch: None,
1751 git_commit: None,
1752 git_repo_name: None,
1753 pr_number: None,
1754 pr_url: None,
1755 working_directory: None,
1756 files_modified: None,
1757 files_read: None,
1758 has_errors: false,
1759 max_active_agents: 1,
1760 };
1761
1762 assert!(is_opencode_child_session(&child));
1763 assert!(!is_opencode_child_session(&parent));
1764 }
1765
1766 #[test]
1767 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1768 let child = LocalSessionRow {
1769 id: "sess_child_2".to_string(),
1770 source_path: None,
1771 sync_status: "local_only".to_string(),
1772 last_synced_at: None,
1773 user_id: None,
1774 nickname: None,
1775 team_id: None,
1776 tool: "opencode".to_string(),
1777 agent_provider: None,
1778 agent_model: None,
1779 title: None,
1780 description: None,
1781 tags: None,
1782 created_at: "2024-01-01T00:00:00Z".to_string(),
1783 uploaded_at: None,
1784 message_count: 2,
1785 user_message_count: 0,
1786 task_count: 4,
1787 event_count: 4,
1788 duration_seconds: 0,
1789 total_input_tokens: 0,
1790 total_output_tokens: 0,
1791 git_remote: None,
1792 git_branch: None,
1793 git_commit: None,
1794 git_repo_name: None,
1795 pr_number: None,
1796 pr_url: None,
1797 working_directory: None,
1798 files_modified: None,
1799 files_read: None,
1800 has_errors: false,
1801 max_active_agents: 1,
1802 };
1803
1804 let parent = LocalSessionRow {
1805 id: "sess_parent".to_string(),
1806 source_path: None,
1807 sync_status: "local_only".to_string(),
1808 last_synced_at: None,
1809 user_id: None,
1810 nickname: None,
1811 team_id: None,
1812 tool: "opencode".to_string(),
1813 agent_provider: None,
1814 agent_model: None,
1815 title: Some("regular".to_string()),
1816 description: None,
1817 tags: None,
1818 created_at: "2024-01-01T00:00:00Z".to_string(),
1819 uploaded_at: None,
1820 message_count: 2,
1821 user_message_count: 1,
1822 task_count: 5,
1823 event_count: 20,
1824 duration_seconds: 0,
1825 total_input_tokens: 0,
1826 total_output_tokens: 0,
1827 git_remote: None,
1828 git_branch: None,
1829 git_commit: None,
1830 git_repo_name: None,
1831 pr_number: None,
1832 pr_url: None,
1833 working_directory: None,
1834 files_modified: None,
1835 files_read: None,
1836 has_errors: false,
1837 max_active_agents: 1,
1838 };
1839
1840 assert!(is_opencode_child_session(&child));
1841 assert!(!is_opencode_child_session(&parent));
1842 }
1843
1844 #[test]
1845 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1846 let child = LocalSessionRow {
1847 id: "sess_child_3".to_string(),
1848 source_path: None,
1849 sync_status: "local_only".to_string(),
1850 last_synced_at: None,
1851 user_id: None,
1852 nickname: None,
1853 team_id: None,
1854 tool: "opencode".to_string(),
1855 agent_provider: None,
1856 agent_model: None,
1857 title: None,
1858 description: None,
1859 tags: None,
1860 created_at: "2024-01-01T00:00:00Z".to_string(),
1861 uploaded_at: None,
1862 message_count: 3,
1863 user_message_count: 0,
1864 task_count: 2,
1865 event_count: 6,
1866 duration_seconds: 0,
1867 total_input_tokens: 0,
1868 total_output_tokens: 0,
1869 git_remote: None,
1870 git_branch: None,
1871 git_commit: None,
1872 git_repo_name: None,
1873 pr_number: None,
1874 pr_url: None,
1875 working_directory: None,
1876 files_modified: None,
1877 files_read: None,
1878 has_errors: false,
1879 max_active_agents: 1,
1880 };
1881
1882 assert!(is_opencode_child_session(&child));
1883 }
1884
1885 #[test]
1886 fn test_parse_opencode_parent_session_id_aliases() {
1887 let root = temp_root();
1888 let dir = root.path().join("session-aliases");
1889 create_dir_all(&dir).unwrap();
1890 let child_session = dir.join("child.json");
1891 write(
1892 &child_session,
1893 r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1894 )
1895 .unwrap();
1896 assert_eq!(
1897 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1898 Some("ses_parent")
1899 );
1900 }
1901
1902 #[test]
1903 fn test_parse_opencode_parent_session_id_nested_metadata() {
1904 let root = temp_root();
1905 let dir = root.path().join("session-nested");
1906 create_dir_all(&dir).unwrap();
1907 let child_session = dir.join("child.json");
1908 write(
1909 &child_session,
1910 r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1911 )
1912 .unwrap();
1913 assert_eq!(
1914 parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1915 Some("ses_parent")
1916 );
1917 }
1918
1919 #[test]
1920 fn test_is_claude_subagent_session() {
1921 let row = make_row(
1922 "ses_parent",
1923 "claude-code",
1924 Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1925 );
1926 assert!(!is_opencode_child_session(&row));
1927 assert!(is_claude_subagent_session(&row));
1928 assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1929 }
1930
1931 #[test]
1932 fn test_hide_opencode_child_sessions() {
1933 let root = temp_root();
1934 let dir = root.path().join("sessions");
1935 create_dir_all(&dir).unwrap();
1936 let parent_session = dir.join("parent.json");
1937 let child_session = dir.join("child.json");
1938 let orphan_session = dir.join("orphan.json");
1939
1940 write(
1941 &parent_session,
1942 r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1943 )
1944 .unwrap();
1945 write(
1946 &child_session,
1947 r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1948 )
1949 .unwrap();
1950 write(
1951 &orphan_session,
1952 r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1953 )
1954 .unwrap();
1955
1956 let rows = vec![
1957 make_row(
1958 "ses_child",
1959 "opencode",
1960 Some(child_session.to_str().unwrap()),
1961 ),
1962 make_row(
1963 "ses_parent",
1964 "opencode",
1965 Some(parent_session.to_str().unwrap()),
1966 ),
1967 {
1968 let mut row = make_row("ses_other", "codex", None);
1969 row.user_message_count = 1;
1970 row
1971 },
1972 make_row(
1973 "ses_orphan",
1974 "opencode",
1975 Some(orphan_session.to_str().unwrap()),
1976 ),
1977 ];
1978
1979 let filtered = hide_opencode_child_sessions(rows);
1980 assert_eq!(filtered.len(), 3);
1981 assert!(filtered.iter().all(|r| r.id != "ses_child"));
1982 }
1983
1984 #[test]
1985 fn test_sync_cursor() {
1986 let db = test_db();
1987 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1988 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1989 assert_eq!(
1990 db.get_sync_cursor("team1").unwrap(),
1991 Some("2024-01-01T00:00:00Z".to_string())
1992 );
1993 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1995 assert_eq!(
1996 db.get_sync_cursor("team1").unwrap(),
1997 Some("2024-06-01T00:00:00Z".to_string())
1998 );
1999 }
2000
2001 #[test]
2002 fn test_body_cache() {
2003 let db = test_db();
2004 assert_eq!(db.get_cached_body("s1").unwrap(), None);
2005 db.cache_body("s1", b"hello world").unwrap();
2006 assert_eq!(
2007 db.get_cached_body("s1").unwrap(),
2008 Some(b"hello world".to_vec())
2009 );
2010 }
2011
2012 #[test]
2013 fn test_timeline_summary_cache_roundtrip() {
2014 let db = test_db();
2015 db.upsert_timeline_summary_cache(
2016 "k1",
2017 "timeline:v1",
2018 "compact text",
2019 "{\"kind\":\"turn-summary\"}",
2020 "raw text",
2021 )
2022 .unwrap();
2023
2024 let rows = db
2025 .list_timeline_summary_cache_by_namespace("timeline:v1")
2026 .unwrap();
2027 assert_eq!(rows.len(), 1);
2028 assert_eq!(rows[0].lookup_key, "k1");
2029 assert_eq!(rows[0].namespace, "timeline:v1");
2030 assert_eq!(rows[0].compact, "compact text");
2031 assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2032 assert_eq!(rows[0].raw, "raw text");
2033
2034 let cleared = db.clear_timeline_summary_cache().unwrap();
2035 assert_eq!(cleared, 1);
2036 let rows_after = db
2037 .list_timeline_summary_cache_by_namespace("timeline:v1")
2038 .unwrap();
2039 assert!(rows_after.is_empty());
2040 }
2041
2042 #[test]
2043 fn test_local_migrations_include_timeline_summary_cache() {
2044 let db = test_db();
2045 let conn = db.conn();
2046 let applied: bool = conn
2047 .query_row(
2048 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2049 params!["local_0003_timeline_summary_cache"],
2050 |row| row.get(0),
2051 )
2052 .unwrap();
2053 assert!(applied);
2054 }
2055
2056 #[test]
2057 fn test_local_migration_files_match_api_local_migrations() {
2058 fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2059 std::fs::read_dir(dir)
2060 .expect("read migrations directory")
2061 .filter_map(Result::ok)
2062 .map(|entry| entry.file_name().to_string_lossy().to_string())
2063 .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2064 .collect()
2065 }
2066
2067 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2068 let local_files = collect_local_sql(manifest_dir.join("migrations"));
2069 let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2070
2071 assert_eq!(
2072 local_files, api_files,
2073 "local-db local migrations must stay in parity with api local migrations"
2074 );
2075 }
2076
2077 #[test]
2078 fn test_upsert_remote_session() {
2079 let db = test_db();
2080 let summary = RemoteSessionSummary {
2081 id: "remote-1".to_string(),
2082 user_id: Some("u1".to_string()),
2083 nickname: Some("alice".to_string()),
2084 team_id: "t1".to_string(),
2085 tool: "claude-code".to_string(),
2086 agent_provider: None,
2087 agent_model: None,
2088 title: Some("Test session".to_string()),
2089 description: None,
2090 tags: None,
2091 created_at: "2024-01-01T00:00:00Z".to_string(),
2092 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2093 message_count: 10,
2094 task_count: 2,
2095 event_count: 20,
2096 duration_seconds: 300,
2097 total_input_tokens: 1000,
2098 total_output_tokens: 500,
2099 git_remote: None,
2100 git_branch: None,
2101 git_commit: None,
2102 git_repo_name: None,
2103 pr_number: None,
2104 pr_url: None,
2105 working_directory: None,
2106 files_modified: None,
2107 files_read: None,
2108 has_errors: false,
2109 max_active_agents: 1,
2110 };
2111 db.upsert_remote_session(&summary).unwrap();
2112
2113 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2114 assert_eq!(sessions.len(), 1);
2115 assert_eq!(sessions[0].id, "remote-1");
2116 assert_eq!(sessions[0].sync_status, "remote_only");
2117 assert_eq!(sessions[0].nickname, None); }
2119
2120 #[test]
2121 fn test_list_filter_by_repo() {
2122 let db = test_db();
2123 let summary1 = RemoteSessionSummary {
2125 id: "s1".to_string(),
2126 user_id: None,
2127 nickname: None,
2128 team_id: "t1".to_string(),
2129 tool: "claude-code".to_string(),
2130 agent_provider: None,
2131 agent_model: None,
2132 title: Some("Session 1".to_string()),
2133 description: None,
2134 tags: None,
2135 created_at: "2024-01-01T00:00:00Z".to_string(),
2136 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2137 message_count: 5,
2138 task_count: 0,
2139 event_count: 10,
2140 duration_seconds: 60,
2141 total_input_tokens: 100,
2142 total_output_tokens: 50,
2143 git_remote: None,
2144 git_branch: None,
2145 git_commit: None,
2146 git_repo_name: None,
2147 pr_number: None,
2148 pr_url: None,
2149 working_directory: None,
2150 files_modified: None,
2151 files_read: None,
2152 has_errors: false,
2153 max_active_agents: 1,
2154 };
2155 db.upsert_remote_session(&summary1).unwrap();
2156
2157 let filter = LocalSessionFilter {
2159 team_id: Some("t1".to_string()),
2160 ..Default::default()
2161 };
2162 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2163
2164 let filter = LocalSessionFilter {
2165 team_id: Some("t999".to_string()),
2166 ..Default::default()
2167 };
2168 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2169 }
2170
    /// Build a RemoteSessionSummary fixture for team "t1" with fixed counters,
    /// an anthropic provider/model, and uploaded_at mirroring created_at.
    /// Tests tweak individual fields after construction as needed.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
2206
2207 fn seed_sessions(db: &LocalDb) {
2208 db.upsert_remote_session(&make_summary(
2210 "s1",
2211 "claude-code",
2212 "First session",
2213 "2024-01-01T00:00:00Z",
2214 ))
2215 .unwrap();
2216 db.upsert_remote_session(&make_summary(
2217 "s2",
2218 "claude-code",
2219 "JWT auth work",
2220 "2024-01-02T00:00:00Z",
2221 ))
2222 .unwrap();
2223 db.upsert_remote_session(&make_summary(
2224 "s3",
2225 "gemini",
2226 "Gemini test",
2227 "2024-01-03T00:00:00Z",
2228 ))
2229 .unwrap();
2230 db.upsert_remote_session(&make_summary(
2231 "s4",
2232 "claude-code",
2233 "Error handling",
2234 "2024-01-04T00:00:00Z",
2235 ))
2236 .unwrap();
2237 db.upsert_remote_session(&make_summary(
2238 "s5",
2239 "claude-code",
2240 "Final polish",
2241 "2024-01-05T00:00:00Z",
2242 ))
2243 .unwrap();
2244 }
2245
2246 #[test]
2249 fn test_log_no_filters() {
2250 let db = test_db();
2251 seed_sessions(&db);
2252 let filter = LogFilter::default();
2253 let results = db.list_sessions_log(&filter).unwrap();
2254 assert_eq!(results.len(), 5);
2255 assert_eq!(results[0].id, "s5");
2257 assert_eq!(results[4].id, "s1");
2258 }
2259
2260 #[test]
2261 fn test_log_filter_by_tool() {
2262 let db = test_db();
2263 seed_sessions(&db);
2264 let filter = LogFilter {
2265 tool: Some("claude-code".to_string()),
2266 ..Default::default()
2267 };
2268 let results = db.list_sessions_log(&filter).unwrap();
2269 assert_eq!(results.len(), 4);
2270 assert!(results.iter().all(|s| s.tool == "claude-code"));
2271 }
2272
2273 #[test]
2274 fn test_log_filter_by_model_wildcard() {
2275 let db = test_db();
2276 seed_sessions(&db);
2277 let filter = LogFilter {
2278 model: Some("claude*".to_string()),
2279 ..Default::default()
2280 };
2281 let results = db.list_sessions_log(&filter).unwrap();
2282 assert_eq!(results.len(), 5); }
2284
2285 #[test]
2286 fn test_log_filter_since() {
2287 let db = test_db();
2288 seed_sessions(&db);
2289 let filter = LogFilter {
2290 since: Some("2024-01-03T00:00:00Z".to_string()),
2291 ..Default::default()
2292 };
2293 let results = db.list_sessions_log(&filter).unwrap();
2294 assert_eq!(results.len(), 3); }
2296
2297 #[test]
2298 fn test_log_filter_before() {
2299 let db = test_db();
2300 seed_sessions(&db);
2301 let filter = LogFilter {
2302 before: Some("2024-01-03T00:00:00Z".to_string()),
2303 ..Default::default()
2304 };
2305 let results = db.list_sessions_log(&filter).unwrap();
2306 assert_eq!(results.len(), 2); }
2308
2309 #[test]
2310 fn test_log_filter_since_and_before() {
2311 let db = test_db();
2312 seed_sessions(&db);
2313 let filter = LogFilter {
2314 since: Some("2024-01-02T00:00:00Z".to_string()),
2315 before: Some("2024-01-04T00:00:00Z".to_string()),
2316 ..Default::default()
2317 };
2318 let results = db.list_sessions_log(&filter).unwrap();
2319 assert_eq!(results.len(), 2); }
2321
2322 #[test]
2323 fn test_log_filter_grep() {
2324 let db = test_db();
2325 seed_sessions(&db);
2326 let filter = LogFilter {
2327 grep: Some("JWT".to_string()),
2328 ..Default::default()
2329 };
2330 let results = db.list_sessions_log(&filter).unwrap();
2331 assert_eq!(results.len(), 1);
2332 assert_eq!(results[0].id, "s2");
2333 }
2334
2335 #[test]
2336 fn test_log_limit_and_offset() {
2337 let db = test_db();
2338 seed_sessions(&db);
2339 let filter = LogFilter {
2340 limit: Some(2),
2341 offset: Some(1),
2342 ..Default::default()
2343 };
2344 let results = db.list_sessions_log(&filter).unwrap();
2345 assert_eq!(results.len(), 2);
2346 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2348 }
2349
2350 #[test]
2351 fn test_log_limit_only() {
2352 let db = test_db();
2353 seed_sessions(&db);
2354 let filter = LogFilter {
2355 limit: Some(3),
2356 ..Default::default()
2357 };
2358 let results = db.list_sessions_log(&filter).unwrap();
2359 assert_eq!(results.len(), 3);
2360 }
2361
2362 #[test]
2363 fn test_list_sessions_limit_offset() {
2364 let db = test_db();
2365 seed_sessions(&db);
2366 let filter = LocalSessionFilter {
2367 limit: Some(2),
2368 offset: Some(1),
2369 ..Default::default()
2370 };
2371 let results = db.list_sessions(&filter).unwrap();
2372 assert_eq!(results.len(), 2);
2373 assert_eq!(results[0].id, "s4");
2374 assert_eq!(results[1].id, "s3");
2375 }
2376
2377 #[test]
2378 fn test_count_sessions_filtered() {
2379 let db = test_db();
2380 seed_sessions(&db);
2381 let count = db
2382 .count_sessions_filtered(&LocalSessionFilter::default())
2383 .unwrap();
2384 assert_eq!(count, 5);
2385 }
2386
2387 #[test]
2388 fn test_list_working_directories_distinct_non_empty() {
2389 let db = test_db();
2390
2391 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2392 a.working_directory = Some("/tmp/repo-a".to_string());
2393 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2394 b.working_directory = Some("/tmp/repo-a".to_string());
2395 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2396 c.working_directory = Some("/tmp/repo-b".to_string());
2397 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2398 d.working_directory = Some("".to_string());
2399
2400 db.upsert_remote_session(&a).unwrap();
2401 db.upsert_remote_session(&b).unwrap();
2402 db.upsert_remote_session(&c).unwrap();
2403 db.upsert_remote_session(&d).unwrap();
2404
2405 let dirs = db.list_working_directories().unwrap();
2406 assert_eq!(
2407 dirs,
2408 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2409 );
2410 }
2411
2412 #[test]
2413 fn test_list_session_tools() {
2414 let db = test_db();
2415 seed_sessions(&db);
2416 let tools = db
2417 .list_session_tools(&LocalSessionFilter::default())
2418 .unwrap();
2419 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2420 }
2421
2422 #[test]
2423 fn test_log_combined_filters() {
2424 let db = test_db();
2425 seed_sessions(&db);
2426 let filter = LogFilter {
2427 tool: Some("claude-code".to_string()),
2428 since: Some("2024-01-03T00:00:00Z".to_string()),
2429 limit: Some(1),
2430 ..Default::default()
2431 };
2432 let results = db.list_sessions_log(&filter).unwrap();
2433 assert_eq!(results.len(), 1);
2434 assert_eq!(results[0].id, "s5"); }
2436
2437 #[test]
2440 fn test_get_session_by_offset() {
2441 let db = test_db();
2442 seed_sessions(&db);
2443 let row = db.get_session_by_offset(0).unwrap().unwrap();
2444 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2446 assert_eq!(row.id, "s3");
2447 assert!(db.get_session_by_offset(10).unwrap().is_none());
2448 }
2449
2450 #[test]
2451 fn test_get_session_by_tool_offset() {
2452 let db = test_db();
2453 seed_sessions(&db);
2454 let row = db
2455 .get_session_by_tool_offset("claude-code", 0)
2456 .unwrap()
2457 .unwrap();
2458 assert_eq!(row.id, "s5");
2459 let row = db
2460 .get_session_by_tool_offset("claude-code", 1)
2461 .unwrap()
2462 .unwrap();
2463 assert_eq!(row.id, "s4");
2464 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2465 assert_eq!(row.id, "s3");
2466 assert!(db
2467 .get_session_by_tool_offset("gemini", 1)
2468 .unwrap()
2469 .is_none());
2470 }
2471
2472 #[test]
2473 fn test_get_sessions_latest() {
2474 let db = test_db();
2475 seed_sessions(&db);
2476 let rows = db.get_sessions_latest(3).unwrap();
2477 assert_eq!(rows.len(), 3);
2478 assert_eq!(rows[0].id, "s5");
2479 assert_eq!(rows[1].id, "s4");
2480 assert_eq!(rows[2].id, "s3");
2481 }
2482
2483 #[test]
2484 fn test_get_sessions_by_tool_latest() {
2485 let db = test_db();
2486 seed_sessions(&db);
2487 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2488 assert_eq!(rows.len(), 2);
2489 assert_eq!(rows[0].id, "s5");
2490 assert_eq!(rows[1].id, "s4");
2491 }
2492
2493 #[test]
2494 fn test_get_sessions_latest_more_than_available() {
2495 let db = test_db();
2496 seed_sessions(&db);
2497 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2498 assert_eq!(rows.len(), 1); }
2500
2501 #[test]
2502 fn test_session_count() {
2503 let db = test_db();
2504 assert_eq!(db.session_count().unwrap(), 0);
2505 seed_sessions(&db);
2506 assert_eq!(db.session_count().unwrap(), 5);
2507 }
2508
2509 #[test]
2512 fn test_link_commit_session() {
2513 let db = test_db();
2514 seed_sessions(&db);
2515 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2516 .unwrap();
2517
2518 let commits = db.get_commits_by_session("s1").unwrap();
2519 assert_eq!(commits.len(), 1);
2520 assert_eq!(commits[0].commit_hash, "abc123");
2521 assert_eq!(commits[0].session_id, "s1");
2522 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2523 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2524
2525 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2526 assert_eq!(sessions.len(), 1);
2527 assert_eq!(sessions[0].id, "s1");
2528 }
2529
2530 #[test]
2531 fn test_get_sessions_by_commit() {
2532 let db = test_db();
2533 seed_sessions(&db);
2534 db.link_commit_session("abc123", "s1", None, None).unwrap();
2536 db.link_commit_session("abc123", "s2", None, None).unwrap();
2537 db.link_commit_session("abc123", "s3", None, None).unwrap();
2538
2539 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2540 assert_eq!(sessions.len(), 3);
2541 assert_eq!(sessions[0].id, "s3");
2543 assert_eq!(sessions[1].id, "s2");
2544 assert_eq!(sessions[2].id, "s1");
2545 }
2546
2547 #[test]
2548 fn test_get_commits_by_session() {
2549 let db = test_db();
2550 seed_sessions(&db);
2551 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2553 .unwrap();
2554 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2555 .unwrap();
2556 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2557 .unwrap();
2558
2559 let commits = db.get_commits_by_session("s1").unwrap();
2560 assert_eq!(commits.len(), 3);
2561 assert!(commits.iter().all(|c| c.session_id == "s1"));
2563 }
2564
2565 #[test]
2566 fn test_duplicate_link_ignored() {
2567 let db = test_db();
2568 seed_sessions(&db);
2569 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2570 .unwrap();
2571 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2573 .unwrap();
2574
2575 let commits = db.get_commits_by_session("s1").unwrap();
2576 assert_eq!(commits.len(), 1);
2577 }
2578
2579 #[test]
2580 fn test_log_filter_by_commit() {
2581 let db = test_db();
2582 seed_sessions(&db);
2583 db.link_commit_session("abc123", "s2", None, None).unwrap();
2584 db.link_commit_session("abc123", "s4", None, None).unwrap();
2585
2586 let filter = LogFilter {
2587 commit: Some("abc123".to_string()),
2588 ..Default::default()
2589 };
2590 let results = db.list_sessions_log(&filter).unwrap();
2591 assert_eq!(results.len(), 2);
2592 assert_eq!(results[0].id, "s4");
2593 assert_eq!(results[1].id, "s2");
2594
2595 let filter = LogFilter {
2597 commit: Some("nonexistent".to_string()),
2598 ..Default::default()
2599 };
2600 let results = db.list_sessions_log(&filter).unwrap();
2601 assert_eq!(results.len(), 0);
2602 }
2603}