1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// One embedded schema migration: a `(unique name, SQL script)` pair. The
/// scripts are baked into the binary via `include_str!`; the name presumably
/// tracks which migrations already ran — TODO confirm against the runner.
type Migration = (&'static str, &'static str);
15
/// Migrations for the server-mirrored ("remote") schema, applied in listed
/// order. Entries are append-only: names must stay stable once shipped.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
];
52
/// Migrations for tables that exist only in the local database (sync state,
/// caches), applied in listed order after the remote-mirrored schema.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
67
/// A session row from the local SQLite database, joined with its sync
/// record (`session_sync`) and uploader info (`users`) where available.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    // Identity and sync state.
    pub id: String,
    pub source_path: Option<String>,
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    // Ownership / provenance.
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    // Descriptive metadata; `tags` is a comma-joined list (see
    // upsert_local_session). Timestamps are RFC 3339 strings.
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    // Aggregate statistics captured at ingest time.
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    // Git / PR context recorded when the session was stored.
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // Serialized file lists from extract_file_metadata — presumably JSON
    // arrays (the log filter matches `"name"` quoted); TODO confirm format.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
105
/// A row of `commit_session_links`: ties a git commit to the session during
/// which it was recorded. The (commit_hash, session_id) pair is unique —
/// see link_commit_session's ON CONFLICT clause.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
115
/// One row of the local `timeline_summary_cache` table: a cached summary
/// keyed by `lookup_key` and grouped under a `namespace`. `compact`,
/// `payload`, and `raw` are opaque strings stored by the summarizer.
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    pub cached_at: String,
}
126
127pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
129 if row.tool != "opencode" {
130 return false;
131 }
132
133 if row.user_message_count <= 0
137 && row.message_count <= 4
138 && row.task_count <= 4
139 && row.event_count > 0
140 && row.event_count <= 16
141 {
142 return true;
143 }
144
145 if is_opencode_subagent_source(row.source_path.as_deref()) {
146 return true;
147 }
148
149 let source_path = match row.source_path.as_deref() {
150 Some(path) if !path.trim().is_empty() => path,
151 _ => return false,
152 };
153
154 parse_opencode_parent_session_id(source_path)
155 .is_some_and(|parent_id| !parent_id.trim().is_empty())
156}
157
158pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
160 let text = fs::read_to_string(source_path).ok()?;
161 let json: Value = serde_json::from_str(&text).ok()?;
162 lookup_parent_session_id(&json)
163}
164
165fn lookup_parent_session_id(value: &Value) -> Option<String> {
166 match value {
167 Value::Object(obj) => {
168 for (key, value) in obj {
169 if is_parent_id_key(key) {
170 if let Some(parent_id) = value.as_str() {
171 let parent_id = parent_id.trim();
172 if !parent_id.is_empty() {
173 return Some(parent_id.to_string());
174 }
175 }
176 }
177 if let Some(parent_id) = lookup_parent_session_id(value) {
178 return Some(parent_id);
179 }
180 }
181 None
182 }
183 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
184 _ => None,
185 }
186}
187
/// True when `key` denotes a parent-session identifier, e.g. "parentID",
/// "parent_session_id", "parentSessionUuid". Matching is done on a
/// normalized form: non-alphanumerics stripped, ASCII-lowercased.
fn is_parent_id_key(key: &str) -> bool {
    let mut flat = String::with_capacity(key.len());
    for c in key.chars() {
        if c.is_ascii_alphanumeric() {
            flat.push(c.to_ascii_lowercase());
        }
    }

    // Every accepted spelling contains "parent" and ends in "id" or "uuid"
    // ("uuid" itself ends in "id"), so the original clause list collapses
    // to this single check.
    flat.contains("parent") && (flat.ends_with("id") || flat.ends_with("uuid"))
}
203
204pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
206 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
207 rows
208}
209
/// Whether an opencode session's source file lives in a sub-agent location.
/// Currently the same path heuristics apply to every tool; this wrapper
/// exists to keep the opencode call sites self-describing.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
213
214fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
215 if row.tool != "claude-code" {
216 return false;
217 }
218
219 is_subagent_source(row.source_path.as_deref())
220}
221
/// Path-based heuristic for sub-agent session files: the path contains a
/// `subagents` directory component (either separator style), or the file
/// name starts with `agent-`/`agent_`/`subagent-`/`subagent_`. Comparison
/// is ASCII case-insensitive; `None` is never a sub-agent source.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let lowered = match source_path {
        Some(path) => path.to_ascii_lowercase(),
        None => return false,
    };

    if lowered.contains("/subagents/") || lowered.contains("\\subagents\\") {
        return true;
    }

    let Some(filename) = std::path::Path::new(&lowered).file_name() else {
        return false;
    };
    let filename = filename.to_string_lossy();

    ["agent-", "agent_", "subagent-", "subagent_"]
        .iter()
        .any(|prefix| filename.starts_with(prefix))
}
241
/// Filter/sort/pagination options for local session listings. `None`
/// fields impose no constraint; see `build_local_session_where_clause`
/// for the SQL each field produces.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    // Compared against COALESCE(ss.sync_status, 'unknown').
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    // Substring match across title, description, and tags.
    pub search: Option<String>,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    // Pagination: offset takes effect only when limit is also set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
255
256impl Default for LocalSessionFilter {
257 fn default() -> Self {
258 Self {
259 team_id: None,
260 sync_status: None,
261 git_repo_name: None,
262 search: None,
263 tool: None,
264 sort: LocalSortOrder::Recent,
265 time_range: LocalTimeRange::All,
266 limit: None,
267 offset: None,
268 }
269 }
270}
271
/// Sort order for local session listings (see `list_sessions`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first: `created_at DESC`.
    #[default]
    Recent,
    /// Highest message count first; ties broken by recency.
    Popular,
    /// Longest duration first; ties broken by recency.
    Longest,
}
280
/// Created-at window for local session listings, evaluated with SQLite's
/// `datetime('now', <offset>)`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours.
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time constraint.
    #[default]
    All,
}
290
/// Session summary as received from the remote server, mirrored into the
/// local `sessions` table by `upsert_remote_session`. Unlike
/// `LocalSessionRow` it has no sync/source-path state, and `team_id`,
/// `created_at`, and `uploaded_at` are always present.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    // Identity and provenance.
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    // Descriptive metadata; timestamps are strings as stored server-side.
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    // Aggregate statistics.
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    // Git / PR context.
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // Serialized file lists — TODO confirm exact format with the server.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
324
/// Filters for the git-log-style session listing (`list_sessions_log`).
/// All fields are optional; unset fields match everything.
#[derive(Debug, Default)]
pub struct LogFilter {
    pub tool: Option<String>,
    /// Model pattern; `*` is translated to SQL `%` for a LIKE match.
    pub model: Option<String>,
    /// Inclusive lower bound on `created_at` (string comparison).
    pub since: Option<String>,
    /// Exclusive upper bound on `created_at` (string comparison).
    pub before: Option<String>,
    /// File name matched as a quoted substring of `files_modified`.
    pub touches: Option<String>,
    /// Substring searched in title, description, and tags.
    pub grep: Option<String>,
    /// Only `Some(true)` adds a constraint; `Some(false)` behaves like `None`.
    pub has_errors: Option<bool>,
    /// Prefix match on the session's working directory.
    pub working_directory: Option<String>,
    pub git_repo_name: Option<String>,
    /// Restricts to sessions linked to this commit via commit_session_links.
    pub commit: Option<String>,
    /// Pagination: offset takes effect only when limit is also set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
353
/// Shared FROM/JOIN clause for session queries: sessions LEFT JOINed with
/// their sync record (`ss`) and uploader row (`u`) so sessions missing
/// either still appear.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
359
/// Handle to the local SQLite database. A single connection is guarded by
/// a mutex so the handle can be shared across threads.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
365
366impl LocalDb {
    /// Opens (or creates) the local database at the default path.
    pub fn open() -> Result<Self> {
        let path = default_db_path()?;
        Self::open_path(&path)
    }
373
374 pub fn open_path(path: &PathBuf) -> Result<Self> {
376 if let Some(parent) = path.parent() {
377 std::fs::create_dir_all(parent)
378 .with_context(|| format!("create dir for {}", path.display()))?;
379 }
380 match open_connection_with_latest_schema(path) {
381 Ok(conn) => Ok(Self {
382 conn: Mutex::new(conn),
383 }),
384 Err(err) => {
385 if !is_schema_compat_error(&err) {
386 return Err(err);
387 }
388
389 rotate_legacy_db(path)?;
392
393 let conn = open_connection_with_latest_schema(path)
394 .with_context(|| format!("recreate db {}", path.display()))?;
395 Ok(Self {
396 conn: Mutex::new(conn),
397 })
398 }
399 }
400 }
401
    /// Acquires the connection guard. Panics only if another thread
    /// panicked while holding the lock (poisoned mutex) — treated as a bug.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
405
    /// Inserts or refreshes a session discovered on local disk, plus its
    /// `session_sync` record pointing at `source_path`.
    ///
    /// New rows are created under the fixed team id 'personal' with an
    /// empty `body_storage_key`; on conflict, the UPDATE deliberately
    /// leaves `team_id`, `created_at`, `uploaded_at`, and
    /// `body_storage_key` untouched so upload state survives re-scans.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are persisted as one comma-joined string (NULL when empty).
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // The working directory may be stored under either attribute key.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        // Positional params: the SQL interleaves literals ('personal', '')
        // with ?N placeholders, so the params! list below must stay in
        // exactly this order.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
             tool=excluded.tool, agent_provider=excluded.agent_provider, \
             agent_model=excluded.agent_model, \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, \
             message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
             task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        // Track the originating file. An existing record keeps its
        // sync_status; only source_path is refreshed.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
496
    /// Mirrors a session summary fetched from the remote server into the
    /// local `sessions` table.
    ///
    /// The sync record is inserted as 'remote_only' for previously unknown
    /// sessions; a session already known locally as 'local_only' is
    /// promoted to 'synced', and any other existing status is preserved.
    /// On conflict the sessions UPDATE leaves identity/ownership columns
    /// (user_id, team_id, tool, created_at) untouched.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
             title=excluded.title, description=excluded.description, \
             tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
             message_count=excluded.message_count, task_count=excluded.task_count, \
             event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
             total_input_tokens=excluded.total_input_tokens, \
             total_output_tokens=excluded.total_output_tokens, \
             git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
             git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
             pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
             working_directory=excluded.working_directory, \
             files_modified=excluded.files_modified, files_read=excluded.files_read, \
             has_errors=excluded.has_errors, \
             max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
             sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
566
567 fn build_local_session_where_clause(
570 filter: &LocalSessionFilter,
571 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
572 let mut where_clauses = vec![
573 "1=1".to_string(),
574 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
575 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
576 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
577 ];
578 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
579 let mut idx = 1u32;
580
581 if let Some(ref team_id) = filter.team_id {
582 where_clauses.push(format!("s.team_id = ?{idx}"));
583 param_values.push(Box::new(team_id.clone()));
584 idx += 1;
585 }
586
587 if let Some(ref sync_status) = filter.sync_status {
588 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
589 param_values.push(Box::new(sync_status.clone()));
590 idx += 1;
591 }
592
593 if let Some(ref repo) = filter.git_repo_name {
594 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
595 param_values.push(Box::new(repo.clone()));
596 idx += 1;
597 }
598
599 if let Some(ref tool) = filter.tool {
600 where_clauses.push(format!("s.tool = ?{idx}"));
601 param_values.push(Box::new(tool.clone()));
602 idx += 1;
603 }
604
605 if let Some(ref search) = filter.search {
606 let like = format!("%{search}%");
607 where_clauses.push(format!(
608 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
609 i1 = idx,
610 i2 = idx + 1,
611 i3 = idx + 2,
612 ));
613 param_values.push(Box::new(like.clone()));
614 param_values.push(Box::new(like.clone()));
615 param_values.push(Box::new(like));
616 idx += 3;
617 }
618
619 let interval = match filter.time_range {
620 LocalTimeRange::Hours24 => Some("-1 day"),
621 LocalTimeRange::Days7 => Some("-7 days"),
622 LocalTimeRange::Days30 => Some("-30 days"),
623 LocalTimeRange::All => None,
624 };
625 if let Some(interval) = interval {
626 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
627 param_values.push(Box::new(interval.to_string()));
628 }
629
630 (where_clauses.join(" AND "), param_values)
631 }
632
    /// Lists local sessions matching `filter`: filtered, sorted, and
    /// paginated in SQL, then post-filtered in Rust to hide child/sub-agent
    /// sessions that the SQL heuristics cannot see (file-content checks).
    ///
    /// NOTE(review): LIMIT/OFFSET apply before the Rust-side visibility
    /// filter, so a page can contain fewer rows than `limit` even when more
    /// matching sessions exist — confirm callers tolerate this.
    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
        let order_clause = match filter.sort {
            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
            LocalSortOrder::Recent => "s.created_at DESC",
        };

        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE} WHERE {where_str} \
             ORDER BY {order_clause}"
        );

        // Unnumbered '?' placeholders bind after the highest ?N used so far,
        // so LIMIT/OFFSET params are simply appended to the vec in order.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }

        Ok(hide_opencode_child_sessions(result))
    }
669
670 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
672 let mut count_filter = filter.clone();
673 count_filter.limit = None;
674 count_filter.offset = None;
675 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
676 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
677 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
678 param_values.iter().map(|p| p.as_ref()).collect();
679 let conn = self.conn();
680 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
681 Ok(count)
682 }
683
684 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
686 let mut tool_filter = filter.clone();
687 tool_filter.tool = None;
688 tool_filter.limit = None;
689 tool_filter.offset = None;
690 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
691 let sql = format!(
692 "SELECT DISTINCT s.tool \
693 {FROM_CLAUSE} WHERE {where_str} \
694 ORDER BY s.tool ASC"
695 );
696 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
697 param_values.iter().map(|p| p.as_ref()).collect();
698 let conn = self.conn();
699 let mut stmt = conn.prepare(&sql)?;
700 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
701
702 let mut tools = Vec::new();
703 for row in rows {
704 let tool = row?;
705 if !tool.trim().is_empty() {
706 tools.push(tool);
707 }
708 }
709 Ok(tools)
710 }
711
    /// Lists sessions for the git-log-style view, newest first, applying
    /// every set field of `LogFilter` in SQL and then hiding child/
    /// sub-agent sessions in Rust.
    ///
    /// Unlike `list_sessions` this does NOT apply the built-in sub-agent
    /// WHERE exclusions; only the Rust-side post-filter hides children.
    /// LIMIT/OFFSET apply before that post-filter, so a page may return
    /// fewer rows than `limit`.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next ?N placeholder number; must stay in lockstep with param_values.
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        // '*' wildcards in the model pattern become SQL '%'.
        if let Some(ref model) = filter.model {
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        // Match the file name as a quoted substring of files_modified —
        // presumably a JSON array of strings; TODO confirm the format.
        if let Some(ref touches) = filter.touches {
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only Some(true) constrains; Some(false) is treated like None.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        // Prefix match on the working directory.
        if let Some(ref wd) = filter.working_directory {
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // A commit filter requires joining the commit-link table.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // idx's final value is unused once all numbered clauses are built.
        let _ = idx; let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        // Unnumbered '?' binds after the highest ?N already used.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
822
823 pub fn get_sessions_by_tool_latest(
825 &self,
826 tool: &str,
827 count: u32,
828 ) -> Result<Vec<LocalSessionRow>> {
829 let sql = format!(
830 "SELECT {LOCAL_SESSION_COLUMNS} \
831 {FROM_CLAUSE} WHERE s.tool = ?1 \
832 ORDER BY s.created_at DESC"
833 );
834 let conn = self.conn();
835 let mut stmt = conn.prepare(&sql)?;
836 let rows = stmt.query_map(params![tool], row_to_local_session)?;
837 let mut result = Vec::new();
838 for row in rows {
839 result.push(row?);
840 }
841
842 let mut filtered = hide_opencode_child_sessions(result);
843 filtered.truncate(count as usize);
844 Ok(filtered)
845 }
846
847 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
849 let sql = format!(
850 "SELECT {LOCAL_SESSION_COLUMNS} \
851 {FROM_CLAUSE} \
852 ORDER BY s.created_at DESC"
853 );
854 let conn = self.conn();
855 let mut stmt = conn.prepare(&sql)?;
856 let rows = stmt.query_map([], row_to_local_session)?;
857 let mut result = Vec::new();
858 for row in rows {
859 result.push(row?);
860 }
861
862 let mut filtered = hide_opencode_child_sessions(result);
863 filtered.truncate(count as usize);
864 Ok(filtered)
865 }
866
867 pub fn get_session_by_tool_offset(
869 &self,
870 tool: &str,
871 offset: u32,
872 ) -> Result<Option<LocalSessionRow>> {
873 let sql = format!(
874 "SELECT {LOCAL_SESSION_COLUMNS} \
875 {FROM_CLAUSE} WHERE s.tool = ?1 \
876 ORDER BY s.created_at DESC"
877 );
878 let conn = self.conn();
879 let mut stmt = conn.prepare(&sql)?;
880 let rows = stmt.query_map(params![tool], row_to_local_session)?;
881 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
882 Ok(filtered.into_iter().nth(offset as usize))
883 }
884
885 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
887 let sql = format!(
888 "SELECT {LOCAL_SESSION_COLUMNS} \
889 {FROM_CLAUSE} \
890 ORDER BY s.created_at DESC"
891 );
892 let conn = self.conn();
893 let mut stmt = conn.prepare(&sql)?;
894 let rows = stmt.query_map([], row_to_local_session)?;
895 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
896 Ok(filtered.into_iter().nth(offset as usize))
897 }
898
899 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
901 let conn = self.conn();
902 let result = conn
903 .query_row(
904 "SELECT source_path FROM session_sync WHERE session_id = ?1",
905 params![session_id],
906 |row| row.get(0),
907 )
908 .optional()?;
909
910 Ok(result)
911 }
912
913 pub fn session_count(&self) -> Result<i64> {
915 let count = self
916 .conn()
917 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
918 Ok(count)
919 }
920
    /// Removes a session and its dependent rows (cached body, sync record)
    /// from the local database. Idempotent: deleting an unknown id is a
    /// no-op. Dependent tables are cleared first so the deletes remain
    /// valid even if FK enforcement were ever (re-)enabled — TODO confirm;
    /// migration 0006 suggests FK constraints were removed.
    pub fn delete_session(&self, session_id: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM body_cache WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute(
            "DELETE FROM session_sync WHERE session_id = ?1",
            params![session_id],
        )?;
        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
        Ok(())
    }
936
937 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
940 let cursor = self
941 .conn()
942 .query_row(
943 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
944 params![team_id],
945 |row| row.get(0),
946 )
947 .optional()?;
948 Ok(cursor)
949 }
950
    /// Stores (or replaces) the incremental-sync cursor for a team,
    /// stamping `updated_at` with the current time.
    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
        self.conn().execute(
            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
             VALUES (?1, ?2, datetime('now')) \
             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
            params![team_id, cursor],
        )?;
        Ok(())
    }
960
961 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
965 let sql = format!(
966 "SELECT {LOCAL_SESSION_COLUMNS} \
967 FROM sessions s \
968 INNER JOIN session_sync ss ON ss.session_id = s.id \
969 LEFT JOIN users u ON u.id = s.user_id \
970 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
971 ORDER BY s.created_at ASC"
972 );
973 let conn = self.conn();
974 let mut stmt = conn.prepare(&sql)?;
975 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
976 let mut result = Vec::new();
977 for row in rows {
978 result.push(row?);
979 }
980 Ok(result)
981 }
982
    /// Flags a session's sync record as uploaded, stamping the sync time
    /// with the current time. A missing record is silently a no-op.
    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
        self.conn().execute(
            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
             WHERE session_id = ?1",
            params![session_id],
        )?;
        Ok(())
    }
991
992 pub fn was_uploaded_after(
994 &self,
995 source_path: &str,
996 modified: &chrono::DateTime<chrono::Utc>,
997 ) -> Result<bool> {
998 let result: Option<String> = self
999 .conn()
1000 .query_row(
1001 "SELECT last_synced_at FROM session_sync \
1002 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1003 params![source_path],
1004 |row| row.get(0),
1005 )
1006 .optional()?;
1007
1008 if let Some(synced_at) = result {
1009 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1010 return Ok(dt >= *modified);
1011 }
1012 }
1013 Ok(false)
1014 }
1015
1016 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1019 self.conn().execute(
1020 "INSERT INTO body_cache (session_id, body, cached_at) \
1021 VALUES (?1, ?2, datetime('now')) \
1022 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1023 params![session_id, body],
1024 )?;
1025 Ok(())
1026 }
1027
1028 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1029 let body = self
1030 .conn()
1031 .query_row(
1032 "SELECT body FROM body_cache WHERE session_id = ?1",
1033 params![session_id],
1034 |row| row.get(0),
1035 )
1036 .optional()?;
1037 Ok(body)
1038 }
1039
1040 pub fn upsert_timeline_summary_cache(
1043 &self,
1044 lookup_key: &str,
1045 namespace: &str,
1046 compact: &str,
1047 payload: &str,
1048 raw: &str,
1049 ) -> Result<()> {
1050 self.conn().execute(
1051 "INSERT INTO timeline_summary_cache \
1052 (lookup_key, namespace, compact, payload, raw, cached_at) \
1053 VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1054 ON CONFLICT(lookup_key) DO UPDATE SET \
1055 namespace = excluded.namespace, \
1056 compact = excluded.compact, \
1057 payload = excluded.payload, \
1058 raw = excluded.raw, \
1059 cached_at = datetime('now')",
1060 params![lookup_key, namespace, compact, payload, raw],
1061 )?;
1062 Ok(())
1063 }
1064
1065 pub fn list_timeline_summary_cache_by_namespace(
1066 &self,
1067 namespace: &str,
1068 ) -> Result<Vec<TimelineSummaryCacheRow>> {
1069 let conn = self.conn();
1070 let mut stmt = conn.prepare(
1071 "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1072 FROM timeline_summary_cache \
1073 WHERE namespace = ?1 \
1074 ORDER BY cached_at DESC",
1075 )?;
1076 let rows = stmt.query_map(params![namespace], |row| {
1077 Ok(TimelineSummaryCacheRow {
1078 lookup_key: row.get(0)?,
1079 namespace: row.get(1)?,
1080 compact: row.get(2)?,
1081 payload: row.get(3)?,
1082 raw: row.get(4)?,
1083 cached_at: row.get(5)?,
1084 })
1085 })?;
1086
1087 let mut out = Vec::new();
1088 for row in rows {
1089 out.push(row?);
1090 }
1091 Ok(out)
1092 }
1093
    /// Deletes every cached timeline summary, returning the number of rows
    /// removed.
    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
        let affected = self
            .conn()
            .execute("DELETE FROM timeline_summary_cache", [])?;
        Ok(affected)
    }
1100
1101 pub fn migrate_from_state_json(
1106 &self,
1107 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1108 ) -> Result<usize> {
1109 let mut count = 0;
1110 for (path, uploaded_at) in uploaded {
1111 let exists: bool = self
1112 .conn()
1113 .query_row(
1114 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1115 params![path],
1116 |row| row.get(0),
1117 )
1118 .unwrap_or(false);
1119
1120 if exists {
1121 self.conn().execute(
1122 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1123 WHERE source_path = ?2 AND sync_status = 'local_only'",
1124 params![uploaded_at.to_rfc3339(), path],
1125 )?;
1126 count += 1;
1127 }
1128 }
1129 Ok(count)
1130 }
1131
1132 pub fn link_commit_session(
1136 &self,
1137 commit_hash: &str,
1138 session_id: &str,
1139 repo_path: Option<&str>,
1140 branch: Option<&str>,
1141 ) -> Result<()> {
1142 self.conn().execute(
1143 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1144 VALUES (?1, ?2, ?3, ?4) \
1145 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1146 params![commit_hash, session_id, repo_path, branch],
1147 )?;
1148 Ok(())
1149 }
1150
1151 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1153 let sql = format!(
1154 "SELECT {LOCAL_SESSION_COLUMNS} \
1155 {FROM_CLAUSE} \
1156 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1157 WHERE csl.commit_hash = ?1 \
1158 ORDER BY s.created_at DESC"
1159 );
1160 let conn = self.conn();
1161 let mut stmt = conn.prepare(&sql)?;
1162 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1163 let mut result = Vec::new();
1164 for row in rows {
1165 result.push(row?);
1166 }
1167 Ok(result)
1168 }
1169
1170 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1172 let conn = self.conn();
1173 let mut stmt = conn.prepare(
1174 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1175 FROM commit_session_links WHERE session_id = ?1 \
1176 ORDER BY created_at DESC",
1177 )?;
1178 let rows = stmt.query_map(params![session_id], |row| {
1179 Ok(CommitLink {
1180 commit_hash: row.get(0)?,
1181 session_id: row.get(1)?,
1182 repo_path: row.get(2)?,
1183 branch: row.get(3)?,
1184 created_at: row.get(4)?,
1185 })
1186 })?;
1187 let mut result = Vec::new();
1188 for row in rows {
1189 result.push(row?);
1190 }
1191 Ok(result)
1192 }
1193
1194 pub fn find_active_session_for_repo(
1198 &self,
1199 repo_path: &str,
1200 since_minutes: u32,
1201 ) -> Result<Option<LocalSessionRow>> {
1202 let sql = format!(
1203 "SELECT {LOCAL_SESSION_COLUMNS} \
1204 {FROM_CLAUSE} \
1205 WHERE s.working_directory LIKE ?1 \
1206 AND s.created_at >= datetime('now', ?2) \
1207 ORDER BY s.created_at DESC LIMIT 1"
1208 );
1209 let since = format!("-{since_minutes} minutes");
1210 let like = format!("{repo_path}%");
1211 let conn = self.conn();
1212 let mut stmt = conn.prepare(&sql)?;
1213 let row = stmt
1214 .query_map(params![like, since], row_to_local_session)?
1215 .next()
1216 .transpose()?;
1217 Ok(row)
1218 }
1219
1220 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1222 let conn = self.conn();
1223 let mut stmt = conn
1224 .prepare("SELECT id FROM sessions")
1225 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1226 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1227 let mut set = std::collections::HashSet::new();
1228 if let Ok(rows) = rows {
1229 for row in rows.flatten() {
1230 set.insert(row);
1231 }
1232 }
1233 set
1234 }
1235
1236 pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1238 let title = session.context.title.as_deref();
1239 let description = session.context.description.as_deref();
1240 let (files_modified, files_read, has_errors) =
1241 opensession_core::extract::extract_file_metadata(session);
1242 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1243
1244 self.conn().execute(
1245 "UPDATE sessions SET \
1246 title=?2, description=?3, \
1247 message_count=?4, user_message_count=?5, task_count=?6, \
1248 event_count=?7, duration_seconds=?8, \
1249 total_input_tokens=?9, total_output_tokens=?10, \
1250 files_modified=?11, files_read=?12, has_errors=?13, \
1251 max_active_agents=?14 \
1252 WHERE id=?1",
1253 params![
1254 &session.session_id,
1255 title,
1256 description,
1257 session.stats.message_count as i64,
1258 session.stats.user_message_count as i64,
1259 session.stats.task_count as i64,
1260 session.stats.event_count as i64,
1261 session.stats.duration_seconds as i64,
1262 session.stats.total_input_tokens as i64,
1263 session.stats.total_output_tokens as i64,
1264 &files_modified,
1265 &files_read,
1266 has_errors,
1267 max_active_agents,
1268 ],
1269 )?;
1270 Ok(())
1271 }
1272
1273 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1275 self.conn().execute(
1276 "INSERT INTO session_sync (session_id, source_path) \
1277 VALUES (?1, ?2) \
1278 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1279 params![session_id, source_path],
1280 )?;
1281 Ok(())
1282 }
1283
1284 pub fn list_repos(&self) -> Result<Vec<String>> {
1286 let conn = self.conn();
1287 let mut stmt = conn.prepare(
1288 "SELECT DISTINCT git_repo_name FROM sessions \
1289 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1290 )?;
1291 let rows = stmt.query_map([], |row| row.get(0))?;
1292 let mut result = Vec::new();
1293 for row in rows {
1294 result.push(row?);
1295 }
1296 Ok(result)
1297 }
1298
1299 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1301 let conn = self.conn();
1302 let mut stmt = conn.prepare(
1303 "SELECT DISTINCT working_directory FROM sessions \
1304 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1305 ORDER BY working_directory ASC",
1306 )?;
1307 let rows = stmt.query_map([], |row| row.get(0))?;
1308 let mut result = Vec::new();
1309 for row in rows {
1310 result.push(row?);
1311 }
1312 Ok(result)
1313 }
1314}
1315
1316fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1319 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1320 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1321
1322 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1324
1325 apply_local_migrations(&conn)?;
1326
1327 ensure_sessions_columns(&conn)?;
1330 validate_local_schema(&conn)?;
1331
1332 Ok(conn)
1333}
1334
/// Apply every remote + local migration not yet recorded in `_migrations`.
///
/// Remote migrations run first, then the local-only ones. A migration whose
/// SQL fails with a known schema-compat error (see
/// `is_local_migration_compat_error`) is treated as effectively applied: the
/// error is swallowed and the migration is still recorded so it is never
/// retried. Any other failure aborts with context.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    // Bookkeeping table; created lazily so pre-existing databases pick it up.
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        // Errors on the probe are treated as "not applied" (best effort).
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        if let Err(e) = conn.execute_batch(sql) {
            // Compat errors (e.g. "duplicate column name" on a db created
            // before migration bookkeeping existed) are deliberately ignored.
            let msg = e.to_string().to_ascii_lowercase();
            if !is_local_migration_compat_error(&msg) {
                return Err(e).with_context(|| format!("apply local migration {name}"));
            }
        }

        // Record the migration even when its SQL was a compat no-op, so it
        // is not re-attempted on the next open.
        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1374
/// True when a migration failure message (already lowercased by the caller)
/// indicates the schema change was effectively applied before bookkeeping
/// existed, so the failure can be ignored.
fn is_local_migration_compat_error(msg: &str) -> bool {
    ["duplicate column name", "no such column", "already exists"]
        .iter()
        .any(|needle| msg.contains(needle))
}
1380
1381fn validate_local_schema(conn: &Connection) -> Result<()> {
1382 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1383 conn.prepare(&sql)
1384 .map(|_| ())
1385 .context("validate local session schema")
1386}
1387
1388fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1389 let msg = format!("{err:#}").to_ascii_lowercase();
1390 msg.contains("no such column")
1391 || msg.contains("no such table")
1392 || msg.contains("cannot add a column")
1393 || msg.contains("already exists")
1394 || msg.contains("views may not be indexed")
1395 || msg.contains("malformed database schema")
1396 || msg.contains("duplicate column name")
1397}
1398
1399fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1400 if !path.exists() {
1401 return Ok(());
1402 }
1403
1404 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1405 let backup_name = format!(
1406 "{}.legacy-{}.bak",
1407 path.file_name()
1408 .and_then(|n| n.to_str())
1409 .unwrap_or("local.db"),
1410 ts
1411 );
1412 let backup_path = path.with_file_name(backup_name);
1413 std::fs::rename(path, &backup_path).with_context(|| {
1414 format!(
1415 "rotate legacy db {} -> {}",
1416 path.display(),
1417 backup_path.display()
1418 )
1419 })?;
1420
1421 let wal = PathBuf::from(format!("{}-wal", path.display()));
1422 let shm = PathBuf::from(format!("{}-shm", path.display()));
1423 let _ = std::fs::remove_file(wal);
1424 let _ = std::fs::remove_file(shm);
1425 Ok(())
1426}
1427
/// Columns the current code expects on `sessions`, paired with the DDL
/// fragment used to backfill each one on legacy databases via
/// `ALTER TABLE ... ADD COLUMN` (see `ensure_sessions_columns`).
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1460
1461fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1462 let mut existing = HashSet::new();
1463 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1464 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1465 for row in rows {
1466 existing.insert(row?);
1467 }
1468
1469 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1470 if existing.contains(*name) {
1471 continue;
1472 }
1473 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1474 conn.execute_batch(&sql)
1475 .with_context(|| format!("add legacy sessions column '{name}'"))?;
1476 }
1477
1478 Ok(())
1479}
1480
/// Shared SELECT column list for local session queries (aliases: `s` =
/// sessions, `ss` = session_sync, `u` = users — supplied by `FROM_CLAUSE`).
///
/// NOTE: `row_to_local_session` reads these columns by position, so the
/// order here and the indices there must stay in lockstep.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1491
/// Map one row produced with `LOCAL_SESSION_COLUMNS` into a `LocalSessionRow`.
///
/// Indices are positional and must match the column order in
/// `LOCAL_SESSION_COLUMNS` exactly.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path: row.get(1)?,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: row.get(7)?,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Last two columns tolerate decode failures so rows from older
        // schemas fall back to defaults instead of erroring.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1529
1530fn default_db_path() -> Result<PathBuf> {
1531 let home = std::env::var("HOME")
1532 .or_else(|_| std::env::var("USERPROFILE"))
1533 .context("Could not determine home directory")?;
1534 Ok(PathBuf::from(home)
1535 .join(".local")
1536 .join("share")
1537 .join("opensession")
1538 .join("local.db"))
1539}
1540
1541#[cfg(test)]
1542mod tests {
1543 use super::*;
1544
1545 use std::collections::BTreeSet;
1546 use std::fs::{create_dir_all, write};
1547 use tempfile::tempdir;
1548
    // Open a fresh LocalDb in a temp directory. `keep()` releases the
    // directory from TempDir's cleanup so the db file survives the helper.
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }

    // Temp directory handle owned by the caller (removed when dropped).
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1558
    // Baseline `LocalSessionRow` fixture: a local-only session with zeroed
    // stats. Tests override individual fields as needed.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1596
    #[test]
    fn test_open_and_schema() {
        // Opening a fresh path must create the db and run all migrations.
        let _db = test_db();
    }

    #[test]
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        // Simulate a pre-migration database: bare `sessions` with one row.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        // Re-opening must add all missing columns and keep the legacy row
        // readable with default stat values.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }

    #[test]
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        // A `sessions` VIEW cannot be migrated (e.g. indexes can't be added),
        // so opening must rotate the file aside and start fresh.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        // The incompatible db should be preserved as a timestamped backup.
        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1646
    #[test]
    fn test_is_opencode_child_session() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        // Parent file has no parent pointer; child file carries `parentID`.
        let parent_session = dir.join("parent.json");
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let parent = make_row(
            "ses_parent",
            "opencode",
            Some(parent_session.to_str().unwrap()),
        );
        let child = make_row(
            "ses_child",
            "opencode",
            Some(child_session.to_str().unwrap()),
        );
        // Same file but a non-opencode tool must never be classified as child.
        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));

        assert!(!is_opencode_child_session(&parent));
        assert!(is_opencode_child_session(&child));
        assert!(!is_opencode_child_session(&codex));
    }
1681
1682 #[test]
1683 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1684 let child = LocalSessionRow {
1685 id: "sess_child".to_string(),
1686 source_path: None,
1687 sync_status: "local_only".to_string(),
1688 last_synced_at: None,
1689 user_id: None,
1690 nickname: None,
1691 team_id: None,
1692 tool: "opencode".to_string(),
1693 agent_provider: None,
1694 agent_model: None,
1695 title: None,
1696 description: None,
1697 tags: None,
1698 created_at: "2024-01-01T00:00:00Z".to_string(),
1699 uploaded_at: None,
1700 message_count: 1,
1701 user_message_count: 0,
1702 task_count: 4,
1703 event_count: 4,
1704 duration_seconds: 0,
1705 total_input_tokens: 0,
1706 total_output_tokens: 0,
1707 git_remote: None,
1708 git_branch: None,
1709 git_commit: None,
1710 git_repo_name: None,
1711 pr_number: None,
1712 pr_url: None,
1713 working_directory: None,
1714 files_modified: None,
1715 files_read: None,
1716 has_errors: false,
1717 max_active_agents: 1,
1718 };
1719
1720 let parent = LocalSessionRow {
1721 id: "sess_parent".to_string(),
1722 source_path: None,
1723 sync_status: "local_only".to_string(),
1724 last_synced_at: None,
1725 user_id: None,
1726 nickname: None,
1727 team_id: None,
1728 tool: "opencode".to_string(),
1729 agent_provider: None,
1730 agent_model: None,
1731 title: Some("regular".to_string()),
1732 description: None,
1733 tags: None,
1734 created_at: "2024-01-01T00:00:00Z".to_string(),
1735 uploaded_at: None,
1736 message_count: 1,
1737 user_message_count: 1,
1738 task_count: 2,
1739 event_count: 20,
1740 duration_seconds: 0,
1741 total_input_tokens: 0,
1742 total_output_tokens: 0,
1743 git_remote: None,
1744 git_branch: None,
1745 git_commit: None,
1746 git_repo_name: None,
1747 pr_number: None,
1748 pr_url: None,
1749 working_directory: None,
1750 files_modified: None,
1751 files_read: None,
1752 has_errors: false,
1753 max_active_agents: 1,
1754 };
1755
1756 assert!(is_opencode_child_session(&child));
1757 assert!(!is_opencode_child_session(&parent));
1758 }
1759
1760 #[test]
1761 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1762 let child = LocalSessionRow {
1763 id: "sess_child_2".to_string(),
1764 source_path: None,
1765 sync_status: "local_only".to_string(),
1766 last_synced_at: None,
1767 user_id: None,
1768 nickname: None,
1769 team_id: None,
1770 tool: "opencode".to_string(),
1771 agent_provider: None,
1772 agent_model: None,
1773 title: None,
1774 description: None,
1775 tags: None,
1776 created_at: "2024-01-01T00:00:00Z".to_string(),
1777 uploaded_at: None,
1778 message_count: 2,
1779 user_message_count: 0,
1780 task_count: 4,
1781 event_count: 4,
1782 duration_seconds: 0,
1783 total_input_tokens: 0,
1784 total_output_tokens: 0,
1785 git_remote: None,
1786 git_branch: None,
1787 git_commit: None,
1788 git_repo_name: None,
1789 pr_number: None,
1790 pr_url: None,
1791 working_directory: None,
1792 files_modified: None,
1793 files_read: None,
1794 has_errors: false,
1795 max_active_agents: 1,
1796 };
1797
1798 let parent = LocalSessionRow {
1799 id: "sess_parent".to_string(),
1800 source_path: None,
1801 sync_status: "local_only".to_string(),
1802 last_synced_at: None,
1803 user_id: None,
1804 nickname: None,
1805 team_id: None,
1806 tool: "opencode".to_string(),
1807 agent_provider: None,
1808 agent_model: None,
1809 title: Some("regular".to_string()),
1810 description: None,
1811 tags: None,
1812 created_at: "2024-01-01T00:00:00Z".to_string(),
1813 uploaded_at: None,
1814 message_count: 2,
1815 user_message_count: 1,
1816 task_count: 5,
1817 event_count: 20,
1818 duration_seconds: 0,
1819 total_input_tokens: 0,
1820 total_output_tokens: 0,
1821 git_remote: None,
1822 git_branch: None,
1823 git_commit: None,
1824 git_repo_name: None,
1825 pr_number: None,
1826 pr_url: None,
1827 working_directory: None,
1828 files_modified: None,
1829 files_read: None,
1830 has_errors: false,
1831 max_active_agents: 1,
1832 };
1833
1834 assert!(is_opencode_child_session(&child));
1835 assert!(!is_opencode_child_session(&parent));
1836 }
1837
1838 #[test]
1839 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1840 let child = LocalSessionRow {
1841 id: "sess_child_3".to_string(),
1842 source_path: None,
1843 sync_status: "local_only".to_string(),
1844 last_synced_at: None,
1845 user_id: None,
1846 nickname: None,
1847 team_id: None,
1848 tool: "opencode".to_string(),
1849 agent_provider: None,
1850 agent_model: None,
1851 title: None,
1852 description: None,
1853 tags: None,
1854 created_at: "2024-01-01T00:00:00Z".to_string(),
1855 uploaded_at: None,
1856 message_count: 3,
1857 user_message_count: 0,
1858 task_count: 2,
1859 event_count: 6,
1860 duration_seconds: 0,
1861 total_input_tokens: 0,
1862 total_output_tokens: 0,
1863 git_remote: None,
1864 git_branch: None,
1865 git_commit: None,
1866 git_repo_name: None,
1867 pr_number: None,
1868 pr_url: None,
1869 working_directory: None,
1870 files_modified: None,
1871 files_read: None,
1872 has_errors: false,
1873 max_active_agents: 1,
1874 };
1875
1876 assert!(is_opencode_child_session(&child));
1877 }
1878
    #[test]
    fn test_parse_opencode_parent_session_id_aliases() {
        // `parentUUID` is an accepted alias for the parent pointer.
        let root = temp_root();
        let dir = root.path().join("session-aliases");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }

    #[test]
    fn test_parse_opencode_parent_session_id_nested_metadata() {
        // The parent id may also be nested under metadata.links.
        let root = temp_root();
        let dir = root.path().join("session-nested");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }

    #[test]
    fn test_is_claude_subagent_session() {
        // Claude subagent sessions are detected by path, not JSON contents,
        // and must be hidden alongside opencode children.
        let row = make_row(
            "ses_parent",
            "claude-code",
            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
        );
        assert!(!is_opencode_child_session(&row));
        assert!(is_claude_subagent_session(&row));
        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
    }
1924
    #[test]
    fn test_hide_opencode_child_sessions() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        let child_session = dir.join("child.json");
        let orphan_session = dir.join("orphan.json");

        // parent: no parent pointer; child: points at parent; orphan: no
        // parent pointer either (must be kept).
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &orphan_session,
            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let rows = vec![
            make_row(
                "ses_child",
                "opencode",
                Some(child_session.to_str().unwrap()),
            ),
            make_row(
                "ses_parent",
                "opencode",
                Some(parent_session.to_str().unwrap()),
            ),
            // Non-opencode session with user input: always kept.
            {
                let mut row = make_row("ses_other", "codex", None);
                row.user_message_count = 1;
                row
            },
            make_row(
                "ses_orphan",
                "opencode",
                Some(orphan_session.to_str().unwrap()),
            ),
        ];

        // Only the opencode child should be filtered out.
        let filtered = hide_opencode_child_sessions(rows);
        assert_eq!(filtered.len(), 3);
        assert!(filtered.iter().all(|r| r.id != "ses_child"));
    }
1977
    #[test]
    fn test_sync_cursor() {
        let db = test_db();
        // Absent cursor reads as None; set is an upsert (second set replaces).
        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-01-01T00:00:00Z".to_string())
        );
        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-06-01T00:00:00Z".to_string())
        );
    }

    #[test]
    fn test_body_cache() {
        let db = test_db();
        // Cache miss is None; cached bytes round-trip unchanged.
        assert_eq!(db.get_cached_body("s1").unwrap(), None);
        db.cache_body("s1", b"hello world").unwrap();
        assert_eq!(
            db.get_cached_body("s1").unwrap(),
            Some(b"hello world".to_vec())
        );
    }

    #[test]
    fn test_timeline_summary_cache_roundtrip() {
        let db = test_db();
        db.upsert_timeline_summary_cache(
            "k1",
            "timeline:v1",
            "compact text",
            "{\"kind\":\"turn-summary\"}",
            "raw text",
        )
        .unwrap();

        // All columns must round-trip through the namespace listing.
        let rows = db
            .list_timeline_summary_cache_by_namespace("timeline:v1")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].lookup_key, "k1");
        assert_eq!(rows[0].namespace, "timeline:v1");
        assert_eq!(rows[0].compact, "compact text");
        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
        assert_eq!(rows[0].raw, "raw text");

        // Clearing reports the number removed and leaves the table empty.
        let cleared = db.clear_timeline_summary_cache().unwrap();
        assert_eq!(cleared, 1);
        let rows_after = db
            .list_timeline_summary_cache_by_namespace("timeline:v1")
            .unwrap();
        assert!(rows_after.is_empty());
    }

    #[test]
    fn test_local_migrations_include_timeline_summary_cache() {
        // The timeline cache migration must be recorded in `_migrations`.
        let db = test_db();
        let conn = db.conn();
        let applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                params!["local_0003_timeline_summary_cache"],
                |row| row.get(0),
            )
            .unwrap();
        assert!(applied);
    }
2049
    #[test]
    fn test_local_migration_files_match_api_local_migrations() {
        // Collect the `local_*.sql` file names under a migrations directory.
        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
            std::fs::read_dir(dir)
                .expect("read migrations directory")
                .filter_map(Result::ok)
                .map(|entry| entry.file_name().to_string_lossy().to_string())
                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
                .collect()
        }

        // Both crates embed the same local migrations; drift would make the
        // two databases diverge.
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let local_files = collect_local_sql(manifest_dir.join("migrations"));
        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));

        assert_eq!(
            local_files, api_files,
            "local-db local migrations must stay in parity with api local migrations"
        );
    }
2070
    #[test]
    fn test_upsert_remote_session() {
        let db = test_db();
        let summary = RemoteSessionSummary {
            id: "remote-1".to_string(),
            user_id: Some("u1".to_string()),
            nickname: Some("alice".to_string()),
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Test session".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 10,
            task_count: 2,
            event_count: 20,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary).unwrap();

        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "remote-1");
        assert_eq!(sessions[0].sync_status, "remote_only");
        // Listing joins nickname from the local users table (see
        // LOCAL_SESSION_COLUMNS), so the remote summary's nickname is not
        // echoed back.
        assert_eq!(sessions[0].nickname, None);
    }
2113
2114 #[test]
2115 fn test_list_filter_by_repo() {
2116 let db = test_db();
2117 let summary1 = RemoteSessionSummary {
2119 id: "s1".to_string(),
2120 user_id: None,
2121 nickname: None,
2122 team_id: "t1".to_string(),
2123 tool: "claude-code".to_string(),
2124 agent_provider: None,
2125 agent_model: None,
2126 title: Some("Session 1".to_string()),
2127 description: None,
2128 tags: None,
2129 created_at: "2024-01-01T00:00:00Z".to_string(),
2130 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2131 message_count: 5,
2132 task_count: 0,
2133 event_count: 10,
2134 duration_seconds: 60,
2135 total_input_tokens: 100,
2136 total_output_tokens: 50,
2137 git_remote: None,
2138 git_branch: None,
2139 git_commit: None,
2140 git_repo_name: None,
2141 pr_number: None,
2142 pr_url: None,
2143 working_directory: None,
2144 files_modified: None,
2145 files_read: None,
2146 has_errors: false,
2147 max_active_agents: 1,
2148 };
2149 db.upsert_remote_session(&summary1).unwrap();
2150
2151 let filter = LocalSessionFilter {
2153 team_id: Some("t1".to_string()),
2154 ..Default::default()
2155 };
2156 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2157
2158 let filter = LocalSessionFilter {
2159 team_id: Some("t999".to_string()),
2160 ..Default::default()
2161 };
2162 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2163 }
2164
    // Remote-session fixture for team "t1" with fixed anthropic model fields
    // and uploaded_at mirroring created_at.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }

    // Seed five sessions (s1..s5) on consecutive days: four claude-code and
    // one gemini, used by the log-filter tests below.
    fn seed_sessions(db: &LocalDb) {
        db.upsert_remote_session(&make_summary(
            "s1",
            "claude-code",
            "First session",
            "2024-01-01T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s2",
            "claude-code",
            "JWT auth work",
            "2024-01-02T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s3",
            "gemini",
            "Gemini test",
            "2024-01-03T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s4",
            "claude-code",
            "Error handling",
            "2024-01-04T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s5",
            "claude-code",
            "Final polish",
            "2024-01-05T00:00:00Z",
        ))
        .unwrap();
    }
2239
2240 #[test]
2243 fn test_log_no_filters() {
2244 let db = test_db();
2245 seed_sessions(&db);
2246 let filter = LogFilter::default();
2247 let results = db.list_sessions_log(&filter).unwrap();
2248 assert_eq!(results.len(), 5);
2249 assert_eq!(results[0].id, "s5");
2251 assert_eq!(results[4].id, "s1");
2252 }
2253
2254 #[test]
2255 fn test_log_filter_by_tool() {
2256 let db = test_db();
2257 seed_sessions(&db);
2258 let filter = LogFilter {
2259 tool: Some("claude-code".to_string()),
2260 ..Default::default()
2261 };
2262 let results = db.list_sessions_log(&filter).unwrap();
2263 assert_eq!(results.len(), 4);
2264 assert!(results.iter().all(|s| s.tool == "claude-code"));
2265 }
2266
2267 #[test]
2268 fn test_log_filter_by_model_wildcard() {
2269 let db = test_db();
2270 seed_sessions(&db);
2271 let filter = LogFilter {
2272 model: Some("claude*".to_string()),
2273 ..Default::default()
2274 };
2275 let results = db.list_sessions_log(&filter).unwrap();
2276 assert_eq!(results.len(), 5); }
2278
2279 #[test]
2280 fn test_log_filter_since() {
2281 let db = test_db();
2282 seed_sessions(&db);
2283 let filter = LogFilter {
2284 since: Some("2024-01-03T00:00:00Z".to_string()),
2285 ..Default::default()
2286 };
2287 let results = db.list_sessions_log(&filter).unwrap();
2288 assert_eq!(results.len(), 3); }
2290
2291 #[test]
2292 fn test_log_filter_before() {
2293 let db = test_db();
2294 seed_sessions(&db);
2295 let filter = LogFilter {
2296 before: Some("2024-01-03T00:00:00Z".to_string()),
2297 ..Default::default()
2298 };
2299 let results = db.list_sessions_log(&filter).unwrap();
2300 assert_eq!(results.len(), 2); }
2302
2303 #[test]
2304 fn test_log_filter_since_and_before() {
2305 let db = test_db();
2306 seed_sessions(&db);
2307 let filter = LogFilter {
2308 since: Some("2024-01-02T00:00:00Z".to_string()),
2309 before: Some("2024-01-04T00:00:00Z".to_string()),
2310 ..Default::default()
2311 };
2312 let results = db.list_sessions_log(&filter).unwrap();
2313 assert_eq!(results.len(), 2); }
2315
2316 #[test]
2317 fn test_log_filter_grep() {
2318 let db = test_db();
2319 seed_sessions(&db);
2320 let filter = LogFilter {
2321 grep: Some("JWT".to_string()),
2322 ..Default::default()
2323 };
2324 let results = db.list_sessions_log(&filter).unwrap();
2325 assert_eq!(results.len(), 1);
2326 assert_eq!(results[0].id, "s2");
2327 }
2328
2329 #[test]
2330 fn test_log_limit_and_offset() {
2331 let db = test_db();
2332 seed_sessions(&db);
2333 let filter = LogFilter {
2334 limit: Some(2),
2335 offset: Some(1),
2336 ..Default::default()
2337 };
2338 let results = db.list_sessions_log(&filter).unwrap();
2339 assert_eq!(results.len(), 2);
2340 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2342 }
2343
2344 #[test]
2345 fn test_log_limit_only() {
2346 let db = test_db();
2347 seed_sessions(&db);
2348 let filter = LogFilter {
2349 limit: Some(3),
2350 ..Default::default()
2351 };
2352 let results = db.list_sessions_log(&filter).unwrap();
2353 assert_eq!(results.len(), 3);
2354 }
2355
2356 #[test]
2357 fn test_list_sessions_limit_offset() {
2358 let db = test_db();
2359 seed_sessions(&db);
2360 let filter = LocalSessionFilter {
2361 limit: Some(2),
2362 offset: Some(1),
2363 ..Default::default()
2364 };
2365 let results = db.list_sessions(&filter).unwrap();
2366 assert_eq!(results.len(), 2);
2367 assert_eq!(results[0].id, "s4");
2368 assert_eq!(results[1].id, "s3");
2369 }
2370
2371 #[test]
2372 fn test_count_sessions_filtered() {
2373 let db = test_db();
2374 seed_sessions(&db);
2375 let count = db
2376 .count_sessions_filtered(&LocalSessionFilter::default())
2377 .unwrap();
2378 assert_eq!(count, 5);
2379 }
2380
2381 #[test]
2382 fn test_list_working_directories_distinct_non_empty() {
2383 let db = test_db();
2384
2385 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2386 a.working_directory = Some("/tmp/repo-a".to_string());
2387 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2388 b.working_directory = Some("/tmp/repo-a".to_string());
2389 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2390 c.working_directory = Some("/tmp/repo-b".to_string());
2391 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2392 d.working_directory = Some("".to_string());
2393
2394 db.upsert_remote_session(&a).unwrap();
2395 db.upsert_remote_session(&b).unwrap();
2396 db.upsert_remote_session(&c).unwrap();
2397 db.upsert_remote_session(&d).unwrap();
2398
2399 let dirs = db.list_working_directories().unwrap();
2400 assert_eq!(
2401 dirs,
2402 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2403 );
2404 }
2405
2406 #[test]
2407 fn test_list_session_tools() {
2408 let db = test_db();
2409 seed_sessions(&db);
2410 let tools = db
2411 .list_session_tools(&LocalSessionFilter::default())
2412 .unwrap();
2413 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2414 }
2415
2416 #[test]
2417 fn test_log_combined_filters() {
2418 let db = test_db();
2419 seed_sessions(&db);
2420 let filter = LogFilter {
2421 tool: Some("claude-code".to_string()),
2422 since: Some("2024-01-03T00:00:00Z".to_string()),
2423 limit: Some(1),
2424 ..Default::default()
2425 };
2426 let results = db.list_sessions_log(&filter).unwrap();
2427 assert_eq!(results.len(), 1);
2428 assert_eq!(results[0].id, "s5"); }
2430
2431 #[test]
2434 fn test_get_session_by_offset() {
2435 let db = test_db();
2436 seed_sessions(&db);
2437 let row = db.get_session_by_offset(0).unwrap().unwrap();
2438 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2440 assert_eq!(row.id, "s3");
2441 assert!(db.get_session_by_offset(10).unwrap().is_none());
2442 }
2443
2444 #[test]
2445 fn test_get_session_by_tool_offset() {
2446 let db = test_db();
2447 seed_sessions(&db);
2448 let row = db
2449 .get_session_by_tool_offset("claude-code", 0)
2450 .unwrap()
2451 .unwrap();
2452 assert_eq!(row.id, "s5");
2453 let row = db
2454 .get_session_by_tool_offset("claude-code", 1)
2455 .unwrap()
2456 .unwrap();
2457 assert_eq!(row.id, "s4");
2458 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2459 assert_eq!(row.id, "s3");
2460 assert!(db
2461 .get_session_by_tool_offset("gemini", 1)
2462 .unwrap()
2463 .is_none());
2464 }
2465
2466 #[test]
2467 fn test_get_sessions_latest() {
2468 let db = test_db();
2469 seed_sessions(&db);
2470 let rows = db.get_sessions_latest(3).unwrap();
2471 assert_eq!(rows.len(), 3);
2472 assert_eq!(rows[0].id, "s5");
2473 assert_eq!(rows[1].id, "s4");
2474 assert_eq!(rows[2].id, "s3");
2475 }
2476
2477 #[test]
2478 fn test_get_sessions_by_tool_latest() {
2479 let db = test_db();
2480 seed_sessions(&db);
2481 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2482 assert_eq!(rows.len(), 2);
2483 assert_eq!(rows[0].id, "s5");
2484 assert_eq!(rows[1].id, "s4");
2485 }
2486
2487 #[test]
2488 fn test_get_sessions_latest_more_than_available() {
2489 let db = test_db();
2490 seed_sessions(&db);
2491 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2492 assert_eq!(rows.len(), 1); }
2494
2495 #[test]
2496 fn test_session_count() {
2497 let db = test_db();
2498 assert_eq!(db.session_count().unwrap(), 0);
2499 seed_sessions(&db);
2500 assert_eq!(db.session_count().unwrap(), 5);
2501 }
2502
2503 #[test]
2506 fn test_link_commit_session() {
2507 let db = test_db();
2508 seed_sessions(&db);
2509 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2510 .unwrap();
2511
2512 let commits = db.get_commits_by_session("s1").unwrap();
2513 assert_eq!(commits.len(), 1);
2514 assert_eq!(commits[0].commit_hash, "abc123");
2515 assert_eq!(commits[0].session_id, "s1");
2516 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2517 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2518
2519 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2520 assert_eq!(sessions.len(), 1);
2521 assert_eq!(sessions[0].id, "s1");
2522 }
2523
2524 #[test]
2525 fn test_get_sessions_by_commit() {
2526 let db = test_db();
2527 seed_sessions(&db);
2528 db.link_commit_session("abc123", "s1", None, None).unwrap();
2530 db.link_commit_session("abc123", "s2", None, None).unwrap();
2531 db.link_commit_session("abc123", "s3", None, None).unwrap();
2532
2533 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2534 assert_eq!(sessions.len(), 3);
2535 assert_eq!(sessions[0].id, "s3");
2537 assert_eq!(sessions[1].id, "s2");
2538 assert_eq!(sessions[2].id, "s1");
2539 }
2540
2541 #[test]
2542 fn test_get_commits_by_session() {
2543 let db = test_db();
2544 seed_sessions(&db);
2545 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2547 .unwrap();
2548 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2549 .unwrap();
2550 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2551 .unwrap();
2552
2553 let commits = db.get_commits_by_session("s1").unwrap();
2554 assert_eq!(commits.len(), 3);
2555 assert!(commits.iter().all(|c| c.session_id == "s1"));
2557 }
2558
2559 #[test]
2560 fn test_duplicate_link_ignored() {
2561 let db = test_db();
2562 seed_sessions(&db);
2563 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2564 .unwrap();
2565 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2567 .unwrap();
2568
2569 let commits = db.get_commits_by_session("s1").unwrap();
2570 assert_eq!(commits.len(), 1);
2571 }
2572
2573 #[test]
2574 fn test_log_filter_by_commit() {
2575 let db = test_db();
2576 seed_sessions(&db);
2577 db.link_commit_session("abc123", "s2", None, None).unwrap();
2578 db.link_commit_session("abc123", "s4", None, None).unwrap();
2579
2580 let filter = LogFilter {
2581 commit: Some("abc123".to_string()),
2582 ..Default::default()
2583 };
2584 let results = db.list_sessions_log(&filter).unwrap();
2585 assert_eq!(results.len(), 2);
2586 assert_eq!(results[0].id, "s4");
2587 assert_eq!(results[1].id, "s2");
2588
2589 let filter = LogFilter {
2591 commit: Some("nonexistent".to_string()),
2592 ..Default::default()
2593 };
2594 let results = db.list_sessions_log(&filter).unwrap();
2595 assert_eq!(results.len(), 0);
2596 }
2597}