1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// A named SQL migration: `(migration id, embedded SQL text)`.
type Migration = (&'static str, &'static str);
15
/// Ordered list of migrations for the remote-shaped tables (sessions, teams,
/// api keys, …), each embedded at compile time via `include_str!`.
///
/// NOTE(review): the migration runner lives elsewhere; entries are assumed to
/// be applied in order and recorded by id so each runs at most once — keep
/// this list append-only.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
    (
        "0010_api_keys_issuance",
        include_str!("../migrations/0010_api_keys_issuance.sql"),
    ),
];
56
/// Migrations for the purely local tables (session_sync, body_cache,
/// timeline_summary_cache, …). Same append-only rules as
/// `REMOTE_MIGRATIONS`.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
71
/// One session row as read back from the local database: the `sessions`
/// columns joined with sync bookkeeping (`session_sync`) and the uploader's
/// nickname (`users`).
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// Path of the original session file on disk, when known.
    pub source_path: Option<String>,
    /// Sync state; values used in this file: 'local_only', 'remote_only',
    /// 'synced' (COALESCEd to 'unknown' in filters).
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Producing tool, e.g. "opencode" or "claude-code".
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Comma-joined tag list (see `upsert_local_session`).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// Serialized file lists — format produced by
    /// `opensession_core::extract::extract_file_metadata`; appears to be
    /// quoted paths (cf. the `%"path"%` LIKE in `list_sessions_log`).
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
109
/// A row of `commit_session_links`: associates a git commit with the session
/// during which it was created.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
119
/// A cached timeline summary, keyed by `lookup_key` and grouped by
/// `namespace`. `compact`, `payload` and `raw` are opaque strings written by
/// `upsert_timeline_summary_cache` — their internal format is defined by the
/// caller, not here.
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    pub cached_at: String,
}
130
131pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
133 if row.tool != "opencode" {
134 return false;
135 }
136
137 if row.user_message_count <= 0
141 && row.message_count <= 4
142 && row.task_count <= 4
143 && row.event_count > 0
144 && row.event_count <= 16
145 {
146 return true;
147 }
148
149 if is_opencode_subagent_source(row.source_path.as_deref()) {
150 return true;
151 }
152
153 let source_path = match row.source_path.as_deref() {
154 Some(path) if !path.trim().is_empty() => path,
155 _ => return false,
156 };
157
158 parse_opencode_parent_session_id(source_path)
159 .is_some_and(|parent_id| !parent_id.trim().is_empty())
160}
161
162pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
164 let text = fs::read_to_string(source_path).ok()?;
165 let json: Value = serde_json::from_str(&text).ok()?;
166 lookup_parent_session_id(&json)
167}
168
169fn lookup_parent_session_id(value: &Value) -> Option<String> {
170 match value {
171 Value::Object(obj) => {
172 for (key, value) in obj {
173 if is_parent_id_key(key) {
174 if let Some(parent_id) = value.as_str() {
175 let parent_id = parent_id.trim();
176 if !parent_id.is_empty() {
177 return Some(parent_id.to_string());
178 }
179 }
180 }
181 if let Some(parent_id) = lookup_parent_session_id(value) {
182 return Some(parent_id);
183 }
184 }
185 None
186 }
187 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
188 _ => None,
189 }
190}
191
/// Heuristic: does a JSON key name look like "parent session id"?
///
/// The key is flattened to lowercase ASCII alphanumerics first so that
/// `parent_id`, `parentId` and `parent-session-id` all compare equal.
fn is_parent_id_key(key: &str) -> bool {
    let mut flat = String::with_capacity(key.len());
    for c in key.chars() {
        if c.is_ascii_alphanumeric() {
            flat.push(c.to_ascii_lowercase());
        }
    }

    let mentions_parent = flat.contains("parent");
    let id_like_suffix = flat.ends_with("id") || flat.ends_with("uuid");

    // Well-known exact spellings, plus the generic "mentions parent and ends
    // in id/uuid" rule (which also covers the exact forms).
    matches!(
        flat.as_str(),
        "parentid" | "parentuuid" | "parentsessionid" | "parentsessionuuid"
    ) || flat.ends_with("parentsessionid")
        || (mentions_parent && id_like_suffix)
}
207
208pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
210 rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
211 rows
212}
213
/// Opencode-flavoured alias of the shared subagent source-path check; kept
/// as a separate name for call-site readability.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
217
218fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
219 if row.tool != "claude-code" {
220 return false;
221 }
222
223 is_subagent_source(row.source_path.as_deref())
224}
225
/// Does a source path point at a subagent session file?
///
/// True when the (case-insensitive) path contains a `subagents` directory
/// segment — either separator style — or when the file name starts with an
/// `agent-`/`agent_`/`subagent-`/`subagent_` prefix.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let lowered = match source_path {
        Some(path) => path.to_ascii_lowercase(),
        None => return false,
    };

    if ["/subagents/", "\\subagents\\"]
        .iter()
        .any(|needle| lowered.contains(needle))
    {
        return true;
    }

    std::path::Path::new(&lowered)
        .file_name()
        .map(|name| name.to_string_lossy().into_owned())
        .is_some_and(|name| {
            ["agent-", "agent_", "subagent-", "subagent_"]
                .iter()
                .any(|prefix| name.starts_with(prefix))
        })
}
245
/// Filtering, sorting and paging options for `LocalDb::list_sessions` and
/// related queries. All set fields are ANDed; `None` means "no constraint".
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    /// Compared against COALESCE(ss.sync_status, 'unknown').
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    /// Substring (LIKE) match over title, description and tags.
    pub search: Option<String>,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    /// SQL LIMIT; `offset` is only applied when a limit is set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
259
260impl Default for LocalSessionFilter {
261 fn default() -> Self {
262 Self {
263 team_id: None,
264 sync_status: None,
265 git_repo_name: None,
266 search: None,
267 tool: None,
268 sort: LocalSortOrder::Recent,
269 time_range: LocalTimeRange::All,
270 limit: None,
271 offset: None,
272 }
273 }
274}
275
/// Sort order for local session listings (see `LocalDb::list_sessions`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (created_at DESC).
    #[default]
    Recent,
    /// Most messages first, ties broken newest-first.
    Popular,
    /// Longest duration first, ties broken newest-first.
    Longest,
}
284
/// Time window applied to `sessions.created_at` (relative to `now` in SQL).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last 24 hours.
    Hours24,
    /// Last 7 days.
    Days7,
    /// Last 30 days.
    Days30,
    /// No time constraint.
    #[default]
    All,
}
294
/// Session metadata fetched from the remote server, as mirrored into the
/// local `sessions` table by `upsert_remote_session`. Unlike
/// `LocalSessionRow`, `team_id` and `uploaded_at` are always present and
/// there is no sync/source-path bookkeeping.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
328
/// Ad-hoc filters for `LocalDb::list_sessions_log`. All set fields are
/// ANDed. Field semantics match the SQL built there.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Exact tool name match.
    pub tool: Option<String>,
    /// Model pattern; `*` is translated to SQL `%` (LIKE).
    pub model: Option<String>,
    /// Inclusive lower bound on created_at (string comparison).
    pub since: Option<String>,
    /// Exclusive upper bound on created_at (string comparison).
    pub before: Option<String>,
    /// File path; matched as a quoted substring of files_modified.
    pub touches: Option<String>,
    /// Substring match over title, description and tags.
    pub grep: Option<String>,
    /// Only `Some(true)` adds a constraint (has_errors = 1).
    pub has_errors: Option<bool>,
    /// Prefix match on working_directory.
    pub working_directory: Option<String>,
    /// Exact repo name match.
    pub git_repo_name: Option<String>,
    /// Restrict to sessions linked to this commit hash.
    pub commit: Option<String>,
    /// SQL LIMIT; `offset` is only applied when a limit is set.
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}
357
/// Shared FROM/JOIN clause for session queries: sessions plus optional sync
/// bookkeeping (`ss`) and uploader (`u`). Leading `\` keeps the literal on
/// one logical line.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
363
/// Handle to the local SQLite database. A `Mutex` serializes all access to
/// the single connection, making the handle shareable across threads.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
370
371impl LocalDb {
372 pub fn open() -> Result<Self> {
375 let path = default_db_path()?;
376 Self::open_path(&path)
377 }
378
379 pub fn open_path(path: &PathBuf) -> Result<Self> {
381 if let Some(parent) = path.parent() {
382 std::fs::create_dir_all(parent)
383 .with_context(|| format!("create dir for {}", path.display()))?;
384 }
385 match open_connection_with_latest_schema(path) {
386 Ok(conn) => Ok(Self {
387 conn: Mutex::new(conn),
388 }),
389 Err(err) => {
390 if !is_schema_compat_error(&err) {
391 return Err(err);
392 }
393
394 rotate_legacy_db(path)?;
397
398 let conn = open_connection_with_latest_schema(path)
399 .with_context(|| format!("recreate db {}", path.display()))?;
400 Ok(Self {
401 conn: Mutex::new(conn),
402 })
403 }
404 }
405 }
406
    /// Acquire the connection guard. Panics if the mutex is poisoned
    /// (i.e. another thread panicked while holding it).
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
410
    /// Insert or refresh a locally-parsed session.
    ///
    /// The session row is written under the fixed team 'personal' with an
    /// empty body_storage_key; on conflict every derived field is refreshed
    /// but `team_id`/`created_at` are left untouched. A `session_sync` row is
    /// created as 'local_only' — on conflict only `source_path` is updated,
    /// so an already-'synced' session stays synced.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as a single comma-joined string (NULL when empty).
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // Working directory may live under either attribute key.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
               tool=excluded.tool, agent_provider=excluded.agent_provider, \
               agent_model=excluded.agent_model, \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, \
               message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
               task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
               git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
503
    /// Insert or refresh a session mirrored from the remote server.
    ///
    /// On conflict the remote-owned fields are refreshed (counts, git info,
    /// PR links, …) while identity fields (user, team, tool, created_at)
    /// keep their existing values. The `session_sync` row is created as
    /// 'remote_only'; an existing 'local_only' row is promoted to 'synced',
    /// and any other status is left unchanged.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
               title=excluded.title, description=excluded.description, \
               tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
               message_count=excluded.message_count, task_count=excluded.task_count, \
               event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
               total_input_tokens=excluded.total_input_tokens, \
               total_output_tokens=excluded.total_output_tokens, \
               git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
               git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
               pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
               working_directory=excluded.working_directory, \
               files_modified=excluded.files_modified, files_read=excluded.files_read, \
               has_errors=excluded.has_errors, \
               max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
               sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
575
576 fn build_local_session_where_clause(
579 filter: &LocalSessionFilter,
580 ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
581 let mut where_clauses = vec![
582 "1=1".to_string(),
583 "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
584 "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
585 "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
586 ];
587 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
588 let mut idx = 1u32;
589
590 if let Some(ref team_id) = filter.team_id {
591 where_clauses.push(format!("s.team_id = ?{idx}"));
592 param_values.push(Box::new(team_id.clone()));
593 idx += 1;
594 }
595
596 if let Some(ref sync_status) = filter.sync_status {
597 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
598 param_values.push(Box::new(sync_status.clone()));
599 idx += 1;
600 }
601
602 if let Some(ref repo) = filter.git_repo_name {
603 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
604 param_values.push(Box::new(repo.clone()));
605 idx += 1;
606 }
607
608 if let Some(ref tool) = filter.tool {
609 where_clauses.push(format!("s.tool = ?{idx}"));
610 param_values.push(Box::new(tool.clone()));
611 idx += 1;
612 }
613
614 if let Some(ref search) = filter.search {
615 let like = format!("%{search}%");
616 where_clauses.push(format!(
617 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
618 i1 = idx,
619 i2 = idx + 1,
620 i3 = idx + 2,
621 ));
622 param_values.push(Box::new(like.clone()));
623 param_values.push(Box::new(like.clone()));
624 param_values.push(Box::new(like));
625 idx += 3;
626 }
627
628 let interval = match filter.time_range {
629 LocalTimeRange::Hours24 => Some("-1 day"),
630 LocalTimeRange::Days7 => Some("-7 days"),
631 LocalTimeRange::Days30 => Some("-30 days"),
632 LocalTimeRange::All => None,
633 };
634 if let Some(interval) = interval {
635 where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
636 param_values.push(Box::new(interval.to_string()));
637 }
638
639 (where_clauses.join(" AND "), param_values)
640 }
641
642 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
643 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
644 let order_clause = match filter.sort {
645 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
646 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
647 LocalSortOrder::Recent => "s.created_at DESC",
648 };
649
650 let mut sql = format!(
651 "SELECT {LOCAL_SESSION_COLUMNS} \
652 {FROM_CLAUSE} WHERE {where_str} \
653 ORDER BY {order_clause}"
654 );
655
656 if let Some(limit) = filter.limit {
657 sql.push_str(" LIMIT ?");
658 param_values.push(Box::new(limit));
659 if let Some(offset) = filter.offset {
660 sql.push_str(" OFFSET ?");
661 param_values.push(Box::new(offset));
662 }
663 }
664
665 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
666 param_values.iter().map(|p| p.as_ref()).collect();
667 let conn = self.conn();
668 let mut stmt = conn.prepare(&sql)?;
669 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
670
671 let mut result = Vec::new();
672 for row in rows {
673 result.push(row?);
674 }
675
676 Ok(hide_opencode_child_sessions(result))
677 }
678
679 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
681 let mut count_filter = filter.clone();
682 count_filter.limit = None;
683 count_filter.offset = None;
684 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
685 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
686 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
687 param_values.iter().map(|p| p.as_ref()).collect();
688 let conn = self.conn();
689 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
690 Ok(count)
691 }
692
693 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
695 let mut tool_filter = filter.clone();
696 tool_filter.tool = None;
697 tool_filter.limit = None;
698 tool_filter.offset = None;
699 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
700 let sql = format!(
701 "SELECT DISTINCT s.tool \
702 {FROM_CLAUSE} WHERE {where_str} \
703 ORDER BY s.tool ASC"
704 );
705 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
706 param_values.iter().map(|p| p.as_ref()).collect();
707 let conn = self.conn();
708 let mut stmt = conn.prepare(&sql)?;
709 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
710
711 let mut tools = Vec::new();
712 for row in rows {
713 let tool = row?;
714 if !tool.trim().is_empty() {
715 tools.push(tool);
716 }
717 }
718 Ok(tools)
719 }
720
    /// Query sessions for the log view with ad-hoc filters.
    ///
    /// All set filters are ANDed; results are newest-first; child/subagent
    /// sessions are hidden by a Rust-side post-filter. Placeholders are
    /// numbered ?1.. as clauses are added; LIMIT/OFFSET use bare '?' which
    /// continue after the highest numbered placeholder.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // Callers may use '*' as a wildcard; translate to SQL LIKE '%'.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // files_modified stores quoted paths, so wrap the needle in
            // quotes to anchor whole-path matches.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // Free-text search across title/description/tags; one bound
            // value per column.
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // Only an explicit `Some(true)` constrains the query.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            // Prefix match: any session whose cwd is under `wd`.
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Filtering by commit requires joining the commit links table.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // idx is done; LIMIT/OFFSET below use bare '?' placeholders.
        let _ = idx; let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
831
832 pub fn get_sessions_by_tool_latest(
834 &self,
835 tool: &str,
836 count: u32,
837 ) -> Result<Vec<LocalSessionRow>> {
838 let sql = format!(
839 "SELECT {LOCAL_SESSION_COLUMNS} \
840 {FROM_CLAUSE} WHERE s.tool = ?1 \
841 ORDER BY s.created_at DESC"
842 );
843 let conn = self.conn();
844 let mut stmt = conn.prepare(&sql)?;
845 let rows = stmt.query_map(params![tool], row_to_local_session)?;
846 let mut result = Vec::new();
847 for row in rows {
848 result.push(row?);
849 }
850
851 let mut filtered = hide_opencode_child_sessions(result);
852 filtered.truncate(count as usize);
853 Ok(filtered)
854 }
855
856 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
858 let sql = format!(
859 "SELECT {LOCAL_SESSION_COLUMNS} \
860 {FROM_CLAUSE} \
861 ORDER BY s.created_at DESC"
862 );
863 let conn = self.conn();
864 let mut stmt = conn.prepare(&sql)?;
865 let rows = stmt.query_map([], row_to_local_session)?;
866 let mut result = Vec::new();
867 for row in rows {
868 result.push(row?);
869 }
870
871 let mut filtered = hide_opencode_child_sessions(result);
872 filtered.truncate(count as usize);
873 Ok(filtered)
874 }
875
876 pub fn get_session_by_tool_offset(
878 &self,
879 tool: &str,
880 offset: u32,
881 ) -> Result<Option<LocalSessionRow>> {
882 let sql = format!(
883 "SELECT {LOCAL_SESSION_COLUMNS} \
884 {FROM_CLAUSE} WHERE s.tool = ?1 \
885 ORDER BY s.created_at DESC"
886 );
887 let conn = self.conn();
888 let mut stmt = conn.prepare(&sql)?;
889 let rows = stmt.query_map(params![tool], row_to_local_session)?;
890 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
891 Ok(filtered.into_iter().nth(offset as usize))
892 }
893
894 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
896 let sql = format!(
897 "SELECT {LOCAL_SESSION_COLUMNS} \
898 {FROM_CLAUSE} \
899 ORDER BY s.created_at DESC"
900 );
901 let conn = self.conn();
902 let mut stmt = conn.prepare(&sql)?;
903 let rows = stmt.query_map([], row_to_local_session)?;
904 let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
905 Ok(filtered.into_iter().nth(offset as usize))
906 }
907
908 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
910 let conn = self.conn();
911 let result = conn
912 .query_row(
913 "SELECT source_path FROM session_sync WHERE session_id = ?1",
914 params![session_id],
915 |row| row.get(0),
916 )
917 .optional()?;
918
919 Ok(result)
920 }
921
922 pub fn session_count(&self) -> Result<i64> {
924 let count = self
925 .conn()
926 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
927 Ok(count)
928 }
929
930 pub fn delete_session(&self, session_id: &str) -> Result<()> {
933 let conn = self.conn();
934 conn.execute(
935 "DELETE FROM body_cache WHERE session_id = ?1",
936 params![session_id],
937 )?;
938 conn.execute(
939 "DELETE FROM session_sync WHERE session_id = ?1",
940 params![session_id],
941 )?;
942 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
943 Ok(())
944 }
945
946 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
949 let cursor = self
950 .conn()
951 .query_row(
952 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
953 params![team_id],
954 |row| row.get(0),
955 )
956 .optional()?;
957 Ok(cursor)
958 }
959
960 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
961 self.conn().execute(
962 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
963 VALUES (?1, ?2, datetime('now')) \
964 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
965 params![team_id, cursor],
966 )?;
967 Ok(())
968 }
969
970 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
974 let sql = format!(
975 "SELECT {LOCAL_SESSION_COLUMNS} \
976 FROM sessions s \
977 INNER JOIN session_sync ss ON ss.session_id = s.id \
978 LEFT JOIN users u ON u.id = s.user_id \
979 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
980 ORDER BY s.created_at ASC"
981 );
982 let conn = self.conn();
983 let mut stmt = conn.prepare(&sql)?;
984 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
985 let mut result = Vec::new();
986 for row in rows {
987 result.push(row?);
988 }
989 Ok(result)
990 }
991
992 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
993 self.conn().execute(
994 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
995 WHERE session_id = ?1",
996 params![session_id],
997 )?;
998 Ok(())
999 }
1000
1001 pub fn was_uploaded_after(
1003 &self,
1004 source_path: &str,
1005 modified: &chrono::DateTime<chrono::Utc>,
1006 ) -> Result<bool> {
1007 let result: Option<String> = self
1008 .conn()
1009 .query_row(
1010 "SELECT last_synced_at FROM session_sync \
1011 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1012 params![source_path],
1013 |row| row.get(0),
1014 )
1015 .optional()?;
1016
1017 if let Some(synced_at) = result {
1018 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1019 return Ok(dt >= *modified);
1020 }
1021 }
1022 Ok(false)
1023 }
1024
1025 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1028 self.conn().execute(
1029 "INSERT INTO body_cache (session_id, body, cached_at) \
1030 VALUES (?1, ?2, datetime('now')) \
1031 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1032 params![session_id, body],
1033 )?;
1034 Ok(())
1035 }
1036
1037 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1038 let body = self
1039 .conn()
1040 .query_row(
1041 "SELECT body FROM body_cache WHERE session_id = ?1",
1042 params![session_id],
1043 |row| row.get(0),
1044 )
1045 .optional()?;
1046 Ok(body)
1047 }
1048
1049 pub fn upsert_timeline_summary_cache(
1052 &self,
1053 lookup_key: &str,
1054 namespace: &str,
1055 compact: &str,
1056 payload: &str,
1057 raw: &str,
1058 ) -> Result<()> {
1059 self.conn().execute(
1060 "INSERT INTO timeline_summary_cache \
1061 (lookup_key, namespace, compact, payload, raw, cached_at) \
1062 VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1063 ON CONFLICT(lookup_key) DO UPDATE SET \
1064 namespace = excluded.namespace, \
1065 compact = excluded.compact, \
1066 payload = excluded.payload, \
1067 raw = excluded.raw, \
1068 cached_at = datetime('now')",
1069 params![lookup_key, namespace, compact, payload, raw],
1070 )?;
1071 Ok(())
1072 }
1073
1074 pub fn list_timeline_summary_cache_by_namespace(
1075 &self,
1076 namespace: &str,
1077 ) -> Result<Vec<TimelineSummaryCacheRow>> {
1078 let conn = self.conn();
1079 let mut stmt = conn.prepare(
1080 "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1081 FROM timeline_summary_cache \
1082 WHERE namespace = ?1 \
1083 ORDER BY cached_at DESC",
1084 )?;
1085 let rows = stmt.query_map(params![namespace], |row| {
1086 Ok(TimelineSummaryCacheRow {
1087 lookup_key: row.get(0)?,
1088 namespace: row.get(1)?,
1089 compact: row.get(2)?,
1090 payload: row.get(3)?,
1091 raw: row.get(4)?,
1092 cached_at: row.get(5)?,
1093 })
1094 })?;
1095
1096 let mut out = Vec::new();
1097 for row in rows {
1098 out.push(row?);
1099 }
1100 Ok(out)
1101 }
1102
1103 pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1104 let affected = self
1105 .conn()
1106 .execute("DELETE FROM timeline_summary_cache", [])?;
1107 Ok(affected)
1108 }
1109
    /// One-time migration from the legacy `state.json` upload ledger into
    /// `session_sync`.
    ///
    /// `uploaded` maps a session source path to its recorded upload time.
    /// Returns the number of paths found in `session_sync`.
    ///
    /// NOTE(review): the count includes rows that were found but NOT in
    /// 'local_only' state (the UPDATE then changes nothing) — confirm
    /// callers treat this as a rough progress number, not "rows changed".
    pub fn migrate_from_state_json(
        &self,
        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
    ) -> Result<usize> {
        let mut count = 0;
        for (path, uploaded_at) in uploaded {
            // Lookup errors are deliberately swallowed: a failed query is
            // treated the same as "row not present".
            let exists: bool = self
                .conn()
                .query_row(
                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
                    params![path],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if exists {
                // Only promote rows still waiting to upload; already-synced
                // or remote-only rows are left untouched.
                self.conn().execute(
                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
                     WHERE source_path = ?2 AND sync_status = 'local_only'",
                    params![uploaded_at.to_rfc3339(), path],
                )?;
                count += 1;
            }
        }
        Ok(count)
    }
1140
1141 pub fn link_commit_session(
1145 &self,
1146 commit_hash: &str,
1147 session_id: &str,
1148 repo_path: Option<&str>,
1149 branch: Option<&str>,
1150 ) -> Result<()> {
1151 self.conn().execute(
1152 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1153 VALUES (?1, ?2, ?3, ?4) \
1154 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1155 params![commit_hash, session_id, repo_path, branch],
1156 )?;
1157 Ok(())
1158 }
1159
1160 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1162 let sql = format!(
1163 "SELECT {LOCAL_SESSION_COLUMNS} \
1164 {FROM_CLAUSE} \
1165 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1166 WHERE csl.commit_hash = ?1 \
1167 ORDER BY s.created_at DESC"
1168 );
1169 let conn = self.conn();
1170 let mut stmt = conn.prepare(&sql)?;
1171 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1172 let mut result = Vec::new();
1173 for row in rows {
1174 result.push(row?);
1175 }
1176 Ok(result)
1177 }
1178
1179 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1181 let conn = self.conn();
1182 let mut stmt = conn.prepare(
1183 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1184 FROM commit_session_links WHERE session_id = ?1 \
1185 ORDER BY created_at DESC",
1186 )?;
1187 let rows = stmt.query_map(params![session_id], |row| {
1188 Ok(CommitLink {
1189 commit_hash: row.get(0)?,
1190 session_id: row.get(1)?,
1191 repo_path: row.get(2)?,
1192 branch: row.get(3)?,
1193 created_at: row.get(4)?,
1194 })
1195 })?;
1196 let mut result = Vec::new();
1197 for row in rows {
1198 result.push(row?);
1199 }
1200 Ok(result)
1201 }
1202
1203 pub fn find_active_session_for_repo(
1207 &self,
1208 repo_path: &str,
1209 since_minutes: u32,
1210 ) -> Result<Option<LocalSessionRow>> {
1211 let sql = format!(
1212 "SELECT {LOCAL_SESSION_COLUMNS} \
1213 {FROM_CLAUSE} \
1214 WHERE s.working_directory LIKE ?1 \
1215 AND s.created_at >= datetime('now', ?2) \
1216 ORDER BY s.created_at DESC LIMIT 1"
1217 );
1218 let since = format!("-{since_minutes} minutes");
1219 let like = format!("{repo_path}%");
1220 let conn = self.conn();
1221 let mut stmt = conn.prepare(&sql)?;
1222 let row = stmt
1223 .query_map(params![like, since], row_to_local_session)?
1224 .next()
1225 .transpose()?;
1226 Ok(row)
1227 }
1228
1229 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1231 let conn = self.conn();
1232 let mut stmt = conn
1233 .prepare("SELECT id FROM sessions")
1234 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1235 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1236 let mut set = std::collections::HashSet::new();
1237 if let Ok(rows) = rows {
1238 for row in rows.flatten() {
1239 set.insert(row);
1240 }
1241 }
1242 set
1243 }
1244
    /// Refresh the denormalized statistics columns for an existing session row.
    ///
    /// Placeholders are positional: `?1` is the session id in the WHERE
    /// clause and `?2..?14` must stay in lockstep with the `params!` order.
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // File/error metadata is re-derived from the session body each time.
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;
        Ok(())
    }
1281
1282 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1284 self.conn().execute(
1285 "INSERT INTO session_sync (session_id, source_path) \
1286 VALUES (?1, ?2) \
1287 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1288 params![session_id, source_path],
1289 )?;
1290 Ok(())
1291 }
1292
1293 pub fn list_repos(&self) -> Result<Vec<String>> {
1295 let conn = self.conn();
1296 let mut stmt = conn.prepare(
1297 "SELECT DISTINCT git_repo_name FROM sessions \
1298 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1299 )?;
1300 let rows = stmt.query_map([], |row| row.get(0))?;
1301 let mut result = Vec::new();
1302 for row in rows {
1303 result.push(row?);
1304 }
1305 Ok(result)
1306 }
1307
1308 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1310 let conn = self.conn();
1311 let mut stmt = conn.prepare(
1312 "SELECT DISTINCT working_directory FROM sessions \
1313 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1314 ORDER BY working_directory ASC",
1315 )?;
1316 let rows = stmt.query_map([], |row| row.get(0))?;
1317 let mut result = Vec::new();
1318 for row in rows {
1319 result.push(row?);
1320 }
1321 Ok(result)
1322 }
1323}
1324
1325fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1328 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1329 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1330
1331 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1333
1334 apply_local_migrations(&conn)?;
1335
1336 ensure_sessions_columns(&conn)?;
1339 validate_local_schema(&conn)?;
1340
1341 Ok(conn)
1342}
1343
/// Apply every remote + local migration not yet recorded in `_migrations`.
///
/// Errors indicating the schema change already happened (duplicate column,
/// already exists, no such column) are tolerated so databases created by
/// older builds converge instead of failing — see
/// `is_local_migration_compat_error`. A migration's name is recorded even
/// when its SQL was skipped for compatibility, so it is never retried.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        // Errors probing the bookkeeping table default to "not applied",
        // so a broken _migrations read re-attempts the (idempotent) SQL.
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        if let Err(e) = conn.execute_batch(sql) {
            let msg = e.to_string().to_ascii_lowercase();
            // Only swallow errors that mean "this change is already present".
            if !is_local_migration_compat_error(&msg) {
                return Err(e).with_context(|| format!("apply local migration {name}"));
            }
        }

        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1383
/// True when a lowercased SQLite error message indicates the migration's
/// schema change is already in place (safe to skip), rather than a real
/// failure.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const COMPAT_MARKERS: &[&str] = &[
        "duplicate column name",
        "no such column",
        "already exists",
    ];
    COMPAT_MARKERS.iter().any(|marker| msg.contains(marker))
}
1389
1390fn validate_local_schema(conn: &Connection) -> Result<()> {
1391 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1392 conn.prepare(&sql)
1393 .map(|_| ())
1394 .context("validate local session schema")
1395}
1396
1397fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1398 let msg = format!("{err:#}").to_ascii_lowercase();
1399 msg.contains("no such column")
1400 || msg.contains("no such table")
1401 || msg.contains("cannot add a column")
1402 || msg.contains("already exists")
1403 || msg.contains("views may not be indexed")
1404 || msg.contains("malformed database schema")
1405 || msg.contains("duplicate column name")
1406}
1407
1408fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1409 if !path.exists() {
1410 return Ok(());
1411 }
1412
1413 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1414 let backup_name = format!(
1415 "{}.legacy-{}.bak",
1416 path.file_name()
1417 .and_then(|n| n.to_str())
1418 .unwrap_or("local.db"),
1419 ts
1420 );
1421 let backup_path = path.with_file_name(backup_name);
1422 std::fs::rename(path, &backup_path).with_context(|| {
1423 format!(
1424 "rotate local db backup {} -> {}",
1425 path.display(),
1426 backup_path.display()
1427 )
1428 })?;
1429
1430 let wal = PathBuf::from(format!("{}-wal", path.display()));
1431 let shm = PathBuf::from(format!("{}-shm", path.display()));
1432 let _ = std::fs::remove_file(wal);
1433 let _ = std::fs::remove_file(shm);
1434 Ok(())
1435}
1436
/// Columns (name, SQL type + default) that must exist on `sessions`.
///
/// `ensure_sessions_columns` ALTERs any missing entries onto legacy
/// databases whose migration history predates the column. Keep declarations
/// compatible with `ALTER TABLE … ADD COLUMN` (NOTE(review): SQLite rejects
/// ADD COLUMN with NOT NULL and no default — keep defaults here).
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1470
1471fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1472 let mut existing = HashSet::new();
1473 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1474 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1475 for row in rows {
1476 existing.insert(row?);
1477 }
1478
1479 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1480 if existing.contains(*name) {
1481 continue;
1482 }
1483 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1484 conn.execute_batch(&sql)
1485 .with_context(|| format!("add sessions column '{name}'"))?;
1486 }
1487
1488 Ok(())
1489}
1490
/// Column list for the canonical local-session SELECT.
///
/// The ordinal position of every column here MUST stay in lockstep with the
/// index-based `row.get(n)` calls in `row_to_local_session` — append new
/// columns at the end and update both places together.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1501
/// Map one row of a `SELECT {LOCAL_SESSION_COLUMNS} …` query into a
/// `LocalSessionRow`.
///
/// Indices are positional and must match the column order declared in
/// `LOCAL_SESSION_COLUMNS` exactly.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path: row.get(1)?,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: row.get(7)?,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // The last two tolerate NULL/legacy values instead of failing the row.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1539
1540fn default_db_path() -> Result<PathBuf> {
1541 let home = std::env::var("HOME")
1542 .or_else(|_| std::env::var("USERPROFILE"))
1543 .context("Could not determine home directory")?;
1544 Ok(PathBuf::from(home)
1545 .join(".local")
1546 .join("share")
1547 .join("opensession")
1548 .join("local.db"))
1549}
1550
1551#[cfg(test)]
1552mod tests {
1553 use super::*;
1554
1555 use std::collections::BTreeSet;
1556 use std::fs::{create_dir_all, write};
1557 use tempfile::tempdir;
1558
    // Open a fresh LocalDb in a kept temp dir (`keep()` detaches the guard so
    // the file is not deleted while the db handle is alive).
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }
1564
    // Temp directory for on-disk fixtures; removed when the guard drops.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1568
    // Baseline local-only session row; tests override individual fields.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1606
    // Smoke test: opening a fresh db runs all migrations without error.
    #[test]
    fn test_open_and_schema() {
        let _db = test_db();
    }
1611
    // A minimal legacy db (sessions with only an id column) is upgraded on
    // open: missing columns are backfilled and rows stay readable.
    #[test]
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }
1631
    // A db whose `sessions` is a VIEW cannot be migrated: open must rotate it
    // aside (as *.legacy-<ts>.bak) and start a fresh, empty db.
    #[test]
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1656
    // Child detection via the on-disk session file: a `parentID` field marks
    // a child; non-opencode tools are never considered children.
    #[test]
    fn test_is_opencode_child_session() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let parent = make_row(
            "ses_parent",
            "opencode",
            Some(parent_session.to_str().unwrap()),
        );
        let child = make_row(
            "ses_child",
            "opencode",
            Some(child_session.to_str().unwrap()),
        );
        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));

        assert!(!is_opencode_child_session(&parent));
        assert!(is_opencode_child_session(&child));
        assert!(!is_opencode_child_session(&codex));
    }
1691
1692 #[test]
1693 fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1694 let child = LocalSessionRow {
1695 id: "sess_child".to_string(),
1696 source_path: None,
1697 sync_status: "local_only".to_string(),
1698 last_synced_at: None,
1699 user_id: None,
1700 nickname: None,
1701 team_id: None,
1702 tool: "opencode".to_string(),
1703 agent_provider: None,
1704 agent_model: None,
1705 title: None,
1706 description: None,
1707 tags: None,
1708 created_at: "2024-01-01T00:00:00Z".to_string(),
1709 uploaded_at: None,
1710 message_count: 1,
1711 user_message_count: 0,
1712 task_count: 4,
1713 event_count: 4,
1714 duration_seconds: 0,
1715 total_input_tokens: 0,
1716 total_output_tokens: 0,
1717 git_remote: None,
1718 git_branch: None,
1719 git_commit: None,
1720 git_repo_name: None,
1721 pr_number: None,
1722 pr_url: None,
1723 working_directory: None,
1724 files_modified: None,
1725 files_read: None,
1726 has_errors: false,
1727 max_active_agents: 1,
1728 };
1729
1730 let parent = LocalSessionRow {
1731 id: "sess_parent".to_string(),
1732 source_path: None,
1733 sync_status: "local_only".to_string(),
1734 last_synced_at: None,
1735 user_id: None,
1736 nickname: None,
1737 team_id: None,
1738 tool: "opencode".to_string(),
1739 agent_provider: None,
1740 agent_model: None,
1741 title: Some("regular".to_string()),
1742 description: None,
1743 tags: None,
1744 created_at: "2024-01-01T00:00:00Z".to_string(),
1745 uploaded_at: None,
1746 message_count: 1,
1747 user_message_count: 1,
1748 task_count: 2,
1749 event_count: 20,
1750 duration_seconds: 0,
1751 total_input_tokens: 0,
1752 total_output_tokens: 0,
1753 git_remote: None,
1754 git_branch: None,
1755 git_commit: None,
1756 git_repo_name: None,
1757 pr_number: None,
1758 pr_url: None,
1759 working_directory: None,
1760 files_modified: None,
1761 files_read: None,
1762 has_errors: false,
1763 max_active_agents: 1,
1764 };
1765
1766 assert!(is_opencode_child_session(&child));
1767 assert!(!is_opencode_child_session(&parent));
1768 }
1769
1770 #[test]
1771 fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1772 let child = LocalSessionRow {
1773 id: "sess_child_2".to_string(),
1774 source_path: None,
1775 sync_status: "local_only".to_string(),
1776 last_synced_at: None,
1777 user_id: None,
1778 nickname: None,
1779 team_id: None,
1780 tool: "opencode".to_string(),
1781 agent_provider: None,
1782 agent_model: None,
1783 title: None,
1784 description: None,
1785 tags: None,
1786 created_at: "2024-01-01T00:00:00Z".to_string(),
1787 uploaded_at: None,
1788 message_count: 2,
1789 user_message_count: 0,
1790 task_count: 4,
1791 event_count: 4,
1792 duration_seconds: 0,
1793 total_input_tokens: 0,
1794 total_output_tokens: 0,
1795 git_remote: None,
1796 git_branch: None,
1797 git_commit: None,
1798 git_repo_name: None,
1799 pr_number: None,
1800 pr_url: None,
1801 working_directory: None,
1802 files_modified: None,
1803 files_read: None,
1804 has_errors: false,
1805 max_active_agents: 1,
1806 };
1807
1808 let parent = LocalSessionRow {
1809 id: "sess_parent".to_string(),
1810 source_path: None,
1811 sync_status: "local_only".to_string(),
1812 last_synced_at: None,
1813 user_id: None,
1814 nickname: None,
1815 team_id: None,
1816 tool: "opencode".to_string(),
1817 agent_provider: None,
1818 agent_model: None,
1819 title: Some("regular".to_string()),
1820 description: None,
1821 tags: None,
1822 created_at: "2024-01-01T00:00:00Z".to_string(),
1823 uploaded_at: None,
1824 message_count: 2,
1825 user_message_count: 1,
1826 task_count: 5,
1827 event_count: 20,
1828 duration_seconds: 0,
1829 total_input_tokens: 0,
1830 total_output_tokens: 0,
1831 git_remote: None,
1832 git_branch: None,
1833 git_commit: None,
1834 git_repo_name: None,
1835 pr_number: None,
1836 pr_url: None,
1837 working_directory: None,
1838 files_modified: None,
1839 files_read: None,
1840 has_errors: false,
1841 max_active_agents: 1,
1842 };
1843
1844 assert!(is_opencode_child_session(&child));
1845 assert!(!is_opencode_child_session(&parent));
1846 }
1847
1848 #[test]
1849 fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1850 let child = LocalSessionRow {
1851 id: "sess_child_3".to_string(),
1852 source_path: None,
1853 sync_status: "local_only".to_string(),
1854 last_synced_at: None,
1855 user_id: None,
1856 nickname: None,
1857 team_id: None,
1858 tool: "opencode".to_string(),
1859 agent_provider: None,
1860 agent_model: None,
1861 title: None,
1862 description: None,
1863 tags: None,
1864 created_at: "2024-01-01T00:00:00Z".to_string(),
1865 uploaded_at: None,
1866 message_count: 3,
1867 user_message_count: 0,
1868 task_count: 2,
1869 event_count: 6,
1870 duration_seconds: 0,
1871 total_input_tokens: 0,
1872 total_output_tokens: 0,
1873 git_remote: None,
1874 git_branch: None,
1875 git_commit: None,
1876 git_repo_name: None,
1877 pr_number: None,
1878 pr_url: None,
1879 working_directory: None,
1880 files_modified: None,
1881 files_read: None,
1882 has_errors: false,
1883 max_active_agents: 1,
1884 };
1885
1886 assert!(is_opencode_child_session(&child));
1887 }
1888
    // The parent-id parser accepts alias field names such as `parentUUID`.
    #[test]
    fn test_parse_opencode_parent_session_id_aliases() {
        let root = temp_root();
        let dir = root.path().join("session-aliases");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1905
    // The parent id is also found when nested under metadata objects.
    #[test]
    fn test_parse_opencode_parent_session_id_nested_metadata() {
        let root = temp_root();
        let dir = root.path().join("session-nested");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1922
    // Claude subagent files (under a `subagents/` dir) are detected by their
    // own predicate — not the opencode one — and hidden from listings.
    #[test]
    fn test_is_claude_subagent_session() {
        let row = make_row(
            "ses_parent",
            "claude-code",
            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
        );
        assert!(!is_opencode_child_session(&row));
        assert!(is_claude_subagent_session(&row));
        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
    }
1934
    // hide_opencode_child_sessions removes only rows whose session file has a
    // parentID; parents, orphans, and non-opencode tools are kept.
    #[test]
    fn test_hide_opencode_child_sessions() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        let child_session = dir.join("child.json");
        let orphan_session = dir.join("orphan.json");

        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &orphan_session,
            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let rows = vec![
            make_row(
                "ses_child",
                "opencode",
                Some(child_session.to_str().unwrap()),
            ),
            make_row(
                "ses_parent",
                "opencode",
                Some(parent_session.to_str().unwrap()),
            ),
            {
                let mut row = make_row("ses_other", "codex", None);
                row.user_message_count = 1;
                row
            },
            make_row(
                "ses_orphan",
                "opencode",
                Some(orphan_session.to_str().unwrap()),
            ),
        ];

        let filtered = hide_opencode_child_sessions(rows);
        assert_eq!(filtered.len(), 3);
        assert!(filtered.iter().all(|r| r.id != "ses_child"));
    }
1987
    // Per-team sync cursor: absent by default, set and overwritten in place.
    #[test]
    fn test_sync_cursor() {
        let db = test_db();
        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-01-01T00:00:00Z".to_string())
        );
        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-06-01T00:00:00Z".to_string())
        );
    }
2004
    // Body cache round-trip: miss before insert, exact bytes after.
    #[test]
    fn test_body_cache() {
        let db = test_db();
        assert_eq!(db.get_cached_body("s1").unwrap(), None);
        db.cache_body("s1", b"hello world").unwrap();
        assert_eq!(
            db.get_cached_body("s1").unwrap(),
            Some(b"hello world".to_vec())
        );
    }
2015
    // Timeline summary cache: upsert, list by namespace, then clear.
    #[test]
    fn test_timeline_summary_cache_roundtrip() {
        let db = test_db();
        db.upsert_timeline_summary_cache(
            "k1",
            "timeline:v1",
            "compact text",
            "{\"kind\":\"turn-summary\"}",
            "raw text",
        )
        .unwrap();

        let rows = db
            .list_timeline_summary_cache_by_namespace("timeline:v1")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].lookup_key, "k1");
        assert_eq!(rows[0].namespace, "timeline:v1");
        assert_eq!(rows[0].compact, "compact text");
        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
        assert_eq!(rows[0].raw, "raw text");

        let cleared = db.clear_timeline_summary_cache().unwrap();
        assert_eq!(cleared, 1);
        let rows_after = db
            .list_timeline_summary_cache_by_namespace("timeline:v1")
            .unwrap();
        assert!(rows_after.is_empty());
    }
2045
    // The timeline cache migration must be recorded in _migrations on open.
    #[test]
    fn test_local_migrations_include_timeline_summary_cache() {
        let db = test_db();
        let conn = db.conn();
        let applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                params!["local_0003_timeline_summary_cache"],
                |row| row.get(0),
            )
            .unwrap();
        assert!(applied);
    }
2059
    // Guard: the local_*.sql migration files in this crate must stay
    // byte-name-identical to the api crate's copy.
    #[test]
    fn test_local_migration_files_match_api_local_migrations() {
        // Names of every local_*.sql file in `dir`.
        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
            std::fs::read_dir(dir)
                .expect("read migrations directory")
                .filter_map(Result::ok)
                .map(|entry| entry.file_name().to_string_lossy().to_string())
                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
                .collect()
        }

        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let local_files = collect_local_sql(manifest_dir.join("migrations"));
        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));

        assert_eq!(
            local_files, api_files,
            "local-db local migrations must stay in parity with api local migrations"
        );
    }
2080
    // Upserting a remote summary creates a row with remote_only sync status;
    // the nickname is not persisted into the local row.
    #[test]
    fn test_upsert_remote_session() {
        let db = test_db();
        let summary = RemoteSessionSummary {
            id: "remote-1".to_string(),
            user_id: Some("u1".to_string()),
            nickname: Some("alice".to_string()),
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Test session".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 10,
            task_count: 2,
            event_count: 20,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary).unwrap();

        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "remote-1");
        assert_eq!(sessions[0].sync_status, "remote_only");
        assert_eq!(sessions[0].nickname, None); }
2123
    // NOTE(review): despite the name, this exercises team_id filtering, not
    // repo filtering — consider renaming to test_list_filter_by_team.
    #[test]
    fn test_list_filter_by_repo() {
        let db = test_db();
        let summary1 = RemoteSessionSummary {
            id: "s1".to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Session 1".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 5,
            task_count: 0,
            event_count: 10,
            duration_seconds: 60,
            total_input_tokens: 100,
            total_output_tokens: 50,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary1).unwrap();

        let filter = LocalSessionFilter {
            team_id: Some("t1".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);

        let filter = LocalSessionFilter {
            team_id: Some("t999".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
    }
2174
    // Remote summary fixture for the log-filter tests; always team t1 with an
    // anthropic/claude model so tool/model filters have something to match.
    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
        RemoteSessionSummary {
            id: id.to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: tool.to_string(),
            agent_provider: Some("anthropic".to_string()),
            agent_model: Some("claude-opus-4-6".to_string()),
            title: Some(title.to_string()),
            description: None,
            tags: None,
            created_at: created_at.to_string(),
            uploaded_at: created_at.to_string(),
            message_count: 5,
            task_count: 1,
            event_count: 10,
            duration_seconds: 300,
            total_input_tokens: 1000,
            total_output_tokens: 500,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
2210
    // Seed five sessions (s1..s5) on consecutive days: four claude-code plus
    // one gemini, so tool/date filters have distinguishable rows.
    fn seed_sessions(db: &LocalDb) {
        db.upsert_remote_session(&make_summary(
            "s1",
            "claude-code",
            "First session",
            "2024-01-01T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s2",
            "claude-code",
            "JWT auth work",
            "2024-01-02T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s3",
            "gemini",
            "Gemini test",
            "2024-01-03T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s4",
            "claude-code",
            "Error handling",
            "2024-01-04T00:00:00Z",
        ))
        .unwrap();
        db.upsert_remote_session(&make_summary(
            "s5",
            "claude-code",
            "Final polish",
            "2024-01-05T00:00:00Z",
        ))
        .unwrap();
    }
2249
2250 #[test]
2253 fn test_log_no_filters() {
2254 let db = test_db();
2255 seed_sessions(&db);
2256 let filter = LogFilter::default();
2257 let results = db.list_sessions_log(&filter).unwrap();
2258 assert_eq!(results.len(), 5);
2259 assert_eq!(results[0].id, "s5");
2261 assert_eq!(results[4].id, "s1");
2262 }
2263
2264 #[test]
2265 fn test_log_filter_by_tool() {
2266 let db = test_db();
2267 seed_sessions(&db);
2268 let filter = LogFilter {
2269 tool: Some("claude-code".to_string()),
2270 ..Default::default()
2271 };
2272 let results = db.list_sessions_log(&filter).unwrap();
2273 assert_eq!(results.len(), 4);
2274 assert!(results.iter().all(|s| s.tool == "claude-code"));
2275 }
2276
2277 #[test]
2278 fn test_log_filter_by_model_wildcard() {
2279 let db = test_db();
2280 seed_sessions(&db);
2281 let filter = LogFilter {
2282 model: Some("claude*".to_string()),
2283 ..Default::default()
2284 };
2285 let results = db.list_sessions_log(&filter).unwrap();
2286 assert_eq!(results.len(), 5); }
2288
2289 #[test]
2290 fn test_log_filter_since() {
2291 let db = test_db();
2292 seed_sessions(&db);
2293 let filter = LogFilter {
2294 since: Some("2024-01-03T00:00:00Z".to_string()),
2295 ..Default::default()
2296 };
2297 let results = db.list_sessions_log(&filter).unwrap();
2298 assert_eq!(results.len(), 3); }
2300
2301 #[test]
2302 fn test_log_filter_before() {
2303 let db = test_db();
2304 seed_sessions(&db);
2305 let filter = LogFilter {
2306 before: Some("2024-01-03T00:00:00Z".to_string()),
2307 ..Default::default()
2308 };
2309 let results = db.list_sessions_log(&filter).unwrap();
2310 assert_eq!(results.len(), 2); }
2312
2313 #[test]
2314 fn test_log_filter_since_and_before() {
2315 let db = test_db();
2316 seed_sessions(&db);
2317 let filter = LogFilter {
2318 since: Some("2024-01-02T00:00:00Z".to_string()),
2319 before: Some("2024-01-04T00:00:00Z".to_string()),
2320 ..Default::default()
2321 };
2322 let results = db.list_sessions_log(&filter).unwrap();
2323 assert_eq!(results.len(), 2); }
2325
2326 #[test]
2327 fn test_log_filter_grep() {
2328 let db = test_db();
2329 seed_sessions(&db);
2330 let filter = LogFilter {
2331 grep: Some("JWT".to_string()),
2332 ..Default::default()
2333 };
2334 let results = db.list_sessions_log(&filter).unwrap();
2335 assert_eq!(results.len(), 1);
2336 assert_eq!(results[0].id, "s2");
2337 }
2338
2339 #[test]
2340 fn test_log_limit_and_offset() {
2341 let db = test_db();
2342 seed_sessions(&db);
2343 let filter = LogFilter {
2344 limit: Some(2),
2345 offset: Some(1),
2346 ..Default::default()
2347 };
2348 let results = db.list_sessions_log(&filter).unwrap();
2349 assert_eq!(results.len(), 2);
2350 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2352 }
2353
2354 #[test]
2355 fn test_log_limit_only() {
2356 let db = test_db();
2357 seed_sessions(&db);
2358 let filter = LogFilter {
2359 limit: Some(3),
2360 ..Default::default()
2361 };
2362 let results = db.list_sessions_log(&filter).unwrap();
2363 assert_eq!(results.len(), 3);
2364 }
2365
2366 #[test]
2367 fn test_list_sessions_limit_offset() {
2368 let db = test_db();
2369 seed_sessions(&db);
2370 let filter = LocalSessionFilter {
2371 limit: Some(2),
2372 offset: Some(1),
2373 ..Default::default()
2374 };
2375 let results = db.list_sessions(&filter).unwrap();
2376 assert_eq!(results.len(), 2);
2377 assert_eq!(results[0].id, "s4");
2378 assert_eq!(results[1].id, "s3");
2379 }
2380
2381 #[test]
2382 fn test_count_sessions_filtered() {
2383 let db = test_db();
2384 seed_sessions(&db);
2385 let count = db
2386 .count_sessions_filtered(&LocalSessionFilter::default())
2387 .unwrap();
2388 assert_eq!(count, 5);
2389 }
2390
2391 #[test]
2392 fn test_list_working_directories_distinct_non_empty() {
2393 let db = test_db();
2394
2395 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2396 a.working_directory = Some("/tmp/repo-a".to_string());
2397 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2398 b.working_directory = Some("/tmp/repo-a".to_string());
2399 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2400 c.working_directory = Some("/tmp/repo-b".to_string());
2401 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2402 d.working_directory = Some("".to_string());
2403
2404 db.upsert_remote_session(&a).unwrap();
2405 db.upsert_remote_session(&b).unwrap();
2406 db.upsert_remote_session(&c).unwrap();
2407 db.upsert_remote_session(&d).unwrap();
2408
2409 let dirs = db.list_working_directories().unwrap();
2410 assert_eq!(
2411 dirs,
2412 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2413 );
2414 }
2415
2416 #[test]
2417 fn test_list_session_tools() {
2418 let db = test_db();
2419 seed_sessions(&db);
2420 let tools = db
2421 .list_session_tools(&LocalSessionFilter::default())
2422 .unwrap();
2423 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2424 }
2425
2426 #[test]
2427 fn test_log_combined_filters() {
2428 let db = test_db();
2429 seed_sessions(&db);
2430 let filter = LogFilter {
2431 tool: Some("claude-code".to_string()),
2432 since: Some("2024-01-03T00:00:00Z".to_string()),
2433 limit: Some(1),
2434 ..Default::default()
2435 };
2436 let results = db.list_sessions_log(&filter).unwrap();
2437 assert_eq!(results.len(), 1);
2438 assert_eq!(results[0].id, "s5"); }
2440
2441 #[test]
2444 fn test_get_session_by_offset() {
2445 let db = test_db();
2446 seed_sessions(&db);
2447 let row = db.get_session_by_offset(0).unwrap().unwrap();
2448 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2450 assert_eq!(row.id, "s3");
2451 assert!(db.get_session_by_offset(10).unwrap().is_none());
2452 }
2453
2454 #[test]
2455 fn test_get_session_by_tool_offset() {
2456 let db = test_db();
2457 seed_sessions(&db);
2458 let row = db
2459 .get_session_by_tool_offset("claude-code", 0)
2460 .unwrap()
2461 .unwrap();
2462 assert_eq!(row.id, "s5");
2463 let row = db
2464 .get_session_by_tool_offset("claude-code", 1)
2465 .unwrap()
2466 .unwrap();
2467 assert_eq!(row.id, "s4");
2468 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2469 assert_eq!(row.id, "s3");
2470 assert!(db
2471 .get_session_by_tool_offset("gemini", 1)
2472 .unwrap()
2473 .is_none());
2474 }
2475
2476 #[test]
2477 fn test_get_sessions_latest() {
2478 let db = test_db();
2479 seed_sessions(&db);
2480 let rows = db.get_sessions_latest(3).unwrap();
2481 assert_eq!(rows.len(), 3);
2482 assert_eq!(rows[0].id, "s5");
2483 assert_eq!(rows[1].id, "s4");
2484 assert_eq!(rows[2].id, "s3");
2485 }
2486
2487 #[test]
2488 fn test_get_sessions_by_tool_latest() {
2489 let db = test_db();
2490 seed_sessions(&db);
2491 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2492 assert_eq!(rows.len(), 2);
2493 assert_eq!(rows[0].id, "s5");
2494 assert_eq!(rows[1].id, "s4");
2495 }
2496
2497 #[test]
2498 fn test_get_sessions_latest_more_than_available() {
2499 let db = test_db();
2500 seed_sessions(&db);
2501 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2502 assert_eq!(rows.len(), 1); }
2504
2505 #[test]
2506 fn test_session_count() {
2507 let db = test_db();
2508 assert_eq!(db.session_count().unwrap(), 0);
2509 seed_sessions(&db);
2510 assert_eq!(db.session_count().unwrap(), 5);
2511 }
2512
2513 #[test]
2516 fn test_link_commit_session() {
2517 let db = test_db();
2518 seed_sessions(&db);
2519 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2520 .unwrap();
2521
2522 let commits = db.get_commits_by_session("s1").unwrap();
2523 assert_eq!(commits.len(), 1);
2524 assert_eq!(commits[0].commit_hash, "abc123");
2525 assert_eq!(commits[0].session_id, "s1");
2526 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2527 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2528
2529 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2530 assert_eq!(sessions.len(), 1);
2531 assert_eq!(sessions[0].id, "s1");
2532 }
2533
2534 #[test]
2535 fn test_get_sessions_by_commit() {
2536 let db = test_db();
2537 seed_sessions(&db);
2538 db.link_commit_session("abc123", "s1", None, None).unwrap();
2540 db.link_commit_session("abc123", "s2", None, None).unwrap();
2541 db.link_commit_session("abc123", "s3", None, None).unwrap();
2542
2543 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2544 assert_eq!(sessions.len(), 3);
2545 assert_eq!(sessions[0].id, "s3");
2547 assert_eq!(sessions[1].id, "s2");
2548 assert_eq!(sessions[2].id, "s1");
2549 }
2550
2551 #[test]
2552 fn test_get_commits_by_session() {
2553 let db = test_db();
2554 seed_sessions(&db);
2555 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2557 .unwrap();
2558 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2559 .unwrap();
2560 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2561 .unwrap();
2562
2563 let commits = db.get_commits_by_session("s1").unwrap();
2564 assert_eq!(commits.len(), 3);
2565 assert!(commits.iter().all(|c| c.session_id == "s1"));
2567 }
2568
2569 #[test]
2570 fn test_duplicate_link_ignored() {
2571 let db = test_db();
2572 seed_sessions(&db);
2573 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2574 .unwrap();
2575 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2577 .unwrap();
2578
2579 let commits = db.get_commits_by_session("s1").unwrap();
2580 assert_eq!(commits.len(), 1);
2581 }
2582
2583 #[test]
2584 fn test_log_filter_by_commit() {
2585 let db = test_db();
2586 seed_sessions(&db);
2587 db.link_commit_session("abc123", "s2", None, None).unwrap();
2588 db.link_commit_session("abc123", "s4", None, None).unwrap();
2589
2590 let filter = LogFilter {
2591 commit: Some("abc123".to_string()),
2592 ..Default::default()
2593 };
2594 let results = db.list_sessions_log(&filter).unwrap();
2595 assert_eq!(results.len(), 2);
2596 assert_eq!(results[0].id, "s4");
2597 assert_eq!(results[1].id, "s2");
2598
2599 let filter = LogFilter {
2601 commit: Some("nonexistent".to_string()),
2602 ..Default::default()
2603 };
2604 let results = db.list_sessions_log(&filter).unwrap();
2605 assert_eq!(results.len(), 0);
2606 }
2607}