1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_core::session::{is_auxiliary_session, working_directory};
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Mutex;
13
14use git::{normalize_repo_name, GitContext};
15
/// One row of the local `sessions` table joined with its `session_sync` state
/// (and uploader nickname from `users`), as produced by `row_to_local_session`.
///
/// Counters are `i64` to match SQLite's integer affinity.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// Path of the log file this session was parsed from (from `session_sync`;
    /// `None` for rows without a local source).
    pub source_path: Option<String>,
    /// Sync state, e.g. 'local_only', 'synced', 'remote_only'; queries fall
    /// back to 'unknown' when no sync row exists.
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Producing tool, e.g. "opencode", "codex", "claude-code".
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Comma-joined tag list (see `upsert_local_session`).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // Serialized file lists from extract_file_metadata — assumed JSON-encoded
    // (`touches` filtering matches quoted substrings); TODO confirm encoding.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
    /// Auxiliary (child/sub-agent) sessions are excluded from most listings.
    pub is_auxiliary: bool,
}
54
/// A row of `commit_session_links`: associates a git commit with the session
/// during which it was recorded.
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
64
65pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
67 row.tool == "opencode" && row.is_auxiliary
68}
69
70#[deprecated(
72 note = "Use parser/core canonical session role attributes instead of runtime file inspection"
73)]
74pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
75 let text = fs::read_to_string(source_path).ok()?;
76 let json: Value = serde_json::from_str(&text).ok()?;
77 lookup_parent_session_id(&json)
78}
79
80fn lookup_parent_session_id(value: &Value) -> Option<String> {
81 match value {
82 Value::Object(obj) => {
83 for (key, value) in obj {
84 if is_parent_id_key(key) {
85 if let Some(parent_id) = value.as_str() {
86 let parent_id = parent_id.trim();
87 if !parent_id.is_empty() {
88 return Some(parent_id.to_string());
89 }
90 }
91 }
92 if let Some(parent_id) = lookup_parent_session_id(value) {
93 return Some(parent_id);
94 }
95 }
96 None
97 }
98 Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
99 _ => None,
100 }
101}
102
/// Heuristically decides whether a JSON key names a parent-session id.
///
/// The key is flattened to lowercase ASCII alphanumerics so that `parent_id`,
/// `parentId` and `Parent-ID` all compare equal; it must mention "parent" and
/// end in "id" or "uuid". The previous exact-match arms (`parentid`,
/// `parentuuid`, `parentsessionid`, `parentsessionuuid`, and the
/// `ends_with("parentsessionid")` check) were all subsumed by these two
/// suffix conditions and have been removed; behavior is unchanged.
fn is_parent_id_key(key: &str) -> bool {
    let flat = key
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .collect::<String>();

    flat.contains("parent") && (flat.ends_with("id") || flat.ends_with("uuid"))
}
118
119pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
121 rows.retain(|row| !row.is_auxiliary);
122 rows
123}
124
/// Guesses the producing tool from well-known session-directory markers in
/// `source_path` (case-insensitive, both `/` and `\` separators).
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    let lowered = source_path?.to_ascii_lowercase();

    const CODEX_MARKERS: [&str; 4] = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    const CLAUDE_MARKERS: [&str; 4] = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    if CODEX_MARKERS.iter().any(|m| lowered.contains(m)) {
        Some("codex")
    } else if CLAUDE_MARKERS.iter().any(|m| lowered.contains(m)) {
        Some("claude-code")
    } else {
        None
    }
}
146
147fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
148 infer_tool_from_source_path(source_path)
149 .unwrap_or(current_tool)
150 .to_string()
151}
152
/// Trims `value` and promotes it to an owned `String`, treating blank or
/// absent input as `None`.
fn normalize_non_empty(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
159
160fn json_object_string(value: &Value, keys: &[&str]) -> Option<String> {
161 let obj = value.as_object()?;
162 for key in keys {
163 if let Some(found) = obj.get(*key).and_then(Value::as_str) {
164 let normalized = found.trim();
165 if !normalized.is_empty() {
166 return Some(normalized.to_string());
167 }
168 }
169 }
170 None
171}
172
173fn git_context_from_session_attributes(session: &Session) -> GitContext {
174 let attrs = &session.context.attributes;
175
176 let mut remote = normalize_non_empty(attrs.get("git_remote").and_then(Value::as_str));
177 let mut branch = normalize_non_empty(attrs.get("git_branch").and_then(Value::as_str));
178 let mut commit = normalize_non_empty(attrs.get("git_commit").and_then(Value::as_str));
179 let mut repo_name = normalize_non_empty(attrs.get("git_repo_name").and_then(Value::as_str));
180
181 if let Some(git_value) = attrs.get("git") {
182 if remote.is_none() {
183 remote = json_object_string(
184 git_value,
185 &["remote", "repository_url", "repo_url", "origin", "url"],
186 );
187 }
188 if branch.is_none() {
189 branch = json_object_string(
190 git_value,
191 &["branch", "git_branch", "current_branch", "ref", "head"],
192 );
193 }
194 if commit.is_none() {
195 commit = json_object_string(git_value, &["commit", "commit_hash", "sha", "git_commit"]);
196 }
197 if repo_name.is_none() {
198 repo_name = json_object_string(git_value, &["repo_name", "repository", "repo", "name"]);
199 }
200 }
201
202 if repo_name.is_none() {
203 repo_name = remote
204 .as_deref()
205 .and_then(normalize_repo_name)
206 .map(ToOwned::to_owned);
207 }
208
209 GitContext {
210 remote,
211 branch,
212 commit,
213 repo_name,
214 }
215}
216
217fn git_context_has_any_field(git: &GitContext) -> bool {
218 git.remote.is_some() || git.branch.is_some() || git.commit.is_some() || git.repo_name.is_some()
219}
220
221fn merge_git_context(preferred: &GitContext, fallback: &GitContext) -> GitContext {
222 GitContext {
223 remote: preferred.remote.clone().or_else(|| fallback.remote.clone()),
224 branch: preferred.branch.clone().or_else(|| fallback.branch.clone()),
225 commit: preferred.commit.clone().or_else(|| fallback.commit.clone()),
226 repo_name: preferred
227 .repo_name
228 .clone()
229 .or_else(|| fallback.repo_name.clone()),
230 }
231}
232
/// Filtering, sorting and paging options for local session listings.
///
/// Every `Option` field means "no constraint" when `None`; all present
/// constraints are ANDed together.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    pub team_id: Option<String>,
    /// Matched against `session_sync.sync_status` (missing rows compare as
    /// 'unknown').
    pub sync_status: Option<String>,
    pub git_repo_name: Option<String>,
    /// Substring match over title, description and tags.
    pub search: Option<String>,
    /// Drop sessions with zero messages/tasks, at most two events, and a
    /// blank title.
    pub exclude_low_signal: bool,
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set.
    pub offset: Option<u32>,
}
247
impl Default for LocalSessionFilter {
    /// Unfiltered defaults: every constraint off, most-recent-first ordering,
    /// all time, no paging.
    fn default() -> Self {
        Self {
            team_id: None,
            sync_status: None,
            git_repo_name: None,
            search: None,
            exclude_low_signal: false,
            tool: None,
            sort: LocalSortOrder::Recent,
            time_range: LocalTimeRange::All,
            limit: None,
            offset: None,
        }
    }
}
264
/// Sort order for local session listings (see `list_sessions`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (by `created_at`).
    #[default]
    Recent,
    /// Highest message count first, ties broken by recency.
    Popular,
    /// Longest duration first, ties broken by recency.
    Longest,
}
273
/// Time window applied to `created_at`, relative to "now" in SQLite.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last day ('-1 day').
    Hours24,
    /// Last week ('-7 days').
    Days7,
    /// Last 30 days ('-30 days').
    Days30,
    /// No time constraint.
    #[default]
    All,
}
283
/// Session metadata originating from a remote/team source, persisted via
/// [`LocalDb::upsert_remote_session`].
///
/// Unlike [`LocalSessionRow`] there is no local source path or sync state,
/// and `team_id`/`uploaded_at` are mandatory.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
317
/// Filter set for [`LocalDb::list_sessions_log`]; all present fields are
/// ANDed together.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Exact tool match.
    pub tool: Option<String>,
    /// Model pattern; `*` acts as a wildcard (translated to SQL `%`).
    pub model: Option<String>,
    /// Inclusive lower bound on `created_at` (string comparison).
    pub since: Option<String>,
    /// Exclusive upper bound on `created_at` (string comparison).
    pub before: Option<String>,
    /// Path that must appear, quoted, inside `files_modified`.
    pub touches: Option<String>,
    /// Substring searched in title, description and tags.
    pub grep: Option<String>,
    /// `Some(true)` keeps only error sessions; `Some(false)` is currently
    /// treated the same as `None`.
    pub has_errors: Option<bool>,
    /// Prefix match on the session's working directory.
    pub working_directory: Option<String>,
    /// Exact repo-name match.
    pub git_repo_name: Option<String>,
    /// Restrict to sessions linked to this commit hash.
    pub commit: Option<String>,
    pub limit: Option<u32>,
    /// Only applied when `limit` is also set.
    pub offset: Option<u32>,
}
346
/// Base FROM/JOIN clause shared by the session SELECTs below: `sessions`
/// plus optional sync state (`ss`) and owning user (`u`).
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
352
/// Handle to the local SQLite database; all access is serialized through a
/// single mutex-guarded connection.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
359
360impl LocalDb {
361 pub fn open() -> Result<Self> {
364 let path = default_db_path()?;
365 Self::open_path(&path)
366 }
367
    /// Opens (or creates) the database file at `path` with the latest schema,
    /// creating parent directories as needed.
    ///
    /// If the existing file's schema is incompatible, the file is rotated
    /// aside and a fresh database is created in its place; any other open
    /// error is returned unchanged.
    ///
    /// NOTE(review): `&Path` would be the more general parameter type; left
    /// as `&PathBuf` because the helper signatures are outside this view.
    pub fn open_path(path: &PathBuf) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("create dir for {}", path.display()))?;
        }
        match open_connection_with_latest_schema(path) {
            Ok(conn) => Ok(Self {
                conn: Mutex::new(conn),
            }),
            Err(err) => {
                // Only schema-compatibility failures are recoverable here.
                if !is_schema_compat_error(&err) {
                    return Err(err);
                }

                rotate_legacy_db(path)?;

                let conn = open_connection_with_latest_schema(path)
                    .with_context(|| format!("recreate db {}", path.display()))?;
                Ok(Self {
                    conn: Mutex::new(conn),
                })
            }
        }
    }
395
    /// Acquires the connection guard. Panics only if another thread panicked
    /// while holding the lock — a poisoned mutex is unrecoverable here.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
399
    /// Inserts or updates a locally-parsed session and its sync row.
    ///
    /// Local sessions are always stored under the fixed team id 'personal'.
    /// Git columns embedded in the session attributes (`?26 = 1`) overwrite
    /// stored values; otherwise existing git columns are preserved via
    /// COALESCE so a re-index without git info does not erase earlier context.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are persisted as a single comma-joined column.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        let cwd = working_directory(session).map(String::from);
        let is_auxiliary = is_auxiliary_session(session);

        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        let normalized_tool =
            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));
        // Session-embedded git wins over the caller-detected context;
        // `has_session_git` drives the CASE WHEN ?26=1 arms in the SQL below.
        let git_from_session = git_context_from_session_attributes(session);
        let has_session_git = git_context_has_any_field(&git_from_session);
        let merged_git = merge_git_context(&git_from_session, git);

        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
            (id, team_id, tool, agent_provider, agent_model, \
            title, description, tags, created_at, \
            message_count, user_message_count, task_count, event_count, duration_seconds, \
            total_input_tokens, total_output_tokens, body_storage_key, \
            git_remote, git_branch, git_commit, git_repo_name, working_directory, \
            files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
            VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24,?25) \
            ON CONFLICT(id) DO UPDATE SET \
            tool=excluded.tool, agent_provider=excluded.agent_provider, \
            agent_model=excluded.agent_model, \
            title=excluded.title, description=excluded.description, \
            tags=excluded.tags, \
            message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
            task_count=excluded.task_count, \
            event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
            total_input_tokens=excluded.total_input_tokens, \
            total_output_tokens=excluded.total_output_tokens, \
            git_remote=CASE WHEN ?26=1 THEN excluded.git_remote ELSE COALESCE(git_remote, excluded.git_remote) END, \
            git_branch=CASE WHEN ?26=1 THEN excluded.git_branch ELSE COALESCE(git_branch, excluded.git_branch) END, \
            git_commit=CASE WHEN ?26=1 THEN excluded.git_commit ELSE COALESCE(git_commit, excluded.git_commit) END, \
            git_repo_name=CASE WHEN ?26=1 THEN excluded.git_repo_name ELSE COALESCE(git_repo_name, excluded.git_repo_name) END, \
            working_directory=excluded.working_directory, \
            files_modified=excluded.files_modified, files_read=excluded.files_read, \
            has_errors=excluded.has_errors, \
            max_active_agents=excluded.max_active_agents, \
            is_auxiliary=excluded.is_auxiliary",
            params![
                &session.session_id,
                &normalized_tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &merged_git.remote,
                &merged_git.branch,
                &merged_git.commit,
                &merged_git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
                has_session_git as i64,
            ],
        )?;

        // Ensure a sync row exists; re-upserts keep the current sync_status
        // and only refresh the source path.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
            VALUES (?1, ?2, 'local_only') \
            ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
498
    /// Inserts or updates a session summary that originated remotely.
    ///
    /// On conflict, remote values overwrite stored metadata wholesale (unlike
    /// `upsert_local_session`, which preserves existing git fields). The sync
    /// row is created as 'remote_only'; a pre-existing 'local_only' row flips
    /// to 'synced', and any other existing status is left untouched.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
            (id, user_id, team_id, tool, agent_provider, agent_model, \
            title, description, tags, created_at, uploaded_at, \
            message_count, task_count, event_count, duration_seconds, \
            total_input_tokens, total_output_tokens, body_storage_key, \
            git_remote, git_branch, git_commit, git_repo_name, \
            pr_number, pr_url, working_directory, \
            files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
            VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,0) \
            ON CONFLICT(id) DO UPDATE SET \
            title=excluded.title, description=excluded.description, \
            tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
            message_count=excluded.message_count, task_count=excluded.task_count, \
            event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
            total_input_tokens=excluded.total_input_tokens, \
            total_output_tokens=excluded.total_output_tokens, \
            git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
            git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
            pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
            working_directory=excluded.working_directory, \
            files_modified=excluded.files_modified, files_read=excluded.files_read, \
            has_errors=excluded.has_errors, \
            max_active_agents=excluded.max_active_agents, \
            is_auxiliary=excluded.is_auxiliary",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
            VALUES (?1, 'remote_only') \
            ON CONFLICT(session_id) DO UPDATE SET \
            sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
571
    /// Builds the WHERE clause and boxed positional parameters shared by
    /// `list_sessions`, `count_sessions_filtered` and `list_session_tools`.
    ///
    /// The clause references `?1..?N` in the order params are pushed.
    /// Auxiliary sessions are always excluded.
    fn build_local_session_where_clause(
        filter: &LocalSessionFilter,
    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next positional placeholder number.
        let mut idx = 1u32;

        if let Some(ref team_id) = filter.team_id {
            where_clauses.push(format!("s.team_id = ?{idx}"));
            param_values.push(Box::new(team_id.clone()));
            idx += 1;
        }

        if let Some(ref sync_status) = filter.sync_status {
            // Sessions without a sync row compare as 'unknown'.
            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
            param_values.push(Box::new(sync_status.clone()));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref search) = filter.search {
            // One LIKE pattern bound three times: title, description, tags.
            let like = format!("%{search}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        if filter.exclude_low_signal {
            // "Low signal": no messages or tasks, almost no events, no title.
            where_clauses.push(
                "NOT (COALESCE(s.message_count, 0) = 0 \
                AND COALESCE(s.user_message_count, 0) = 0 \
                AND COALESCE(s.task_count, 0) = 0 \
                AND COALESCE(s.event_count, 0) <= 2 \
                AND (s.title IS NULL OR TRIM(s.title) = ''))"
                    .to_string(),
            );
        }

        // Time ranges are SQLite datetime offsets relative to now; this is
        // the final parameter, so idx is not incremented afterwards.
        let interval = match filter.time_range {
            LocalTimeRange::Hours24 => Some("-1 day"),
            LocalTimeRange::Days7 => Some("-7 days"),
            LocalTimeRange::Days30 => Some("-30 days"),
            LocalTimeRange::All => None,
        };
        if let Some(interval) = interval {
            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
            param_values.push(Box::new(interval.to_string()));
        }

        (where_clauses.join(" AND "), param_values)
    }
646
647 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
648 let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
649 let order_clause = match filter.sort {
650 LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
651 LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
652 LocalSortOrder::Recent => "s.created_at DESC",
653 };
654
655 let mut sql = format!(
656 "SELECT {LOCAL_SESSION_COLUMNS} \
657 {FROM_CLAUSE} WHERE {where_str} \
658 ORDER BY {order_clause}"
659 );
660
661 if let Some(limit) = filter.limit {
662 sql.push_str(" LIMIT ?");
663 param_values.push(Box::new(limit));
664 if let Some(offset) = filter.offset {
665 sql.push_str(" OFFSET ?");
666 param_values.push(Box::new(offset));
667 }
668 }
669
670 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
671 param_values.iter().map(|p| p.as_ref()).collect();
672 let conn = self.conn();
673 let mut stmt = conn.prepare(&sql)?;
674 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
675
676 let mut result = Vec::new();
677 for row in rows {
678 result.push(row?);
679 }
680
681 Ok(result)
682 }
683
684 pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
686 let mut count_filter = filter.clone();
687 count_filter.limit = None;
688 count_filter.offset = None;
689 let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
690 let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
691 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
692 param_values.iter().map(|p| p.as_ref()).collect();
693 let conn = self.conn();
694 let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
695 Ok(count)
696 }
697
698 pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
700 let mut tool_filter = filter.clone();
701 tool_filter.tool = None;
702 tool_filter.limit = None;
703 tool_filter.offset = None;
704 let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
705 let sql = format!(
706 "SELECT DISTINCT s.tool \
707 {FROM_CLAUSE} WHERE {where_str} \
708 ORDER BY s.tool ASC"
709 );
710 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
711 param_values.iter().map(|p| p.as_ref()).collect();
712 let conn = self.conn();
713 let mut stmt = conn.prepare(&sql)?;
714 let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
715
716 let mut tools = Vec::new();
717 for row in rows {
718 let tool = row?;
719 if !tool.trim().is_empty() {
720 tools.push(tool);
721 }
722 }
723 Ok(tools)
724 }
725
    /// Lists non-auxiliary sessions matching `filter`, newest first.
    ///
    /// All present `LogFilter` fields are ANDed; see the SQL built per field
    /// below for the exact matching semantics.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec![
            "1=1".to_string(),
            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
        ];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        // Next positional placeholder number.
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // User-facing `*` wildcard becomes SQL LIKE's `%`.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // Match the path as a quoted substring of the serialized
            // files_modified column.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // One pattern bound three times: title, description, tags.
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // NOTE(review): Some(false) falls through and behaves like None —
        // confirm "only sessions without errors" was not intended.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            // Prefix match on the working directory.
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Commit filtering needs an extra join against commit_session_links.
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        // The final idx increment above is otherwise unused; discard it.
        let _ = idx;
        let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
            {FROM_CLAUSE}{extra_join} WHERE {where_str} \
            ORDER BY s.created_at DESC"
        );

        // OFFSET is only emitted together with LIMIT.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(result)
    }
839
840 pub fn get_sessions_by_tool_latest(
842 &self,
843 tool: &str,
844 count: u32,
845 ) -> Result<Vec<LocalSessionRow>> {
846 let sql = format!(
847 "SELECT {LOCAL_SESSION_COLUMNS} \
848 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
849 ORDER BY s.created_at DESC"
850 );
851 let conn = self.conn();
852 let mut stmt = conn.prepare(&sql)?;
853 let rows = stmt.query_map(params![tool], row_to_local_session)?;
854 let mut result = Vec::new();
855 for row in rows {
856 result.push(row?);
857 }
858
859 result.truncate(count as usize);
860 Ok(result)
861 }
862
863 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
865 let sql = format!(
866 "SELECT {LOCAL_SESSION_COLUMNS} \
867 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
868 ORDER BY s.created_at DESC"
869 );
870 let conn = self.conn();
871 let mut stmt = conn.prepare(&sql)?;
872 let rows = stmt.query_map([], row_to_local_session)?;
873 let mut result = Vec::new();
874 for row in rows {
875 result.push(row?);
876 }
877
878 result.truncate(count as usize);
879 Ok(result)
880 }
881
882 pub fn get_session_by_tool_offset(
884 &self,
885 tool: &str,
886 offset: u32,
887 ) -> Result<Option<LocalSessionRow>> {
888 let sql = format!(
889 "SELECT {LOCAL_SESSION_COLUMNS} \
890 {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
891 ORDER BY s.created_at DESC"
892 );
893 let conn = self.conn();
894 let mut stmt = conn.prepare(&sql)?;
895 let rows = stmt.query_map(params![tool], row_to_local_session)?;
896 let result = rows.collect::<Result<Vec<_>, _>>()?;
897 Ok(result.into_iter().nth(offset as usize))
898 }
899
900 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
902 let sql = format!(
903 "SELECT {LOCAL_SESSION_COLUMNS} \
904 {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
905 ORDER BY s.created_at DESC"
906 );
907 let conn = self.conn();
908 let mut stmt = conn.prepare(&sql)?;
909 let rows = stmt.query_map([], row_to_local_session)?;
910 let result = rows.collect::<Result<Vec<_>, _>>()?;
911 Ok(result.into_iter().nth(offset as usize))
912 }
913
914 pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
916 let conn = self.conn();
917 let result = conn
918 .query_row(
919 "SELECT source_path FROM session_sync WHERE session_id = ?1",
920 params![session_id],
921 |row| row.get(0),
922 )
923 .optional()?;
924
925 Ok(result)
926 }
927
928 pub fn session_count(&self) -> Result<i64> {
930 let count = self
931 .conn()
932 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
933 Ok(count)
934 }
935
936 pub fn delete_session(&self, session_id: &str) -> Result<()> {
939 let conn = self.conn();
940 conn.execute(
941 "DELETE FROM body_cache WHERE session_id = ?1",
942 params![session_id],
943 )?;
944 conn.execute(
945 "DELETE FROM session_sync WHERE session_id = ?1",
946 params![session_id],
947 )?;
948 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
949 Ok(())
950 }
951
952 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
955 let cursor = self
956 .conn()
957 .query_row(
958 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
959 params![team_id],
960 |row| row.get(0),
961 )
962 .optional()?;
963 Ok(cursor)
964 }
965
966 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
967 self.conn().execute(
968 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
969 VALUES (?1, ?2, datetime('now')) \
970 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
971 params![team_id, cursor],
972 )?;
973 Ok(())
974 }
975
976 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
980 let sql = format!(
981 "SELECT {LOCAL_SESSION_COLUMNS} \
982 FROM sessions s \
983 INNER JOIN session_sync ss ON ss.session_id = s.id \
984 LEFT JOIN users u ON u.id = s.user_id \
985 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
986 ORDER BY s.created_at ASC"
987 );
988 let conn = self.conn();
989 let mut stmt = conn.prepare(&sql)?;
990 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
991 let mut result = Vec::new();
992 for row in rows {
993 result.push(row?);
994 }
995 Ok(result)
996 }
997
998 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
999 self.conn().execute(
1000 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
1001 WHERE session_id = ?1",
1002 params![session_id],
1003 )?;
1004 Ok(())
1005 }
1006
1007 pub fn was_uploaded_after(
1009 &self,
1010 source_path: &str,
1011 modified: &chrono::DateTime<chrono::Utc>,
1012 ) -> Result<bool> {
1013 let result: Option<String> = self
1014 .conn()
1015 .query_row(
1016 "SELECT last_synced_at FROM session_sync \
1017 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1018 params![source_path],
1019 |row| row.get(0),
1020 )
1021 .optional()?;
1022
1023 if let Some(synced_at) = result {
1024 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1025 return Ok(dt >= *modified);
1026 }
1027 }
1028 Ok(false)
1029 }
1030
1031 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1034 self.conn().execute(
1035 "INSERT INTO body_cache (session_id, body, cached_at) \
1036 VALUES (?1, ?2, datetime('now')) \
1037 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1038 params![session_id, body],
1039 )?;
1040 Ok(())
1041 }
1042
1043 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1044 let body = self
1045 .conn()
1046 .query_row(
1047 "SELECT body FROM body_cache WHERE session_id = ?1",
1048 params![session_id],
1049 |row| row.get(0),
1050 )
1051 .optional()?;
1052 Ok(body)
1053 }
1054
    /// One-time migration of the legacy `state.json` upload map
    /// (`source_path -> uploaded_at`) into `session_sync`.
    ///
    /// Returns the number of paths that matched an existing sync row.
    ///
    /// NOTE(review): lookup errors are swallowed by `unwrap_or(false)`
    /// (best-effort by design?), and `count` increments even when the UPDATE
    /// matched no row (e.g. status already 'synced') — confirm both are
    /// intended.
    pub fn migrate_from_state_json(
        &self,
        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
    ) -> Result<usize> {
        let mut count = 0;
        for (path, uploaded_at) in uploaded {
            let exists: bool = self
                .conn()
                .query_row(
                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
                    params![path],
                    |row| row.get(0),
                )
                .unwrap_or(false);

            if exists {
                self.conn().execute(
                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
                    WHERE source_path = ?2 AND sync_status = 'local_only'",
                    params![uploaded_at.to_rfc3339(), path],
                )?;
                count += 1;
            }
        }
        Ok(count)
    }
1085
1086 pub fn link_commit_session(
1090 &self,
1091 commit_hash: &str,
1092 session_id: &str,
1093 repo_path: Option<&str>,
1094 branch: Option<&str>,
1095 ) -> Result<()> {
1096 self.conn().execute(
1097 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1098 VALUES (?1, ?2, ?3, ?4) \
1099 ON CONFLICT(commit_hash, session_id) DO NOTHING",
1100 params![commit_hash, session_id, repo_path, branch],
1101 )?;
1102 Ok(())
1103 }
1104
1105 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1107 let sql = format!(
1108 "SELECT {LOCAL_SESSION_COLUMNS} \
1109 {FROM_CLAUSE} \
1110 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1111 WHERE csl.commit_hash = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
1112 ORDER BY s.created_at DESC"
1113 );
1114 let conn = self.conn();
1115 let mut stmt = conn.prepare(&sql)?;
1116 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1117 let mut result = Vec::new();
1118 for row in rows {
1119 result.push(row?);
1120 }
1121 Ok(result)
1122 }
1123
1124 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1126 let conn = self.conn();
1127 let mut stmt = conn.prepare(
1128 "SELECT commit_hash, session_id, repo_path, branch, created_at \
1129 FROM commit_session_links WHERE session_id = ?1 \
1130 ORDER BY created_at DESC",
1131 )?;
1132 let rows = stmt.query_map(params![session_id], |row| {
1133 Ok(CommitLink {
1134 commit_hash: row.get(0)?,
1135 session_id: row.get(1)?,
1136 repo_path: row.get(2)?,
1137 branch: row.get(3)?,
1138 created_at: row.get(4)?,
1139 })
1140 })?;
1141 let mut result = Vec::new();
1142 for row in rows {
1143 result.push(row?);
1144 }
1145 Ok(result)
1146 }
1147
1148 pub fn find_active_session_for_repo(
1152 &self,
1153 repo_path: &str,
1154 since_minutes: u32,
1155 ) -> Result<Option<LocalSessionRow>> {
1156 let sql = format!(
1157 "SELECT {LOCAL_SESSION_COLUMNS} \
1158 {FROM_CLAUSE} \
1159 WHERE s.working_directory LIKE ?1 \
1160 AND COALESCE(s.is_auxiliary, 0) = 0 \
1161 AND s.created_at >= datetime('now', ?2) \
1162 ORDER BY s.created_at DESC LIMIT 1"
1163 );
1164 let since = format!("-{since_minutes} minutes");
1165 let like = format!("{repo_path}%");
1166 let conn = self.conn();
1167 let mut stmt = conn.prepare(&sql)?;
1168 let row = stmt
1169 .query_map(params![like, since], row_to_local_session)?
1170 .next()
1171 .transpose()?;
1172 Ok(row)
1173 }
1174
1175 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1177 let conn = self.conn();
1178 let mut stmt = conn
1179 .prepare("SELECT id FROM sessions")
1180 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1181 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1182 let mut set = std::collections::HashSet::new();
1183 if let Ok(rows) = rows {
1184 for row in rows.flatten() {
1185 set.insert(row);
1186 }
1187 }
1188 set
1189 }
1190
1191 pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1193 let title = session.context.title.as_deref();
1194 let description = session.context.description.as_deref();
1195 let (files_modified, files_read, has_errors) =
1196 opensession_core::extract::extract_file_metadata(session);
1197 let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1198 let is_auxiliary = is_auxiliary_session(session);
1199
1200 self.conn().execute(
1201 "UPDATE sessions SET \
1202 title=?2, description=?3, \
1203 message_count=?4, user_message_count=?5, task_count=?6, \
1204 event_count=?7, duration_seconds=?8, \
1205 total_input_tokens=?9, total_output_tokens=?10, \
1206 files_modified=?11, files_read=?12, has_errors=?13, \
1207 max_active_agents=?14, is_auxiliary=?15 \
1208 WHERE id=?1",
1209 params![
1210 &session.session_id,
1211 title,
1212 description,
1213 session.stats.message_count as i64,
1214 session.stats.user_message_count as i64,
1215 session.stats.task_count as i64,
1216 session.stats.event_count as i64,
1217 session.stats.duration_seconds as i64,
1218 session.stats.total_input_tokens as i64,
1219 session.stats.total_output_tokens as i64,
1220 &files_modified,
1221 &files_read,
1222 has_errors,
1223 max_active_agents,
1224 is_auxiliary as i64,
1225 ],
1226 )?;
1227 Ok(())
1228 }
1229
1230 pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1232 self.conn().execute(
1233 "INSERT INTO session_sync (session_id, source_path) \
1234 VALUES (?1, ?2) \
1235 ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1236 params![session_id, source_path],
1237 )?;
1238 Ok(())
1239 }
1240
1241 pub fn list_repos(&self) -> Result<Vec<String>> {
1243 let conn = self.conn();
1244 let mut stmt = conn.prepare(
1245 "SELECT DISTINCT git_repo_name FROM sessions \
1246 WHERE git_repo_name IS NOT NULL AND COALESCE(is_auxiliary, 0) = 0 \
1247 ORDER BY git_repo_name ASC",
1248 )?;
1249 let rows = stmt.query_map([], |row| row.get(0))?;
1250 let mut result = Vec::new();
1251 for row in rows {
1252 result.push(row?);
1253 }
1254 Ok(result)
1255 }
1256
1257 pub fn list_working_directories(&self) -> Result<Vec<String>> {
1259 let conn = self.conn();
1260 let mut stmt = conn.prepare(
1261 "SELECT DISTINCT working_directory FROM sessions \
1262 WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1263 AND COALESCE(is_auxiliary, 0) = 0 \
1264 ORDER BY working_directory ASC",
1265 )?;
1266 let rows = stmt.query_map([], |row| row.get(0))?;
1267 let mut result = Vec::new();
1268 for row in rows {
1269 result.push(row?);
1270 }
1271 Ok(result)
1272 }
1273}
1274
1275fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1278 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1279 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1280
1281 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1283
1284 apply_local_migrations(&conn)?;
1285
1286 ensure_sessions_columns(&conn)?;
1289 repair_session_tools_from_source_path(&conn)?;
1290 validate_local_schema(&conn)?;
1291
1292 Ok(conn)
1293}
1294
1295fn apply_local_migrations(conn: &Connection) -> Result<()> {
1296 conn.execute_batch(
1297 "CREATE TABLE IF NOT EXISTS _migrations (
1298 id INTEGER PRIMARY KEY,
1299 name TEXT NOT NULL UNIQUE,
1300 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1301 );",
1302 )
1303 .context("create _migrations table for local db")?;
1304
1305 for (name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1306 let already_applied: bool = conn
1307 .query_row(
1308 "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1309 [name],
1310 |row| row.get(0),
1311 )
1312 .unwrap_or(false);
1313
1314 if already_applied {
1315 continue;
1316 }
1317
1318 if let Err(e) = conn.execute_batch(sql) {
1319 let msg = e.to_string().to_ascii_lowercase();
1320 if !is_local_migration_compat_error(&msg) {
1321 return Err(e).with_context(|| format!("apply local migration {name}"));
1322 }
1323 }
1324
1325 conn.execute(
1326 "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1327 [name],
1328 )
1329 .with_context(|| format!("record local migration {name}"))?;
1330 }
1331
1332 Ok(())
1333}
1334
/// True when a migration error merely indicates the schema change was
/// already applied (legacy database predating the migration ledger).
/// `msg` is expected to be lowercased by the caller.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const COMPAT_PATTERNS: [&str; 3] =
        ["duplicate column name", "no such column", "already exists"];
    COMPAT_PATTERNS.iter().any(|pat| msg.contains(pat))
}
1340
1341fn validate_local_schema(conn: &Connection) -> Result<()> {
1342 let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1343 conn.prepare(&sql)
1344 .map(|_| ())
1345 .context("validate local session schema")
1346}
1347
1348fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1349 let mut stmt = conn.prepare(
1350 "SELECT s.id, s.tool, ss.source_path \
1351 FROM sessions s \
1352 LEFT JOIN session_sync ss ON ss.session_id = s.id \
1353 WHERE ss.source_path IS NOT NULL",
1354 )?;
1355 let rows = stmt.query_map([], |row| {
1356 Ok((
1357 row.get::<_, String>(0)?,
1358 row.get::<_, String>(1)?,
1359 row.get::<_, Option<String>>(2)?,
1360 ))
1361 })?;
1362
1363 let mut updates: Vec<(String, String)> = Vec::new();
1364 for row in rows {
1365 let (id, current_tool, source_path) = row?;
1366 let normalized = normalize_tool_for_source_path(¤t_tool, source_path.as_deref());
1367 if normalized != current_tool {
1368 updates.push((id, normalized));
1369 }
1370 }
1371 drop(stmt);
1372
1373 for (id, tool) in updates {
1374 conn.execute(
1375 "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1376 params![tool, id],
1377 )?;
1378 }
1379
1380 Ok(())
1381}
1382
1383fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1384 let msg = format!("{err:#}").to_ascii_lowercase();
1385 msg.contains("no such column")
1386 || msg.contains("no such table")
1387 || msg.contains("cannot add a column")
1388 || msg.contains("already exists")
1389 || msg.contains("views may not be indexed")
1390 || msg.contains("malformed database schema")
1391 || msg.contains("duplicate column name")
1392}
1393
1394fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1395 if !path.exists() {
1396 return Ok(());
1397 }
1398
1399 let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1400 let backup_name = format!(
1401 "{}.legacy-{}.bak",
1402 path.file_name()
1403 .and_then(|n| n.to_str())
1404 .unwrap_or("local.db"),
1405 ts
1406 );
1407 let backup_path = path.with_file_name(backup_name);
1408 std::fs::rename(path, &backup_path).with_context(|| {
1409 format!(
1410 "rotate local db backup {} -> {}",
1411 path.display(),
1412 backup_path.display()
1413 )
1414 })?;
1415
1416 let wal = PathBuf::from(format!("{}-wal", path.display()));
1417 let shm = PathBuf::from(format!("{}-shm", path.display()));
1418 let _ = std::fs::remove_file(wal);
1419 let _ = std::fs::remove_file(shm);
1420 Ok(())
1421}
1422
/// Columns (name, SQLite type/default clause) that must exist on `sessions`.
/// `ensure_sessions_columns` adds any that are missing via `ALTER TABLE`,
/// backfilling databases created by older builds whose migrations predate
/// these columns.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
    ("is_auxiliary", "INTEGER NOT NULL DEFAULT 0"),
];
1457
1458fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1459 let mut existing = HashSet::new();
1460 let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1461 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1462 for row in rows {
1463 existing.insert(row?);
1464 }
1465
1466 for (name, decl) in REQUIRED_SESSION_COLUMNS {
1467 if existing.contains(*name) {
1468 continue;
1469 }
1470 let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1471 conn.execute_batch(&sql)
1472 .with_context(|| format!("add sessions column '{name}'"))?;
1473 }
1474
1475 Ok(())
1476}
1477
/// Column list for the canonical local-session SELECT. The positional order
/// here is load-bearing: `row_to_local_session` reads columns by index
/// (0..=33), so any change must be mirrored there.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1), COALESCE(s.is_auxiliary, 0)";
1488
/// Map one row of the `LOCAL_SESSION_COLUMNS` SELECT into a
/// `LocalSessionRow`. Column indices are positional and must stay in sync
/// with that constant.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    let source_path: Option<String> = row.get(1)?;
    let tool: String = row.get(7)?;
    // Stored tool names can be stale; re-derive from the source path.
    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());

    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: normalized_tool,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Flag columns tolerate read failures and fall back to defaults.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
        is_auxiliary: row.get::<_, i64>(33).unwrap_or(0) != 0,
    })
}
1531
1532fn default_db_path() -> Result<PathBuf> {
1533 let home = std::env::var("HOME")
1534 .or_else(|_| std::env::var("USERPROFILE"))
1535 .context("Could not determine home directory")?;
1536 Ok(PathBuf::from(home)
1537 .join(".local")
1538 .join("share")
1539 .join("opensession")
1540 .join("local.db"))
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545 use super::*;
1546
1547 use std::fs::{create_dir_all, write};
1548 use tempfile::tempdir;
1549
    // Open a fresh LocalDb backed by a file in a temp directory. `keep()`
    // deliberately leaks the directory so the db file outlives the TempDir
    // guard for the duration of the test process.
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }
1555
    // Temp directory cleaned up when the returned guard drops.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1559
    // Minimal LocalSessionRow fixture; everything not covered by the
    // arguments is zeroed/None/defaulted.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
            is_auxiliary: false,
        }
    }
1598
    // Opening a fresh database must create a valid schema without errors.
    #[test]
    fn test_open_and_schema() {
        let _db = test_db();
    }
1603
    // Reopening the db must repair `tool` values that disagree with the
    // session's source path (here, a `.codex` rollout mislabeled as
    // claude-code).
    #[test]
    fn test_open_repairs_codex_tool_hint_from_source_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("repair.db");

        // First open creates the schema.
        {
            let _ = LocalDb::open_path(&path).unwrap();
        }

        // Insert a mislabeled row directly, bypassing the repair logic.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute(
                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
                params!["rollout-repair", "2026-02-20T00:00:00Z"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
            )
            .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-repair")
            .expect("repaired row");
        assert_eq!(row.tool, "codex");
    }
1635
    // Upserting a session whose agent claims one tool but whose source path
    // implies another must store the path-derived tool.
    #[test]
    fn test_upsert_local_session_normalizes_tool_from_source_path() {
        let db = test_db();
        let mut session = Session::new(
            "rollout-upsert".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;

        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
            &crate::git::GitContext::default(),
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "rollout-upsert")
            .expect("upserted row");
        assert_eq!(row.tool, "codex");
    }
1664
    // When the session itself carries no git metadata, a re-upsert must keep
    // the git context recorded on first insert rather than overwrite it with
    // the current HEAD.
    #[test]
    fn test_upsert_local_session_preserves_existing_git_when_session_has_no_git_metadata() {
        let db = test_db();
        let mut session = Session::new(
            "preserve-git".to_string(),
            opensession_core::trace::Agent {
                provider: "openai".to_string(),
                model: "gpt-5".to_string(),
                tool: "codex".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;

        let first_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("feature/original".to_string()),
            commit: Some("1111111".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
            &first_git,
        )
        .unwrap();

        // Second upsert with a different HEAD must not clobber the original.
        let second_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("feature/current-head".to_string()),
            commit: Some("2222222".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
            &second_git,
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "preserve-git")
            .expect("row exists");
        assert_eq!(row.git_branch.as_deref(), Some("feature/original"));
        assert_eq!(row.git_commit.as_deref(), Some("1111111"));
    }
1713
    // A `git_branch` attribute carried by the session itself must win over
    // the fallback GitContext, including on re-upsert with an updated value.
    #[test]
    fn test_upsert_local_session_prefers_git_branch_from_session_attributes() {
        let db = test_db();
        let mut session = Session::new(
            "session-git-branch".to_string(),
            opensession_core::trace::Agent {
                provider: "anthropic".to_string(),
                model: "claude-opus-4-6".to_string(),
                tool: "claude-code".to_string(),
                tool_version: None,
            },
        );
        session.stats.event_count = 1;
        session.context.attributes.insert(
            "git_branch".to_string(),
            serde_json::Value::String("from-session".to_string()),
        );

        let fallback_git = crate::git::GitContext {
            remote: Some("https://github.com/acme/repo.git".to_string()),
            branch: Some("fallback-branch".to_string()),
            commit: Some("aaaaaaaa".to_string()),
            repo_name: Some("acme/repo".to_string()),
        };
        db.upsert_local_session(
            &session,
            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
            &fallback_git,
        )
        .unwrap();

        // Re-upsert with an updated session attribute must overwrite again.
        session.context.attributes.insert(
            "git_branch".to_string(),
            serde_json::Value::String("from-session-updated".to_string()),
        );
        db.upsert_local_session(
            &session,
            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
            &fallback_git,
        )
        .unwrap();

        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        let row = rows
            .iter()
            .find(|row| row.id == "session-git-branch")
            .expect("row exists");
        assert_eq!(row.git_branch.as_deref(), Some("from-session-updated"));
    }
1763
    // A legacy db whose `sessions` table only has `id` must be backfilled
    // with all required columns on open, with sane defaults.
    #[test]
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }
1783
    // A db whose `sessions` is a VIEW (cannot be ALTERed) must be rotated to
    // a `.legacy-<ts>.bak` backup and replaced with a fresh empty schema.
    #[test]
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1808
    // Only opencode sessions flagged auxiliary count as child sessions;
    // auxiliary sessions from other tools do not.
    #[test]
    fn test_is_opencode_child_session() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let parent = make_row(
            "ses_parent",
            "opencode",
            Some(parent_session.to_str().unwrap()),
        );
        let mut child = make_row(
            "ses_child",
            "opencode",
            Some(child_session.to_str().unwrap()),
        );
        child.is_auxiliary = true;
        let mut codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
        codex.is_auxiliary = true;

        assert!(!is_opencode_child_session(&parent));
        assert!(is_opencode_child_session(&child));
        assert!(!is_opencode_child_session(&codex));
    }
1845
    // The deprecated parser must accept the `parentUUID` alias for the
    // parent session id.
    #[allow(deprecated)]
    #[test]
    fn test_parse_opencode_parent_session_id_aliases() {
        let root = temp_root();
        let dir = root.path().join("session-aliases");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1863
    // The deprecated parser must also find the parent id nested under
    // `metadata.links.parentSessionId`.
    #[allow(deprecated)]
    #[test]
    fn test_parse_opencode_parent_session_id_nested_metadata() {
        let root = temp_root();
        let dir = root.path().join("session-nested");
        create_dir_all(&dir).unwrap();
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
        )
        .unwrap();
        assert_eq!(
            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
            Some("ses_parent")
        );
    }
1881
    // Filtering must drop only the auxiliary opencode child; parents,
    // orphans, and non-opencode rows pass through.
    #[test]
    fn test_hide_opencode_child_sessions() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        let parent_session = dir.join("parent.json");
        let child_session = dir.join("child.json");
        let orphan_session = dir.join("orphan.json");

        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        write(
            &orphan_session,
            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let rows = vec![
            {
                let mut row = make_row(
                    "ses_child",
                    "opencode",
                    Some(child_session.to_str().unwrap()),
                );
                row.is_auxiliary = true;
                row
            },
            make_row(
                "ses_parent",
                "opencode",
                Some(parent_session.to_str().unwrap()),
            ),
            {
                let mut row = make_row("ses_other", "codex", None);
                row.user_message_count = 1;
                row
            },
            make_row(
                "ses_orphan",
                "opencode",
                Some(orphan_session.to_str().unwrap()),
            ),
        ];

        let filtered = hide_opencode_child_sessions(rows);
        assert_eq!(filtered.len(), 3);
        assert!(filtered.iter().all(|r| r.id != "ses_child"));
    }
1938
    // Sync cursor round-trips: unset reads None, set/overwrite read back.
    #[test]
    fn test_sync_cursor() {
        let db = test_db();
        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-01-01T00:00:00Z".to_string())
        );
        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            db.get_sync_cursor("team1").unwrap(),
            Some("2024-06-01T00:00:00Z".to_string())
        );
    }
1955
    // Body cache round-trips: miss reads None, stored bytes read back intact.
    #[test]
    fn test_body_cache() {
        let db = test_db();
        assert_eq!(db.get_cached_body("s1").unwrap(), None);
        db.cache_body("s1", b"hello world").unwrap();
        assert_eq!(
            db.get_cached_body("s1").unwrap(),
            Some(b"hello world".to_vec())
        );
    }
1966
    // Local migrations come from the opensession-api crate; this crate must
    // not ship its own duplicate .sql files.
    #[test]
    fn test_local_migrations_are_loaded_from_api_crate() {
        let migration_names: Vec<&str> = super::LOCAL_MIGRATIONS
            .iter()
            .map(|(name, _)| *name)
            .collect();
        assert!(
            migration_names.contains(&"local_0003_session_flags"),
            "expected local_0003_session_flags migration from opensession-api"
        );

        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let migrations_dir = manifest_dir.join("migrations");
        if migrations_dir.exists() {
            let sql_files = std::fs::read_dir(migrations_dir)
                .expect("read local-db migrations directory")
                .filter_map(Result::ok)
                .map(|entry| entry.file_name().to_string_lossy().to_string())
                .filter(|name| name.ends_with(".sql"))
                .collect::<Vec<_>>();
            assert!(
                sql_files.is_empty(),
                "local-db must not ship duplicated migration SQL files"
            );
        }
    }
1993
    // Simulate a database that has every migration applied EXCEPT
    // local_0003_session_flags, then reopen: the migration must backfill
    // is_auxiliary=1 for known auxiliary rows (opencode heuristic children,
    // claude subagent files) and leave primary sessions visible.
    #[test]
    fn test_local_0003_session_flags_backfills_known_auxiliary_rows() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("local-legacy.db");

        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("PRAGMA foreign_keys=OFF;").unwrap();
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS _migrations (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL UNIQUE,
                    applied_at TEXT NOT NULL DEFAULT (datetime('now'))
                );",
            )
            .unwrap();

            // Apply everything except the migration under test.
            for (name, sql) in super::MIGRATIONS.iter().chain(
                super::LOCAL_MIGRATIONS
                    .iter()
                    .filter(|(name, _)| *name != "local_0003_session_flags"),
            ) {
                conn.execute_batch(sql).unwrap();
                conn.execute(
                    "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
                    params![name],
                )
                .unwrap();
            }

            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'codex', '2026-02-20T00:00:00Z', '', 12, 6, 6, 20)",
                params!["primary-visible"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'opencode', '2026-02-20T00:00:00Z', '', 2, 0, 2, 6)",
                params!["opencode-heuristic-child"],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO sessions \
                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
                 VALUES (?1, 'personal', 'claude-code', '2026-02-20T00:00:00Z', '', 3, 1, 3, 12)",
                params!["claude-subagent-child"],
            )
            .unwrap();

            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params![
                    "opencode-heuristic-child",
                    "/Users/test/.opencode/sessions/opencode-heuristic-child.json"
                ],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
                params![
                    "claude-subagent-child",
                    "/Users/test/.claude/projects/foo/subagents/agent-1.jsonl"
                ],
            )
            .unwrap();
        }

        // Reopen runs the pending migration.
        let db = LocalDb::open_path(&path).unwrap();

        let flags = {
            let conn = db.conn();
            let mut stmt = conn
                .prepare("SELECT id, COALESCE(is_auxiliary, 0) FROM sessions ORDER BY id")
                .unwrap();
            let rows = stmt
                .query_map([], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
                })
                .unwrap();
            rows.collect::<Result<Vec<_>, _>>().unwrap()
        };

        assert_eq!(
            flags,
            vec![
                ("claude-subagent-child".to_string(), 1),
                ("opencode-heuristic-child".to_string(), 1),
                ("primary-visible".to_string(), 0),
            ]
        );

        let visible = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].id, "primary-visible");
    }
2092
2093 #[test]
2094 fn test_upsert_remote_session() {
2095 let db = test_db();
2096 let summary = RemoteSessionSummary {
2097 id: "remote-1".to_string(),
2098 user_id: Some("u1".to_string()),
2099 nickname: Some("alice".to_string()),
2100 team_id: "t1".to_string(),
2101 tool: "claude-code".to_string(),
2102 agent_provider: None,
2103 agent_model: None,
2104 title: Some("Test session".to_string()),
2105 description: None,
2106 tags: None,
2107 created_at: "2024-01-01T00:00:00Z".to_string(),
2108 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2109 message_count: 10,
2110 task_count: 2,
2111 event_count: 20,
2112 duration_seconds: 300,
2113 total_input_tokens: 1000,
2114 total_output_tokens: 500,
2115 git_remote: None,
2116 git_branch: None,
2117 git_commit: None,
2118 git_repo_name: None,
2119 pr_number: None,
2120 pr_url: None,
2121 working_directory: None,
2122 files_modified: None,
2123 files_read: None,
2124 has_errors: false,
2125 max_active_agents: 1,
2126 };
2127 db.upsert_remote_session(&summary).unwrap();
2128
2129 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2130 assert_eq!(sessions.len(), 1);
2131 assert_eq!(sessions[0].id, "remote-1");
2132 assert_eq!(sessions[0].sync_status, "remote_only");
2133 assert_eq!(sessions[0].nickname, None); assert!(!sessions[0].is_auxiliary);
2135 }
2136
    // Filtering by team id returns matching rows and excludes the rest.
    #[test]
    fn test_list_filter_by_repo() {
        let db = test_db();
        let summary1 = RemoteSessionSummary {
            id: "s1".to_string(),
            user_id: None,
            nickname: None,
            team_id: "t1".to_string(),
            tool: "claude-code".to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("Session 1".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
            message_count: 5,
            task_count: 0,
            event_count: 10,
            duration_seconds: 60,
            total_input_tokens: 100,
            total_output_tokens: 50,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        };
        db.upsert_remote_session(&summary1).unwrap();

        let filter = LocalSessionFilter {
            team_id: Some("t1".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);

        let filter = LocalSessionFilter {
            team_id: Some("t999".to_string()),
            ..Default::default()
        };
        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
    }
2187
2188 fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2191 RemoteSessionSummary {
2192 id: id.to_string(),
2193 user_id: None,
2194 nickname: None,
2195 team_id: "t1".to_string(),
2196 tool: tool.to_string(),
2197 agent_provider: Some("anthropic".to_string()),
2198 agent_model: Some("claude-opus-4-6".to_string()),
2199 title: Some(title.to_string()),
2200 description: None,
2201 tags: None,
2202 created_at: created_at.to_string(),
2203 uploaded_at: created_at.to_string(),
2204 message_count: 5,
2205 task_count: 1,
2206 event_count: 10,
2207 duration_seconds: 300,
2208 total_input_tokens: 1000,
2209 total_output_tokens: 500,
2210 git_remote: None,
2211 git_branch: None,
2212 git_commit: None,
2213 git_repo_name: None,
2214 pr_number: None,
2215 pr_url: None,
2216 working_directory: None,
2217 files_modified: None,
2218 files_read: None,
2219 has_errors: false,
2220 max_active_agents: 1,
2221 }
2222 }
2223
2224 fn seed_sessions(db: &LocalDb) {
2225 db.upsert_remote_session(&make_summary(
2227 "s1",
2228 "claude-code",
2229 "First session",
2230 "2024-01-01T00:00:00Z",
2231 ))
2232 .unwrap();
2233 db.upsert_remote_session(&make_summary(
2234 "s2",
2235 "claude-code",
2236 "JWT auth work",
2237 "2024-01-02T00:00:00Z",
2238 ))
2239 .unwrap();
2240 db.upsert_remote_session(&make_summary(
2241 "s3",
2242 "gemini",
2243 "Gemini test",
2244 "2024-01-03T00:00:00Z",
2245 ))
2246 .unwrap();
2247 db.upsert_remote_session(&make_summary(
2248 "s4",
2249 "claude-code",
2250 "Error handling",
2251 "2024-01-04T00:00:00Z",
2252 ))
2253 .unwrap();
2254 db.upsert_remote_session(&make_summary(
2255 "s5",
2256 "claude-code",
2257 "Final polish",
2258 "2024-01-05T00:00:00Z",
2259 ))
2260 .unwrap();
2261 }
2262
2263 #[test]
2266 fn test_log_no_filters() {
2267 let db = test_db();
2268 seed_sessions(&db);
2269 let filter = LogFilter::default();
2270 let results = db.list_sessions_log(&filter).unwrap();
2271 assert_eq!(results.len(), 5);
2272 assert_eq!(results[0].id, "s5");
2274 assert_eq!(results[4].id, "s1");
2275 }
2276
2277 #[test]
2278 fn test_log_filter_by_tool() {
2279 let db = test_db();
2280 seed_sessions(&db);
2281 let filter = LogFilter {
2282 tool: Some("claude-code".to_string()),
2283 ..Default::default()
2284 };
2285 let results = db.list_sessions_log(&filter).unwrap();
2286 assert_eq!(results.len(), 4);
2287 assert!(results.iter().all(|s| s.tool == "claude-code"));
2288 }
2289
2290 #[test]
2291 fn test_log_filter_by_model_wildcard() {
2292 let db = test_db();
2293 seed_sessions(&db);
2294 let filter = LogFilter {
2295 model: Some("claude*".to_string()),
2296 ..Default::default()
2297 };
2298 let results = db.list_sessions_log(&filter).unwrap();
2299 assert_eq!(results.len(), 5); }
2301
2302 #[test]
2303 fn test_log_filter_since() {
2304 let db = test_db();
2305 seed_sessions(&db);
2306 let filter = LogFilter {
2307 since: Some("2024-01-03T00:00:00Z".to_string()),
2308 ..Default::default()
2309 };
2310 let results = db.list_sessions_log(&filter).unwrap();
2311 assert_eq!(results.len(), 3); }
2313
2314 #[test]
2315 fn test_log_filter_before() {
2316 let db = test_db();
2317 seed_sessions(&db);
2318 let filter = LogFilter {
2319 before: Some("2024-01-03T00:00:00Z".to_string()),
2320 ..Default::default()
2321 };
2322 let results = db.list_sessions_log(&filter).unwrap();
2323 assert_eq!(results.len(), 2); }
2325
2326 #[test]
2327 fn test_log_filter_since_and_before() {
2328 let db = test_db();
2329 seed_sessions(&db);
2330 let filter = LogFilter {
2331 since: Some("2024-01-02T00:00:00Z".to_string()),
2332 before: Some("2024-01-04T00:00:00Z".to_string()),
2333 ..Default::default()
2334 };
2335 let results = db.list_sessions_log(&filter).unwrap();
2336 assert_eq!(results.len(), 2); }
2338
2339 #[test]
2340 fn test_log_filter_grep() {
2341 let db = test_db();
2342 seed_sessions(&db);
2343 let filter = LogFilter {
2344 grep: Some("JWT".to_string()),
2345 ..Default::default()
2346 };
2347 let results = db.list_sessions_log(&filter).unwrap();
2348 assert_eq!(results.len(), 1);
2349 assert_eq!(results[0].id, "s2");
2350 }
2351
2352 #[test]
2353 fn test_log_limit_and_offset() {
2354 let db = test_db();
2355 seed_sessions(&db);
2356 let filter = LogFilter {
2357 limit: Some(2),
2358 offset: Some(1),
2359 ..Default::default()
2360 };
2361 let results = db.list_sessions_log(&filter).unwrap();
2362 assert_eq!(results.len(), 2);
2363 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
2365 }
2366
2367 #[test]
2368 fn test_log_limit_only() {
2369 let db = test_db();
2370 seed_sessions(&db);
2371 let filter = LogFilter {
2372 limit: Some(3),
2373 ..Default::default()
2374 };
2375 let results = db.list_sessions_log(&filter).unwrap();
2376 assert_eq!(results.len(), 3);
2377 }
2378
2379 #[test]
2380 fn test_list_sessions_limit_offset() {
2381 let db = test_db();
2382 seed_sessions(&db);
2383 let filter = LocalSessionFilter {
2384 limit: Some(2),
2385 offset: Some(1),
2386 ..Default::default()
2387 };
2388 let results = db.list_sessions(&filter).unwrap();
2389 assert_eq!(results.len(), 2);
2390 assert_eq!(results[0].id, "s4");
2391 assert_eq!(results[1].id, "s3");
2392 }
2393
2394 #[test]
2395 fn test_count_sessions_filtered() {
2396 let db = test_db();
2397 seed_sessions(&db);
2398 let count = db
2399 .count_sessions_filtered(&LocalSessionFilter::default())
2400 .unwrap();
2401 assert_eq!(count, 5);
2402 }
2403
2404 #[test]
2405 fn test_list_and_count_filters_match_when_auxiliary_rows_exist() {
2406 let db = test_db();
2407 seed_sessions(&db);
2408 db.conn()
2409 .execute(
2410 "UPDATE sessions SET is_auxiliary = 1 WHERE id IN ('s2', 's3')",
2411 [],
2412 )
2413 .unwrap();
2414
2415 let default_filter = LocalSessionFilter::default();
2416 let rows = db.list_sessions(&default_filter).unwrap();
2417 let count = db.count_sessions_filtered(&default_filter).unwrap();
2418 assert_eq!(rows.len() as i64, count);
2419 assert!(rows.iter().all(|row| !row.is_auxiliary));
2420
2421 let gemini_filter = LocalSessionFilter {
2422 tool: Some("gemini".to_string()),
2423 ..Default::default()
2424 };
2425 let gemini_rows = db.list_sessions(&gemini_filter).unwrap();
2426 let gemini_count = db.count_sessions_filtered(&gemini_filter).unwrap();
2427 assert_eq!(gemini_rows.len() as i64, gemini_count);
2428 assert!(gemini_rows.is_empty());
2429 assert_eq!(gemini_count, 0);
2430 }
2431
2432 #[test]
2433 fn test_exclude_low_signal_filter_hides_metadata_only_sessions() {
2434 let db = test_db();
2435
2436 let mut low_signal = make_summary("meta-only", "claude-code", "", "2024-01-01T00:00:00Z");
2437 low_signal.title = None;
2438 low_signal.message_count = 0;
2439 low_signal.task_count = 0;
2440 low_signal.event_count = 2;
2441 low_signal.git_repo_name = Some("frontend/aviss-react-front".to_string());
2442
2443 let mut normal = make_summary(
2444 "real-work",
2445 "opencode",
2446 "Socket.IO decision",
2447 "2024-01-02T00:00:00Z",
2448 );
2449 normal.message_count = 14;
2450 normal.task_count = 2;
2451 normal.event_count = 38;
2452 normal.git_repo_name = Some("frontend/aviss-react-front".to_string());
2453
2454 db.upsert_remote_session(&low_signal).unwrap();
2455 db.upsert_remote_session(&normal).unwrap();
2456
2457 let default_filter = LocalSessionFilter {
2458 git_repo_name: Some("frontend/aviss-react-front".to_string()),
2459 ..Default::default()
2460 };
2461 assert_eq!(db.list_sessions(&default_filter).unwrap().len(), 2);
2462 assert_eq!(db.count_sessions_filtered(&default_filter).unwrap(), 2);
2463
2464 let repo_filter = LocalSessionFilter {
2465 git_repo_name: Some("frontend/aviss-react-front".to_string()),
2466 exclude_low_signal: true,
2467 ..Default::default()
2468 };
2469 let rows = db.list_sessions(&repo_filter).unwrap();
2470 assert_eq!(rows.len(), 1);
2471 assert_eq!(rows[0].id, "real-work");
2472 assert_eq!(db.count_sessions_filtered(&repo_filter).unwrap(), 1);
2473 }
2474
2475 #[test]
2476 fn test_list_working_directories_distinct_non_empty() {
2477 let db = test_db();
2478
2479 let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2480 a.working_directory = Some("/tmp/repo-a".to_string());
2481 let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2482 b.working_directory = Some("/tmp/repo-a".to_string());
2483 let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2484 c.working_directory = Some("/tmp/repo-b".to_string());
2485 let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2486 d.working_directory = Some("".to_string());
2487
2488 db.upsert_remote_session(&a).unwrap();
2489 db.upsert_remote_session(&b).unwrap();
2490 db.upsert_remote_session(&c).unwrap();
2491 db.upsert_remote_session(&d).unwrap();
2492
2493 let dirs = db.list_working_directories().unwrap();
2494 assert_eq!(
2495 dirs,
2496 vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2497 );
2498 }
2499
2500 #[test]
2501 fn test_list_session_tools() {
2502 let db = test_db();
2503 seed_sessions(&db);
2504 let tools = db
2505 .list_session_tools(&LocalSessionFilter::default())
2506 .unwrap();
2507 assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2508 }
2509
2510 #[test]
2511 fn test_log_combined_filters() {
2512 let db = test_db();
2513 seed_sessions(&db);
2514 let filter = LogFilter {
2515 tool: Some("claude-code".to_string()),
2516 since: Some("2024-01-03T00:00:00Z".to_string()),
2517 limit: Some(1),
2518 ..Default::default()
2519 };
2520 let results = db.list_sessions_log(&filter).unwrap();
2521 assert_eq!(results.len(), 1);
2522 assert_eq!(results[0].id, "s5"); }
2524
2525 #[test]
2528 fn test_get_session_by_offset() {
2529 let db = test_db();
2530 seed_sessions(&db);
2531 let row = db.get_session_by_offset(0).unwrap().unwrap();
2532 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
2534 assert_eq!(row.id, "s3");
2535 assert!(db.get_session_by_offset(10).unwrap().is_none());
2536 }
2537
2538 #[test]
2539 fn test_get_session_by_tool_offset() {
2540 let db = test_db();
2541 seed_sessions(&db);
2542 let row = db
2543 .get_session_by_tool_offset("claude-code", 0)
2544 .unwrap()
2545 .unwrap();
2546 assert_eq!(row.id, "s5");
2547 let row = db
2548 .get_session_by_tool_offset("claude-code", 1)
2549 .unwrap()
2550 .unwrap();
2551 assert_eq!(row.id, "s4");
2552 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2553 assert_eq!(row.id, "s3");
2554 assert!(db
2555 .get_session_by_tool_offset("gemini", 1)
2556 .unwrap()
2557 .is_none());
2558 }
2559
2560 #[test]
2561 fn test_get_sessions_latest() {
2562 let db = test_db();
2563 seed_sessions(&db);
2564 let rows = db.get_sessions_latest(3).unwrap();
2565 assert_eq!(rows.len(), 3);
2566 assert_eq!(rows[0].id, "s5");
2567 assert_eq!(rows[1].id, "s4");
2568 assert_eq!(rows[2].id, "s3");
2569 }
2570
2571 #[test]
2572 fn test_get_sessions_by_tool_latest() {
2573 let db = test_db();
2574 seed_sessions(&db);
2575 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2576 assert_eq!(rows.len(), 2);
2577 assert_eq!(rows[0].id, "s5");
2578 assert_eq!(rows[1].id, "s4");
2579 }
2580
2581 #[test]
2582 fn test_get_sessions_latest_more_than_available() {
2583 let db = test_db();
2584 seed_sessions(&db);
2585 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2586 assert_eq!(rows.len(), 1); }
2588
2589 #[test]
2590 fn test_session_count() {
2591 let db = test_db();
2592 assert_eq!(db.session_count().unwrap(), 0);
2593 seed_sessions(&db);
2594 assert_eq!(db.session_count().unwrap(), 5);
2595 }
2596
2597 #[test]
2600 fn test_link_commit_session() {
2601 let db = test_db();
2602 seed_sessions(&db);
2603 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2604 .unwrap();
2605
2606 let commits = db.get_commits_by_session("s1").unwrap();
2607 assert_eq!(commits.len(), 1);
2608 assert_eq!(commits[0].commit_hash, "abc123");
2609 assert_eq!(commits[0].session_id, "s1");
2610 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2611 assert_eq!(commits[0].branch.as_deref(), Some("main"));
2612
2613 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2614 assert_eq!(sessions.len(), 1);
2615 assert_eq!(sessions[0].id, "s1");
2616 }
2617
2618 #[test]
2619 fn test_get_sessions_by_commit() {
2620 let db = test_db();
2621 seed_sessions(&db);
2622 db.link_commit_session("abc123", "s1", None, None).unwrap();
2624 db.link_commit_session("abc123", "s2", None, None).unwrap();
2625 db.link_commit_session("abc123", "s3", None, None).unwrap();
2626
2627 let sessions = db.get_sessions_by_commit("abc123").unwrap();
2628 assert_eq!(sessions.len(), 3);
2629 assert_eq!(sessions[0].id, "s3");
2631 assert_eq!(sessions[1].id, "s2");
2632 assert_eq!(sessions[2].id, "s1");
2633 }
2634
2635 #[test]
2636 fn test_get_commits_by_session() {
2637 let db = test_db();
2638 seed_sessions(&db);
2639 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2641 .unwrap();
2642 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2643 .unwrap();
2644 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2645 .unwrap();
2646
2647 let commits = db.get_commits_by_session("s1").unwrap();
2648 assert_eq!(commits.len(), 3);
2649 assert!(commits.iter().all(|c| c.session_id == "s1"));
2651 }
2652
2653 #[test]
2654 fn test_duplicate_link_ignored() {
2655 let db = test_db();
2656 seed_sessions(&db);
2657 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2658 .unwrap();
2659 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2661 .unwrap();
2662
2663 let commits = db.get_commits_by_session("s1").unwrap();
2664 assert_eq!(commits.len(), 1);
2665 }
2666
2667 #[test]
2668 fn test_log_filter_by_commit() {
2669 let db = test_db();
2670 seed_sessions(&db);
2671 db.link_commit_session("abc123", "s2", None, None).unwrap();
2672 db.link_commit_session("abc123", "s4", None, None).unwrap();
2673
2674 let filter = LogFilter {
2675 commit: Some("abc123".to_string()),
2676 ..Default::default()
2677 };
2678 let results = db.list_sessions_log(&filter).unwrap();
2679 assert_eq!(results.len(), 2);
2680 assert_eq!(results[0].id, "s4");
2681 assert_eq!(results[1].id, "s2");
2682
2683 let filter = LogFilter {
2685 commit: Some("nonexistent".to_string()),
2686 ..Default::default()
2687 };
2688 let results = db.list_sessions_log(&filter).unwrap();
2689 assert_eq!(results.len(), 0);
2690 }
2691}