1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api_types::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_api_types::SessionSummary;
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use std::path::PathBuf;
9use std::sync::Mutex;
10
11use git::GitContext;
12
13#[derive(Debug, Clone)]
15pub struct LocalSessionRow {
16 pub id: String,
17 pub source_path: Option<String>,
18 pub sync_status: String,
19 pub last_synced_at: Option<String>,
20 pub user_id: Option<String>,
21 pub nickname: Option<String>,
22 pub team_id: Option<String>,
23 pub tool: String,
24 pub agent_provider: Option<String>,
25 pub agent_model: Option<String>,
26 pub title: Option<String>,
27 pub description: Option<String>,
28 pub tags: Option<String>,
29 pub created_at: String,
30 pub uploaded_at: Option<String>,
31 pub message_count: i64,
32 pub task_count: i64,
33 pub event_count: i64,
34 pub duration_seconds: i64,
35 pub total_input_tokens: i64,
36 pub total_output_tokens: i64,
37 pub git_remote: Option<String>,
38 pub git_branch: Option<String>,
39 pub git_commit: Option<String>,
40 pub git_repo_name: Option<String>,
41 pub pr_number: Option<i64>,
42 pub pr_url: Option<String>,
43 pub working_directory: Option<String>,
44 pub files_modified: Option<String>,
45 pub files_read: Option<String>,
46 pub has_errors: bool,
47}
48
49#[derive(Debug, Clone)]
51pub struct CommitLink {
52 pub commit_hash: String,
53 pub session_id: String,
54 pub repo_path: Option<String>,
55 pub branch: Option<String>,
56 pub created_at: String,
57}
58
59#[derive(Debug, Default)]
61pub struct LocalSessionFilter {
62 pub team_id: Option<String>,
63 pub sync_status: Option<String>,
64 pub git_repo_name: Option<String>,
65 pub search: Option<String>,
66 pub tool: Option<String>,
67}
68
69#[derive(Debug, Default)]
71pub struct LogFilter {
72 pub tool: Option<String>,
74 pub model: Option<String>,
76 pub since: Option<String>,
78 pub before: Option<String>,
80 pub touches: Option<String>,
82 pub grep: Option<String>,
84 pub has_errors: Option<bool>,
86 pub working_directory: Option<String>,
88 pub git_repo_name: Option<String>,
90 pub commit: Option<String>,
92 pub limit: Option<u32>,
94 pub offset: Option<u32>,
96}
97
98const FROM_CLAUSE: &str = "\
100FROM sessions s \
101LEFT JOIN session_sync ss ON ss.session_id = s.id \
102LEFT JOIN users u ON u.id = s.user_id";
103
104pub struct LocalDb {
107 conn: Mutex<Connection>,
108}
109
110impl LocalDb {
111 pub fn open() -> Result<Self> {
114 let path = default_db_path()?;
115 Self::open_path(&path)
116 }
117
118 pub fn open_path(path: &PathBuf) -> Result<Self> {
120 if let Some(parent) = path.parent() {
121 std::fs::create_dir_all(parent)
122 .with_context(|| format!("create dir for {}", path.display()))?;
123 }
124 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
125 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
126
127 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
129
130 for (_name, sql) in MIGRATIONS {
131 conn.execute_batch(sql)?;
132 }
133 for (_name, sql) in LOCAL_MIGRATIONS {
134 conn.execute_batch(sql)?;
135 }
136
137 Ok(Self {
138 conn: Mutex::new(conn),
139 })
140 }
141
142 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
143 self.conn.lock().expect("local db mutex poisoned")
144 }
145
146 pub fn upsert_local_session(
149 &self,
150 session: &Session,
151 source_path: &str,
152 git: &GitContext,
153 ) -> Result<()> {
154 let title = session.context.title.as_deref();
155 let description = session.context.description.as_deref();
156 let tags = if session.context.tags.is_empty() {
157 None
158 } else {
159 Some(session.context.tags.join(","))
160 };
161 let created_at = session.context.created_at.to_rfc3339();
162 let cwd = session
163 .context
164 .attributes
165 .get("cwd")
166 .or_else(|| session.context.attributes.get("working_directory"))
167 .and_then(|v| v.as_str().map(String::from));
168
169 let (files_modified, files_read, has_errors) =
171 opensession_core::extract::extract_file_metadata(session);
172
173 let conn = self.conn();
174 conn.execute(
175 "INSERT INTO sessions \
176 (id, team_id, tool, agent_provider, agent_model, \
177 title, description, tags, created_at, \
178 message_count, task_count, event_count, duration_seconds, \
179 total_input_tokens, total_output_tokens, body_storage_key, \
180 git_remote, git_branch, git_commit, git_repo_name, working_directory, \
181 files_modified, files_read, has_errors) \
182 VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,'',?15,?16,?17,?18,?19,?20,?21,?22) \
183 ON CONFLICT(id) DO UPDATE SET \
184 tool=excluded.tool, agent_provider=excluded.agent_provider, \
185 agent_model=excluded.agent_model, \
186 title=excluded.title, description=excluded.description, \
187 tags=excluded.tags, \
188 message_count=excluded.message_count, task_count=excluded.task_count, \
189 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
190 total_input_tokens=excluded.total_input_tokens, \
191 total_output_tokens=excluded.total_output_tokens, \
192 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
193 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
194 working_directory=excluded.working_directory, \
195 files_modified=excluded.files_modified, files_read=excluded.files_read, \
196 has_errors=excluded.has_errors",
197 params![
198 &session.session_id,
199 &session.agent.tool,
200 &session.agent.provider,
201 &session.agent.model,
202 title,
203 description,
204 &tags,
205 &created_at,
206 session.stats.message_count as i64,
207 session.stats.task_count as i64,
208 session.stats.event_count as i64,
209 session.stats.duration_seconds as i64,
210 session.stats.total_input_tokens as i64,
211 session.stats.total_output_tokens as i64,
212 &git.remote,
213 &git.branch,
214 &git.commit,
215 &git.repo_name,
216 &cwd,
217 &files_modified,
218 &files_read,
219 has_errors,
220 ],
221 )?;
222
223 conn.execute(
224 "INSERT INTO session_sync (session_id, source_path, sync_status) \
225 VALUES (?1, ?2, 'local_only') \
226 ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
227 params![&session.session_id, source_path],
228 )?;
229 Ok(())
230 }
231
232 pub fn upsert_remote_session(&self, summary: &SessionSummary) -> Result<()> {
235 let conn = self.conn();
236 conn.execute(
237 "INSERT INTO sessions \
238 (id, user_id, team_id, tool, agent_provider, agent_model, \
239 title, description, tags, created_at, uploaded_at, \
240 message_count, task_count, event_count, duration_seconds, \
241 total_input_tokens, total_output_tokens, body_storage_key, \
242 git_remote, git_branch, git_commit, git_repo_name, \
243 pr_number, pr_url, working_directory, \
244 files_modified, files_read, has_errors) \
245 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27) \
246 ON CONFLICT(id) DO UPDATE SET \
247 title=excluded.title, description=excluded.description, \
248 tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
249 message_count=excluded.message_count, task_count=excluded.task_count, \
250 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
251 total_input_tokens=excluded.total_input_tokens, \
252 total_output_tokens=excluded.total_output_tokens, \
253 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
254 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
255 pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
256 working_directory=excluded.working_directory, \
257 files_modified=excluded.files_modified, files_read=excluded.files_read, \
258 has_errors=excluded.has_errors",
259 params![
260 &summary.id,
261 &summary.user_id,
262 &summary.team_id,
263 &summary.tool,
264 &summary.agent_provider,
265 &summary.agent_model,
266 &summary.title,
267 &summary.description,
268 &summary.tags,
269 &summary.created_at,
270 &summary.uploaded_at,
271 summary.message_count,
272 summary.task_count,
273 summary.event_count,
274 summary.duration_seconds,
275 summary.total_input_tokens,
276 summary.total_output_tokens,
277 &summary.git_remote,
278 &summary.git_branch,
279 &summary.git_commit,
280 &summary.git_repo_name,
281 summary.pr_number,
282 &summary.pr_url,
283 &summary.working_directory,
284 &summary.files_modified,
285 &summary.files_read,
286 summary.has_errors,
287 ],
288 )?;
289
290 conn.execute(
291 "INSERT INTO session_sync (session_id, sync_status) \
292 VALUES (?1, 'remote_only') \
293 ON CONFLICT(session_id) DO UPDATE SET \
294 sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
295 params![&summary.id],
296 )?;
297 Ok(())
298 }
299
300 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
303 let mut where_clauses = vec!["1=1".to_string()];
304 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
305 let mut idx = 1u32;
306
307 if let Some(ref team_id) = filter.team_id {
308 where_clauses.push(format!("s.team_id = ?{idx}"));
309 param_values.push(Box::new(team_id.clone()));
310 idx += 1;
311 }
312
313 if let Some(ref sync_status) = filter.sync_status {
314 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
315 param_values.push(Box::new(sync_status.clone()));
316 idx += 1;
317 }
318
319 if let Some(ref repo) = filter.git_repo_name {
320 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
321 param_values.push(Box::new(repo.clone()));
322 idx += 1;
323 }
324
325 if let Some(ref tool) = filter.tool {
326 where_clauses.push(format!("s.tool = ?{idx}"));
327 param_values.push(Box::new(tool.clone()));
328 idx += 1;
329 }
330
331 if let Some(ref search) = filter.search {
332 let like = format!("%{search}%");
333 where_clauses.push(format!(
334 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
335 i1 = idx,
336 i2 = idx + 1,
337 i3 = idx + 2,
338 ));
339 param_values.push(Box::new(like.clone()));
340 param_values.push(Box::new(like.clone()));
341 param_values.push(Box::new(like));
342 }
343
344 let _ = idx; let where_str = where_clauses.join(" AND ");
347 let sql = format!(
348 "SELECT {LOCAL_SESSION_COLUMNS} \
349 {FROM_CLAUSE} WHERE {where_str} \
350 ORDER BY s.created_at DESC"
351 );
352
353 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
354 param_values.iter().map(|p| p.as_ref()).collect();
355 let conn = self.conn();
356 let mut stmt = conn.prepare(&sql)?;
357 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
358
359 let mut result = Vec::new();
360 for row in rows {
361 result.push(row?);
362 }
363 Ok(result)
364 }
365
366 pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
370 let mut where_clauses = vec!["1=1".to_string()];
371 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
372 let mut idx = 1u32;
373
374 if let Some(ref tool) = filter.tool {
375 where_clauses.push(format!("s.tool = ?{idx}"));
376 param_values.push(Box::new(tool.clone()));
377 idx += 1;
378 }
379
380 if let Some(ref model) = filter.model {
381 let like = model.replace('*', "%");
382 where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
383 param_values.push(Box::new(like));
384 idx += 1;
385 }
386
387 if let Some(ref since) = filter.since {
388 where_clauses.push(format!("s.created_at >= ?{idx}"));
389 param_values.push(Box::new(since.clone()));
390 idx += 1;
391 }
392
393 if let Some(ref before) = filter.before {
394 where_clauses.push(format!("s.created_at < ?{idx}"));
395 param_values.push(Box::new(before.clone()));
396 idx += 1;
397 }
398
399 if let Some(ref touches) = filter.touches {
400 let like = format!("%\"{touches}\"%");
401 where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
402 param_values.push(Box::new(like));
403 idx += 1;
404 }
405
406 if let Some(ref grep) = filter.grep {
407 let like = format!("%{grep}%");
408 where_clauses.push(format!(
409 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
410 i1 = idx,
411 i2 = idx + 1,
412 i3 = idx + 2,
413 ));
414 param_values.push(Box::new(like.clone()));
415 param_values.push(Box::new(like.clone()));
416 param_values.push(Box::new(like));
417 idx += 3;
418 }
419
420 if let Some(true) = filter.has_errors {
421 where_clauses.push("s.has_errors = 1".to_string());
422 }
423
424 if let Some(ref wd) = filter.working_directory {
425 where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
426 param_values.push(Box::new(format!("{wd}%")));
427 idx += 1;
428 }
429
430 if let Some(ref repo) = filter.git_repo_name {
431 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
432 param_values.push(Box::new(repo.clone()));
433 idx += 1;
434 }
435
436 let mut extra_join = String::new();
438 if let Some(ref commit) = filter.commit {
439 extra_join =
440 " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
441 where_clauses.push(format!("csl.commit_hash = ?{idx}"));
442 param_values.push(Box::new(commit.clone()));
443 idx += 1;
444 }
445
446 let _ = idx; let where_str = where_clauses.join(" AND ");
449 let mut sql = format!(
450 "SELECT {LOCAL_SESSION_COLUMNS} \
451 {FROM_CLAUSE}{extra_join} WHERE {where_str} \
452 ORDER BY s.created_at DESC"
453 );
454
455 if let Some(limit) = filter.limit {
456 sql.push_str(" LIMIT ?");
457 param_values.push(Box::new(limit));
458 if let Some(offset) = filter.offset {
459 sql.push_str(" OFFSET ?");
460 param_values.push(Box::new(offset));
461 }
462 }
463
464 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
465 param_values.iter().map(|p| p.as_ref()).collect();
466 let conn = self.conn();
467 let mut stmt = conn.prepare(&sql)?;
468 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
469
470 let mut result = Vec::new();
471 for row in rows {
472 result.push(row?);
473 }
474 Ok(result)
475 }
476
477 pub fn get_sessions_by_tool_latest(
479 &self,
480 tool: &str,
481 count: u32,
482 ) -> Result<Vec<LocalSessionRow>> {
483 let sql = format!(
484 "SELECT {LOCAL_SESSION_COLUMNS} \
485 {FROM_CLAUSE} WHERE s.tool = ?1 \
486 ORDER BY s.created_at DESC LIMIT ?2"
487 );
488 let conn = self.conn();
489 let mut stmt = conn.prepare(&sql)?;
490 let rows = stmt.query_map(params![tool, count], row_to_local_session)?;
491 let mut result = Vec::new();
492 for row in rows {
493 result.push(row?);
494 }
495 Ok(result)
496 }
497
498 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
500 let sql = format!(
501 "SELECT {LOCAL_SESSION_COLUMNS} \
502 {FROM_CLAUSE} \
503 ORDER BY s.created_at DESC LIMIT ?1"
504 );
505 let conn = self.conn();
506 let mut stmt = conn.prepare(&sql)?;
507 let rows = stmt.query_map(params![count], row_to_local_session)?;
508 let mut result = Vec::new();
509 for row in rows {
510 result.push(row?);
511 }
512 Ok(result)
513 }
514
515 pub fn get_session_by_tool_offset(
517 &self,
518 tool: &str,
519 offset: u32,
520 ) -> Result<Option<LocalSessionRow>> {
521 let sql = format!(
522 "SELECT {LOCAL_SESSION_COLUMNS} \
523 {FROM_CLAUSE} WHERE s.tool = ?1 \
524 ORDER BY s.created_at DESC LIMIT 1 OFFSET ?2"
525 );
526 let conn = self.conn();
527 let mut stmt = conn.prepare(&sql)?;
528 let row = stmt
529 .query_map(params![tool, offset], row_to_local_session)?
530 .next()
531 .transpose()?;
532 Ok(row)
533 }
534
535 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
537 let sql = format!(
538 "SELECT {LOCAL_SESSION_COLUMNS} \
539 {FROM_CLAUSE} \
540 ORDER BY s.created_at DESC LIMIT 1 OFFSET ?1"
541 );
542 let conn = self.conn();
543 let mut stmt = conn.prepare(&sql)?;
544 let row = stmt
545 .query_map(params![offset], row_to_local_session)?
546 .next()
547 .transpose()?;
548 Ok(row)
549 }
550
551 pub fn session_count(&self) -> Result<i64> {
553 let count = self
554 .conn()
555 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
556 Ok(count)
557 }
558
559 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
562 let cursor = self
563 .conn()
564 .query_row(
565 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
566 params![team_id],
567 |row| row.get(0),
568 )
569 .optional()?;
570 Ok(cursor)
571 }
572
573 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
574 self.conn().execute(
575 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
576 VALUES (?1, ?2, datetime('now')) \
577 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
578 params![team_id, cursor],
579 )?;
580 Ok(())
581 }
582
583 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
587 let sql = format!(
588 "SELECT {LOCAL_SESSION_COLUMNS} \
589 FROM sessions s \
590 INNER JOIN session_sync ss ON ss.session_id = s.id \
591 LEFT JOIN users u ON u.id = s.user_id \
592 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
593 ORDER BY s.created_at ASC"
594 );
595 let conn = self.conn();
596 let mut stmt = conn.prepare(&sql)?;
597 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
598 let mut result = Vec::new();
599 for row in rows {
600 result.push(row?);
601 }
602 Ok(result)
603 }
604
605 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
606 self.conn().execute(
607 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
608 WHERE session_id = ?1",
609 params![session_id],
610 )?;
611 Ok(())
612 }
613
614 pub fn was_uploaded_after(
616 &self,
617 source_path: &str,
618 modified: &chrono::DateTime<chrono::Utc>,
619 ) -> Result<bool> {
620 let result: Option<String> = self
621 .conn()
622 .query_row(
623 "SELECT last_synced_at FROM session_sync \
624 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
625 params![source_path],
626 |row| row.get(0),
627 )
628 .optional()?;
629
630 if let Some(synced_at) = result {
631 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
632 return Ok(dt >= *modified);
633 }
634 }
635 Ok(false)
636 }
637
638 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
641 self.conn().execute(
642 "INSERT INTO body_cache (session_id, body, cached_at) \
643 VALUES (?1, ?2, datetime('now')) \
644 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
645 params![session_id, body],
646 )?;
647 Ok(())
648 }
649
650 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
651 let body = self
652 .conn()
653 .query_row(
654 "SELECT body FROM body_cache WHERE session_id = ?1",
655 params![session_id],
656 |row| row.get(0),
657 )
658 .optional()?;
659 Ok(body)
660 }
661
662 pub fn migrate_from_state_json(
667 &self,
668 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
669 ) -> Result<usize> {
670 let mut count = 0;
671 for (path, uploaded_at) in uploaded {
672 let exists: bool = self
673 .conn()
674 .query_row(
675 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
676 params![path],
677 |row| row.get(0),
678 )
679 .unwrap_or(false);
680
681 if exists {
682 self.conn().execute(
683 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
684 WHERE source_path = ?2 AND sync_status = 'local_only'",
685 params![uploaded_at.to_rfc3339(), path],
686 )?;
687 count += 1;
688 }
689 }
690 Ok(count)
691 }
692
693 pub fn link_commit_session(
697 &self,
698 commit_hash: &str,
699 session_id: &str,
700 repo_path: Option<&str>,
701 branch: Option<&str>,
702 ) -> Result<()> {
703 self.conn().execute(
704 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
705 VALUES (?1, ?2, ?3, ?4) \
706 ON CONFLICT(commit_hash, session_id) DO NOTHING",
707 params![commit_hash, session_id, repo_path, branch],
708 )?;
709 Ok(())
710 }
711
712 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
714 let sql = format!(
715 "SELECT {LOCAL_SESSION_COLUMNS} \
716 {FROM_CLAUSE} \
717 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
718 WHERE csl.commit_hash = ?1 \
719 ORDER BY s.created_at DESC"
720 );
721 let conn = self.conn();
722 let mut stmt = conn.prepare(&sql)?;
723 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
724 let mut result = Vec::new();
725 for row in rows {
726 result.push(row?);
727 }
728 Ok(result)
729 }
730
731 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
733 let conn = self.conn();
734 let mut stmt = conn.prepare(
735 "SELECT commit_hash, session_id, repo_path, branch, created_at \
736 FROM commit_session_links WHERE session_id = ?1 \
737 ORDER BY created_at DESC",
738 )?;
739 let rows = stmt.query_map(params![session_id], |row| {
740 Ok(CommitLink {
741 commit_hash: row.get(0)?,
742 session_id: row.get(1)?,
743 repo_path: row.get(2)?,
744 branch: row.get(3)?,
745 created_at: row.get(4)?,
746 })
747 })?;
748 let mut result = Vec::new();
749 for row in rows {
750 result.push(row?);
751 }
752 Ok(result)
753 }
754
755 pub fn find_active_session_for_repo(
759 &self,
760 repo_path: &str,
761 since_minutes: u32,
762 ) -> Result<Option<LocalSessionRow>> {
763 let sql = format!(
764 "SELECT {LOCAL_SESSION_COLUMNS} \
765 {FROM_CLAUSE} \
766 WHERE s.working_directory LIKE ?1 \
767 AND s.created_at >= datetime('now', ?2) \
768 ORDER BY s.created_at DESC LIMIT 1"
769 );
770 let since = format!("-{since_minutes} minutes");
771 let like = format!("{repo_path}%");
772 let conn = self.conn();
773 let mut stmt = conn.prepare(&sql)?;
774 let row = stmt
775 .query_map(params![like, since], row_to_local_session)?
776 .next()
777 .transpose()?;
778 Ok(row)
779 }
780
781 pub fn list_repos(&self) -> Result<Vec<String>> {
783 let conn = self.conn();
784 let mut stmt = conn.prepare(
785 "SELECT DISTINCT git_repo_name FROM sessions \
786 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
787 )?;
788 let rows = stmt.query_map([], |row| row.get(0))?;
789 let mut result = Vec::new();
790 for row in rows {
791 result.push(row?);
792 }
793 Ok(result)
794 }
795}
796
797pub const LOCAL_SESSION_COLUMNS: &str = "\
799s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
800s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
801s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
802s.message_count, s.task_count, s.event_count, s.duration_seconds, \
803s.total_input_tokens, s.total_output_tokens, \
804s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
805s.pr_number, s.pr_url, s.working_directory, \
806s.files_modified, s.files_read, s.has_errors";
807
808fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
809 Ok(LocalSessionRow {
810 id: row.get(0)?,
811 source_path: row.get(1)?,
812 sync_status: row.get(2)?,
813 last_synced_at: row.get(3)?,
814 user_id: row.get(4)?,
815 nickname: row.get(5)?,
816 team_id: row.get(6)?,
817 tool: row.get(7)?,
818 agent_provider: row.get(8)?,
819 agent_model: row.get(9)?,
820 title: row.get(10)?,
821 description: row.get(11)?,
822 tags: row.get(12)?,
823 created_at: row.get(13)?,
824 uploaded_at: row.get(14)?,
825 message_count: row.get(15)?,
826 task_count: row.get(16)?,
827 event_count: row.get(17)?,
828 duration_seconds: row.get(18)?,
829 total_input_tokens: row.get(19)?,
830 total_output_tokens: row.get(20)?,
831 git_remote: row.get(21)?,
832 git_branch: row.get(22)?,
833 git_commit: row.get(23)?,
834 git_repo_name: row.get(24)?,
835 pr_number: row.get(25)?,
836 pr_url: row.get(26)?,
837 working_directory: row.get(27)?,
838 files_modified: row.get(28)?,
839 files_read: row.get(29)?,
840 has_errors: row.get::<_, i64>(30).unwrap_or(0) != 0,
841 })
842}
843
844fn default_db_path() -> Result<PathBuf> {
845 let home = std::env::var("HOME")
846 .or_else(|_| std::env::var("USERPROFILE"))
847 .context("Could not determine home directory")?;
848 Ok(PathBuf::from(home)
849 .join(".local")
850 .join("share")
851 .join("opensession")
852 .join("local.db"))
853}
854
855#[cfg(test)]
856mod tests {
857 use super::*;
858
859 fn test_db() -> LocalDb {
860 let dir = tempfile::tempdir().unwrap();
861 let path = dir.keep().join("test.db");
862 LocalDb::open_path(&path).unwrap()
863 }
864
865 #[test]
866 fn test_open_and_schema() {
867 let _db = test_db();
868 }
869
870 #[test]
871 fn test_sync_cursor() {
872 let db = test_db();
873 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
874 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
875 assert_eq!(
876 db.get_sync_cursor("team1").unwrap(),
877 Some("2024-01-01T00:00:00Z".to_string())
878 );
879 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
881 assert_eq!(
882 db.get_sync_cursor("team1").unwrap(),
883 Some("2024-06-01T00:00:00Z".to_string())
884 );
885 }
886
887 #[test]
888 fn test_body_cache() {
889 let db = test_db();
890 assert_eq!(db.get_cached_body("s1").unwrap(), None);
891 db.cache_body("s1", b"hello world").unwrap();
892 assert_eq!(
893 db.get_cached_body("s1").unwrap(),
894 Some(b"hello world".to_vec())
895 );
896 }
897
898 #[test]
899 fn test_upsert_remote_session() {
900 let db = test_db();
901 let summary = SessionSummary {
902 id: "remote-1".to_string(),
903 user_id: Some("u1".to_string()),
904 nickname: Some("alice".to_string()),
905 team_id: "t1".to_string(),
906 tool: "claude-code".to_string(),
907 agent_provider: None,
908 agent_model: None,
909 title: Some("Test session".to_string()),
910 description: None,
911 tags: None,
912 created_at: "2024-01-01T00:00:00Z".to_string(),
913 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
914 message_count: 10,
915 task_count: 2,
916 event_count: 20,
917 duration_seconds: 300,
918 total_input_tokens: 1000,
919 total_output_tokens: 500,
920 git_remote: None,
921 git_branch: None,
922 git_commit: None,
923 git_repo_name: None,
924 pr_number: None,
925 pr_url: None,
926 working_directory: None,
927 files_modified: None,
928 files_read: None,
929 has_errors: false,
930 };
931 db.upsert_remote_session(&summary).unwrap();
932
933 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
934 assert_eq!(sessions.len(), 1);
935 assert_eq!(sessions[0].id, "remote-1");
936 assert_eq!(sessions[0].sync_status, "remote_only");
937 assert_eq!(sessions[0].nickname, None); }
939
940 #[test]
941 fn test_list_filter_by_repo() {
942 let db = test_db();
943 let summary1 = SessionSummary {
945 id: "s1".to_string(),
946 user_id: None,
947 nickname: None,
948 team_id: "t1".to_string(),
949 tool: "claude-code".to_string(),
950 agent_provider: None,
951 agent_model: None,
952 title: Some("Session 1".to_string()),
953 description: None,
954 tags: None,
955 created_at: "2024-01-01T00:00:00Z".to_string(),
956 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
957 message_count: 5,
958 task_count: 0,
959 event_count: 10,
960 duration_seconds: 60,
961 total_input_tokens: 100,
962 total_output_tokens: 50,
963 git_remote: None,
964 git_branch: None,
965 git_commit: None,
966 git_repo_name: None,
967 pr_number: None,
968 pr_url: None,
969 working_directory: None,
970 files_modified: None,
971 files_read: None,
972 has_errors: false,
973 };
974 db.upsert_remote_session(&summary1).unwrap();
975
976 let filter = LocalSessionFilter {
978 team_id: Some("t1".to_string()),
979 ..Default::default()
980 };
981 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
982
983 let filter = LocalSessionFilter {
984 team_id: Some("t999".to_string()),
985 ..Default::default()
986 };
987 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
988 }
989
990 fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> SessionSummary {
993 SessionSummary {
994 id: id.to_string(),
995 user_id: None,
996 nickname: None,
997 team_id: "t1".to_string(),
998 tool: tool.to_string(),
999 agent_provider: Some("anthropic".to_string()),
1000 agent_model: Some("claude-opus-4-6".to_string()),
1001 title: Some(title.to_string()),
1002 description: None,
1003 tags: None,
1004 created_at: created_at.to_string(),
1005 uploaded_at: created_at.to_string(),
1006 message_count: 5,
1007 task_count: 1,
1008 event_count: 10,
1009 duration_seconds: 300,
1010 total_input_tokens: 1000,
1011 total_output_tokens: 500,
1012 git_remote: None,
1013 git_branch: None,
1014 git_commit: None,
1015 git_repo_name: None,
1016 pr_number: None,
1017 pr_url: None,
1018 working_directory: None,
1019 files_modified: None,
1020 files_read: None,
1021 has_errors: false,
1022 }
1023 }
1024
1025 fn seed_sessions(db: &LocalDb) {
1026 db.upsert_remote_session(&make_summary(
1028 "s1",
1029 "claude-code",
1030 "First session",
1031 "2024-01-01T00:00:00Z",
1032 ))
1033 .unwrap();
1034 db.upsert_remote_session(&make_summary(
1035 "s2",
1036 "claude-code",
1037 "JWT auth work",
1038 "2024-01-02T00:00:00Z",
1039 ))
1040 .unwrap();
1041 db.upsert_remote_session(&make_summary(
1042 "s3",
1043 "gemini",
1044 "Gemini test",
1045 "2024-01-03T00:00:00Z",
1046 ))
1047 .unwrap();
1048 db.upsert_remote_session(&make_summary(
1049 "s4",
1050 "claude-code",
1051 "Error handling",
1052 "2024-01-04T00:00:00Z",
1053 ))
1054 .unwrap();
1055 db.upsert_remote_session(&make_summary(
1056 "s5",
1057 "claude-code",
1058 "Final polish",
1059 "2024-01-05T00:00:00Z",
1060 ))
1061 .unwrap();
1062 }
1063
1064 #[test]
1067 fn test_log_no_filters() {
1068 let db = test_db();
1069 seed_sessions(&db);
1070 let filter = LogFilter::default();
1071 let results = db.list_sessions_log(&filter).unwrap();
1072 assert_eq!(results.len(), 5);
1073 assert_eq!(results[0].id, "s5");
1075 assert_eq!(results[4].id, "s1");
1076 }
1077
1078 #[test]
1079 fn test_log_filter_by_tool() {
1080 let db = test_db();
1081 seed_sessions(&db);
1082 let filter = LogFilter {
1083 tool: Some("claude-code".to_string()),
1084 ..Default::default()
1085 };
1086 let results = db.list_sessions_log(&filter).unwrap();
1087 assert_eq!(results.len(), 4);
1088 assert!(results.iter().all(|s| s.tool == "claude-code"));
1089 }
1090
1091 #[test]
1092 fn test_log_filter_by_model_wildcard() {
1093 let db = test_db();
1094 seed_sessions(&db);
1095 let filter = LogFilter {
1096 model: Some("claude*".to_string()),
1097 ..Default::default()
1098 };
1099 let results = db.list_sessions_log(&filter).unwrap();
1100 assert_eq!(results.len(), 5); }
1102
1103 #[test]
1104 fn test_log_filter_since() {
1105 let db = test_db();
1106 seed_sessions(&db);
1107 let filter = LogFilter {
1108 since: Some("2024-01-03T00:00:00Z".to_string()),
1109 ..Default::default()
1110 };
1111 let results = db.list_sessions_log(&filter).unwrap();
1112 assert_eq!(results.len(), 3); }
1114
1115 #[test]
1116 fn test_log_filter_before() {
1117 let db = test_db();
1118 seed_sessions(&db);
1119 let filter = LogFilter {
1120 before: Some("2024-01-03T00:00:00Z".to_string()),
1121 ..Default::default()
1122 };
1123 let results = db.list_sessions_log(&filter).unwrap();
1124 assert_eq!(results.len(), 2); }
1126
1127 #[test]
1128 fn test_log_filter_since_and_before() {
1129 let db = test_db();
1130 seed_sessions(&db);
1131 let filter = LogFilter {
1132 since: Some("2024-01-02T00:00:00Z".to_string()),
1133 before: Some("2024-01-04T00:00:00Z".to_string()),
1134 ..Default::default()
1135 };
1136 let results = db.list_sessions_log(&filter).unwrap();
1137 assert_eq!(results.len(), 2); }
1139
1140 #[test]
1141 fn test_log_filter_grep() {
1142 let db = test_db();
1143 seed_sessions(&db);
1144 let filter = LogFilter {
1145 grep: Some("JWT".to_string()),
1146 ..Default::default()
1147 };
1148 let results = db.list_sessions_log(&filter).unwrap();
1149 assert_eq!(results.len(), 1);
1150 assert_eq!(results[0].id, "s2");
1151 }
1152
1153 #[test]
1154 fn test_log_limit_and_offset() {
1155 let db = test_db();
1156 seed_sessions(&db);
1157 let filter = LogFilter {
1158 limit: Some(2),
1159 offset: Some(1),
1160 ..Default::default()
1161 };
1162 let results = db.list_sessions_log(&filter).unwrap();
1163 assert_eq!(results.len(), 2);
1164 assert_eq!(results[0].id, "s4"); assert_eq!(results[1].id, "s3");
1166 }
1167
1168 #[test]
1169 fn test_log_limit_only() {
1170 let db = test_db();
1171 seed_sessions(&db);
1172 let filter = LogFilter {
1173 limit: Some(3),
1174 ..Default::default()
1175 };
1176 let results = db.list_sessions_log(&filter).unwrap();
1177 assert_eq!(results.len(), 3);
1178 }
1179
1180 #[test]
1181 fn test_log_combined_filters() {
1182 let db = test_db();
1183 seed_sessions(&db);
1184 let filter = LogFilter {
1185 tool: Some("claude-code".to_string()),
1186 since: Some("2024-01-03T00:00:00Z".to_string()),
1187 limit: Some(1),
1188 ..Default::default()
1189 };
1190 let results = db.list_sessions_log(&filter).unwrap();
1191 assert_eq!(results.len(), 1);
1192 assert_eq!(results[0].id, "s5"); }
1194
1195 #[test]
1198 fn test_get_session_by_offset() {
1199 let db = test_db();
1200 seed_sessions(&db);
1201 let row = db.get_session_by_offset(0).unwrap().unwrap();
1202 assert_eq!(row.id, "s5"); let row = db.get_session_by_offset(2).unwrap().unwrap();
1204 assert_eq!(row.id, "s3");
1205 assert!(db.get_session_by_offset(10).unwrap().is_none());
1206 }
1207
1208 #[test]
1209 fn test_get_session_by_tool_offset() {
1210 let db = test_db();
1211 seed_sessions(&db);
1212 let row = db
1213 .get_session_by_tool_offset("claude-code", 0)
1214 .unwrap()
1215 .unwrap();
1216 assert_eq!(row.id, "s5");
1217 let row = db
1218 .get_session_by_tool_offset("claude-code", 1)
1219 .unwrap()
1220 .unwrap();
1221 assert_eq!(row.id, "s4");
1222 let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
1223 assert_eq!(row.id, "s3");
1224 assert!(db
1225 .get_session_by_tool_offset("gemini", 1)
1226 .unwrap()
1227 .is_none());
1228 }
1229
1230 #[test]
1231 fn test_get_sessions_latest() {
1232 let db = test_db();
1233 seed_sessions(&db);
1234 let rows = db.get_sessions_latest(3).unwrap();
1235 assert_eq!(rows.len(), 3);
1236 assert_eq!(rows[0].id, "s5");
1237 assert_eq!(rows[1].id, "s4");
1238 assert_eq!(rows[2].id, "s3");
1239 }
1240
1241 #[test]
1242 fn test_get_sessions_by_tool_latest() {
1243 let db = test_db();
1244 seed_sessions(&db);
1245 let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
1246 assert_eq!(rows.len(), 2);
1247 assert_eq!(rows[0].id, "s5");
1248 assert_eq!(rows[1].id, "s4");
1249 }
1250
1251 #[test]
1252 fn test_get_sessions_latest_more_than_available() {
1253 let db = test_db();
1254 seed_sessions(&db);
1255 let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
1256 assert_eq!(rows.len(), 1); }
1258
1259 #[test]
1260 fn test_session_count() {
1261 let db = test_db();
1262 assert_eq!(db.session_count().unwrap(), 0);
1263 seed_sessions(&db);
1264 assert_eq!(db.session_count().unwrap(), 5);
1265 }
1266
1267 #[test]
1270 fn test_link_commit_session() {
1271 let db = test_db();
1272 seed_sessions(&db);
1273 db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
1274 .unwrap();
1275
1276 let commits = db.get_commits_by_session("s1").unwrap();
1277 assert_eq!(commits.len(), 1);
1278 assert_eq!(commits[0].commit_hash, "abc123");
1279 assert_eq!(commits[0].session_id, "s1");
1280 assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
1281 assert_eq!(commits[0].branch.as_deref(), Some("main"));
1282
1283 let sessions = db.get_sessions_by_commit("abc123").unwrap();
1284 assert_eq!(sessions.len(), 1);
1285 assert_eq!(sessions[0].id, "s1");
1286 }
1287
1288 #[test]
1289 fn test_get_sessions_by_commit() {
1290 let db = test_db();
1291 seed_sessions(&db);
1292 db.link_commit_session("abc123", "s1", None, None).unwrap();
1294 db.link_commit_session("abc123", "s2", None, None).unwrap();
1295 db.link_commit_session("abc123", "s3", None, None).unwrap();
1296
1297 let sessions = db.get_sessions_by_commit("abc123").unwrap();
1298 assert_eq!(sessions.len(), 3);
1299 assert_eq!(sessions[0].id, "s3");
1301 assert_eq!(sessions[1].id, "s2");
1302 assert_eq!(sessions[2].id, "s1");
1303 }
1304
1305 #[test]
1306 fn test_get_commits_by_session() {
1307 let db = test_db();
1308 seed_sessions(&db);
1309 db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
1311 .unwrap();
1312 db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
1313 .unwrap();
1314 db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
1315 .unwrap();
1316
1317 let commits = db.get_commits_by_session("s1").unwrap();
1318 assert_eq!(commits.len(), 3);
1319 assert!(commits.iter().all(|c| c.session_id == "s1"));
1321 }
1322
1323 #[test]
1324 fn test_duplicate_link_ignored() {
1325 let db = test_db();
1326 seed_sessions(&db);
1327 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
1328 .unwrap();
1329 db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
1331 .unwrap();
1332
1333 let commits = db.get_commits_by_session("s1").unwrap();
1334 assert_eq!(commits.len(), 1);
1335 }
1336
1337 #[test]
1338 fn test_log_filter_by_commit() {
1339 let db = test_db();
1340 seed_sessions(&db);
1341 db.link_commit_session("abc123", "s2", None, None).unwrap();
1342 db.link_commit_session("abc123", "s4", None, None).unwrap();
1343
1344 let filter = LogFilter {
1345 commit: Some("abc123".to_string()),
1346 ..Default::default()
1347 };
1348 let results = db.list_sessions_log(&filter).unwrap();
1349 assert_eq!(results.len(), 2);
1350 assert_eq!(results[0].id, "s4");
1351 assert_eq!(results[1].id, "s2");
1352
1353 let filter = LogFilter {
1355 commit: Some("nonexistent".to_string()),
1356 ..Default::default()
1357 };
1358 let results = db.list_sessions_log(&filter).unwrap();
1359 assert_eq!(results.len(), 0);
1360 }
1361}