1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api_types::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_api_types::SessionSummary;
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use std::path::PathBuf;
9use std::sync::Mutex;
10
11use git::GitContext;
12
/// One session as stored in the local database, joined with its sync state
/// (`session_sync`) and the uploader's nickname (`users`).
///
/// Field order mirrors the column order of `LOCAL_SESSION_COLUMNS`.
/// `PartialEq`/`Eq` are derived so rows can be compared directly in tests and
/// change-detection code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalSessionRow {
    /// Primary key of the session (`sessions.id`).
    pub id: String,
    /// Path of the file the session was read from, when a sync row records one.
    pub source_path: Option<String>,
    /// Sync state; `"unknown"` when the session has no `session_sync` row.
    /// Values written by this module are `local_only`, `remote_only` and `synced`.
    pub sync_status: String,
    /// Timestamp text of the last successful sync, if any.
    pub last_synced_at: Option<String>,
    /// Owning user id, if known.
    pub user_id: Option<String>,
    /// Nickname looked up from the `users` table via `user_id`.
    pub nickname: Option<String>,
    /// Team the session belongs to.
    pub team_id: Option<String>,
    /// Name of the tool that produced the session.
    pub tool: String,
    /// Provider of the agent model, if recorded.
    pub agent_provider: Option<String>,
    /// Agent model name, if recorded.
    pub agent_model: Option<String>,
    /// Session title.
    pub title: Option<String>,
    /// Session description.
    pub description: Option<String>,
    /// Tags as a single text value (locally ingested sessions store them comma-joined).
    pub tags: Option<String>,
    /// Creation timestamp text.
    pub created_at: String,
    /// Upload timestamp text, when the session came from / reached the server.
    pub uploaded_at: Option<String>,
    /// Total number of messages.
    pub message_count: i64,
    /// Number of user-authored messages (0 when the column is NULL).
    pub user_message_count: i64,
    /// Number of tasks.
    pub task_count: i64,
    /// Number of events.
    pub event_count: i64,
    /// Session duration in seconds.
    pub duration_seconds: i64,
    /// Total input tokens consumed.
    pub total_input_tokens: i64,
    /// Total output tokens produced.
    pub total_output_tokens: i64,
    /// Git remote captured for the session.
    pub git_remote: Option<String>,
    /// Git branch captured for the session.
    pub git_branch: Option<String>,
    /// Git commit captured for the session.
    pub git_commit: Option<String>,
    /// Repository name captured for the session.
    pub git_repo_name: Option<String>,
    /// Pull-request number, if any.
    pub pr_number: Option<i64>,
    /// Pull-request URL, if any.
    pub pr_url: Option<String>,
    /// Working directory the session ran in.
    pub working_directory: Option<String>,
    /// Serialized list of modified files (presumably JSON text; confirm with the extractor).
    pub files_modified: Option<String>,
    /// Serialized list of files that were read (same encoding as `files_modified`).
    pub files_read: Option<String>,
    /// Whether the session recorded any errors.
    pub has_errors: bool,
}
49
/// A recorded association between a git commit and a session
/// (one row of `commit_session_links`).
///
/// `PartialEq`/`Eq` are derived so links can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitLink {
    /// Hash of the linked commit.
    pub commit_hash: String,
    /// Id of the linked session.
    pub session_id: String,
    /// Repository path supplied when the link was recorded, if any.
    pub repo_path: Option<String>,
    /// Branch supplied when the link was recorded, if any.
    pub branch: Option<String>,
    /// Timestamp text stored with the link.
    pub created_at: String,
}
59
/// Optional criteria for `LocalDb::list_sessions`; every `None` field is ignored.
///
/// `Clone`, `PartialEq` and `Eq` are derived so a filter can be stored, reused
/// and compared (for example to detect that UI filter state changed).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LocalSessionFilter {
    /// Exact match on the session's team id.
    pub team_id: Option<String>,
    /// Exact match on the sync status (sessions without a sync row count as `"unknown"`).
    pub sync_status: Option<String>,
    /// Exact match on the repository name.
    pub git_repo_name: Option<String>,
    /// Substring matched with SQL `LIKE` against title, description and tags.
    pub search: Option<String>,
    /// Exact match on the tool name.
    pub tool: Option<String>,
}
69
/// Optional criteria for `LocalDb::list_sessions_log`; every `None` field is ignored.
///
/// `Clone`, `PartialEq` and `Eq` are derived so a filter can be stored, reused
/// and compared.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LogFilter {
    /// Exact match on the tool name.
    pub tool: Option<String>,
    /// Model pattern matched with SQL `LIKE`; every `*` is turned into `%`.
    pub model: Option<String>,
    /// Inclusive lower bound compared against `created_at`.
    pub since: Option<String>,
    /// Exclusive upper bound compared against `created_at`.
    pub before: Option<String>,
    /// File path that must appear, double-quoted, inside `files_modified`.
    pub touches: Option<String>,
    /// Substring matched with SQL `LIKE` against title, description and tags.
    pub grep: Option<String>,
    /// Only `Some(true)` adds a restriction (sessions with errors);
    /// `None` and `Some(false)` do not filter.
    pub has_errors: Option<bool>,
    /// Prefix that the session's working directory must start with.
    pub working_directory: Option<String>,
    /// Exact match on the repository name.
    pub git_repo_name: Option<String>,
    /// Commit hash; restricts to sessions linked to it in `commit_session_links`.
    pub commit: Option<String>,
    /// Maximum number of rows to return.
    pub limit: Option<u32>,
    /// Number of leading rows to skip (see `list_sessions_log` for how it
    /// combines with `limit`).
    pub offset: Option<u32>,
}
98
/// `FROM` clause shared by the session queries: the `sessions` table (`s`)
/// left-joined with its sync row (`ss`) and the owning user (`u`), matching the
/// aliases used in `LOCAL_SESSION_COLUMNS`.
const FROM_CLAUSE: &str = concat!(
    "FROM sessions s ",
    "LEFT JOIN session_sync ss ON ss.session_id = s.id ",
    "LEFT JOIN users u ON u.id = s.user_id",
);
104
105pub struct LocalDb {
108 conn: Mutex<Connection>,
109}
110
111impl LocalDb {
112 pub fn open() -> Result<Self> {
115 let path = default_db_path()?;
116 Self::open_path(&path)
117 }
118
119 pub fn open_path(path: &PathBuf) -> Result<Self> {
121 if let Some(parent) = path.parent() {
122 std::fs::create_dir_all(parent)
123 .with_context(|| format!("create dir for {}", path.display()))?;
124 }
125 let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
126 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
127
128 conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
130
131 for (_name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
135 if let Err(e) = conn.execute_batch(sql) {
136 let msg = e.to_string();
137 if msg.contains("duplicate column name") || msg.contains("no such column") {
138 continue;
140 }
141 return Err(e.into());
142 }
143 }
144
145 Ok(Self {
146 conn: Mutex::new(conn),
147 })
148 }
149
150 fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
151 self.conn.lock().expect("local db mutex poisoned")
152 }
153
154 pub fn upsert_local_session(
157 &self,
158 session: &Session,
159 source_path: &str,
160 git: &GitContext,
161 ) -> Result<()> {
162 let title = session.context.title.as_deref();
163 let description = session.context.description.as_deref();
164 let tags = if session.context.tags.is_empty() {
165 None
166 } else {
167 Some(session.context.tags.join(","))
168 };
169 let created_at = session.context.created_at.to_rfc3339();
170 let cwd = session
171 .context
172 .attributes
173 .get("cwd")
174 .or_else(|| session.context.attributes.get("working_directory"))
175 .and_then(|v| v.as_str().map(String::from));
176
177 let (files_modified, files_read, has_errors) =
179 opensession_core::extract::extract_file_metadata(session);
180
181 let conn = self.conn();
182 conn.execute(
183 "INSERT INTO sessions \
184 (id, team_id, tool, agent_provider, agent_model, \
185 title, description, tags, created_at, \
186 message_count, user_message_count, task_count, event_count, duration_seconds, \
187 total_input_tokens, total_output_tokens, body_storage_key, \
188 git_remote, git_branch, git_commit, git_repo_name, working_directory, \
189 files_modified, files_read, has_errors) \
190 VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23) \
191 ON CONFLICT(id) DO UPDATE SET \
192 tool=excluded.tool, agent_provider=excluded.agent_provider, \
193 agent_model=excluded.agent_model, \
194 title=excluded.title, description=excluded.description, \
195 tags=excluded.tags, \
196 message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
197 task_count=excluded.task_count, \
198 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
199 total_input_tokens=excluded.total_input_tokens, \
200 total_output_tokens=excluded.total_output_tokens, \
201 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
202 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
203 working_directory=excluded.working_directory, \
204 files_modified=excluded.files_modified, files_read=excluded.files_read, \
205 has_errors=excluded.has_errors",
206 params![
207 &session.session_id,
208 &session.agent.tool,
209 &session.agent.provider,
210 &session.agent.model,
211 title,
212 description,
213 &tags,
214 &created_at,
215 session.stats.message_count as i64,
216 session.stats.user_message_count as i64,
217 session.stats.task_count as i64,
218 session.stats.event_count as i64,
219 session.stats.duration_seconds as i64,
220 session.stats.total_input_tokens as i64,
221 session.stats.total_output_tokens as i64,
222 &git.remote,
223 &git.branch,
224 &git.commit,
225 &git.repo_name,
226 &cwd,
227 &files_modified,
228 &files_read,
229 has_errors,
230 ],
231 )?;
232
233 conn.execute(
234 "INSERT INTO session_sync (session_id, source_path, sync_status) \
235 VALUES (?1, ?2, 'local_only') \
236 ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
237 params![&session.session_id, source_path],
238 )?;
239 Ok(())
240 }
241
242 pub fn upsert_remote_session(&self, summary: &SessionSummary) -> Result<()> {
245 let conn = self.conn();
246 conn.execute(
247 "INSERT INTO sessions \
248 (id, user_id, team_id, tool, agent_provider, agent_model, \
249 title, description, tags, created_at, uploaded_at, \
250 message_count, task_count, event_count, duration_seconds, \
251 total_input_tokens, total_output_tokens, body_storage_key, \
252 git_remote, git_branch, git_commit, git_repo_name, \
253 pr_number, pr_url, working_directory, \
254 files_modified, files_read, has_errors) \
255 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27) \
256 ON CONFLICT(id) DO UPDATE SET \
257 title=excluded.title, description=excluded.description, \
258 tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
259 message_count=excluded.message_count, task_count=excluded.task_count, \
260 event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
261 total_input_tokens=excluded.total_input_tokens, \
262 total_output_tokens=excluded.total_output_tokens, \
263 git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
264 git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
265 pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
266 working_directory=excluded.working_directory, \
267 files_modified=excluded.files_modified, files_read=excluded.files_read, \
268 has_errors=excluded.has_errors",
269 params![
270 &summary.id,
271 &summary.user_id,
272 &summary.team_id,
273 &summary.tool,
274 &summary.agent_provider,
275 &summary.agent_model,
276 &summary.title,
277 &summary.description,
278 &summary.tags,
279 &summary.created_at,
280 &summary.uploaded_at,
281 summary.message_count,
282 summary.task_count,
283 summary.event_count,
284 summary.duration_seconds,
285 summary.total_input_tokens,
286 summary.total_output_tokens,
287 &summary.git_remote,
288 &summary.git_branch,
289 &summary.git_commit,
290 &summary.git_repo_name,
291 summary.pr_number,
292 &summary.pr_url,
293 &summary.working_directory,
294 &summary.files_modified,
295 &summary.files_read,
296 summary.has_errors,
297 ],
298 )?;
299
300 conn.execute(
301 "INSERT INTO session_sync (session_id, sync_status) \
302 VALUES (?1, 'remote_only') \
303 ON CONFLICT(session_id) DO UPDATE SET \
304 sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
305 params![&summary.id],
306 )?;
307 Ok(())
308 }
309
310 pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
313 let mut where_clauses = vec!["1=1".to_string()];
314 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
315 let mut idx = 1u32;
316
317 if let Some(ref team_id) = filter.team_id {
318 where_clauses.push(format!("s.team_id = ?{idx}"));
319 param_values.push(Box::new(team_id.clone()));
320 idx += 1;
321 }
322
323 if let Some(ref sync_status) = filter.sync_status {
324 where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
325 param_values.push(Box::new(sync_status.clone()));
326 idx += 1;
327 }
328
329 if let Some(ref repo) = filter.git_repo_name {
330 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
331 param_values.push(Box::new(repo.clone()));
332 idx += 1;
333 }
334
335 if let Some(ref tool) = filter.tool {
336 where_clauses.push(format!("s.tool = ?{idx}"));
337 param_values.push(Box::new(tool.clone()));
338 idx += 1;
339 }
340
341 if let Some(ref search) = filter.search {
342 let like = format!("%{search}%");
343 where_clauses.push(format!(
344 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
345 i1 = idx,
346 i2 = idx + 1,
347 i3 = idx + 2,
348 ));
349 param_values.push(Box::new(like.clone()));
350 param_values.push(Box::new(like.clone()));
351 param_values.push(Box::new(like));
352 }
353
354 let _ = idx; let where_str = where_clauses.join(" AND ");
357 let sql = format!(
358 "SELECT {LOCAL_SESSION_COLUMNS} \
359 {FROM_CLAUSE} WHERE {where_str} \
360 ORDER BY s.created_at DESC"
361 );
362
363 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
364 param_values.iter().map(|p| p.as_ref()).collect();
365 let conn = self.conn();
366 let mut stmt = conn.prepare(&sql)?;
367 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
368
369 let mut result = Vec::new();
370 for row in rows {
371 result.push(row?);
372 }
373 Ok(result)
374 }
375
376 pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
380 let mut where_clauses = vec!["1=1".to_string()];
381 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
382 let mut idx = 1u32;
383
384 if let Some(ref tool) = filter.tool {
385 where_clauses.push(format!("s.tool = ?{idx}"));
386 param_values.push(Box::new(tool.clone()));
387 idx += 1;
388 }
389
390 if let Some(ref model) = filter.model {
391 let like = model.replace('*', "%");
392 where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
393 param_values.push(Box::new(like));
394 idx += 1;
395 }
396
397 if let Some(ref since) = filter.since {
398 where_clauses.push(format!("s.created_at >= ?{idx}"));
399 param_values.push(Box::new(since.clone()));
400 idx += 1;
401 }
402
403 if let Some(ref before) = filter.before {
404 where_clauses.push(format!("s.created_at < ?{idx}"));
405 param_values.push(Box::new(before.clone()));
406 idx += 1;
407 }
408
409 if let Some(ref touches) = filter.touches {
410 let like = format!("%\"{touches}\"%");
411 where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
412 param_values.push(Box::new(like));
413 idx += 1;
414 }
415
416 if let Some(ref grep) = filter.grep {
417 let like = format!("%{grep}%");
418 where_clauses.push(format!(
419 "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
420 i1 = idx,
421 i2 = idx + 1,
422 i3 = idx + 2,
423 ));
424 param_values.push(Box::new(like.clone()));
425 param_values.push(Box::new(like.clone()));
426 param_values.push(Box::new(like));
427 idx += 3;
428 }
429
430 if let Some(true) = filter.has_errors {
431 where_clauses.push("s.has_errors = 1".to_string());
432 }
433
434 if let Some(ref wd) = filter.working_directory {
435 where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
436 param_values.push(Box::new(format!("{wd}%")));
437 idx += 1;
438 }
439
440 if let Some(ref repo) = filter.git_repo_name {
441 where_clauses.push(format!("s.git_repo_name = ?{idx}"));
442 param_values.push(Box::new(repo.clone()));
443 idx += 1;
444 }
445
446 let mut extra_join = String::new();
448 if let Some(ref commit) = filter.commit {
449 extra_join =
450 " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
451 where_clauses.push(format!("csl.commit_hash = ?{idx}"));
452 param_values.push(Box::new(commit.clone()));
453 idx += 1;
454 }
455
456 let _ = idx; let where_str = where_clauses.join(" AND ");
459 let mut sql = format!(
460 "SELECT {LOCAL_SESSION_COLUMNS} \
461 {FROM_CLAUSE}{extra_join} WHERE {where_str} \
462 ORDER BY s.created_at DESC"
463 );
464
465 if let Some(limit) = filter.limit {
466 sql.push_str(" LIMIT ?");
467 param_values.push(Box::new(limit));
468 if let Some(offset) = filter.offset {
469 sql.push_str(" OFFSET ?");
470 param_values.push(Box::new(offset));
471 }
472 }
473
474 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
475 param_values.iter().map(|p| p.as_ref()).collect();
476 let conn = self.conn();
477 let mut stmt = conn.prepare(&sql)?;
478 let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
479
480 let mut result = Vec::new();
481 for row in rows {
482 result.push(row?);
483 }
484 Ok(result)
485 }
486
487 pub fn get_sessions_by_tool_latest(
489 &self,
490 tool: &str,
491 count: u32,
492 ) -> Result<Vec<LocalSessionRow>> {
493 let sql = format!(
494 "SELECT {LOCAL_SESSION_COLUMNS} \
495 {FROM_CLAUSE} WHERE s.tool = ?1 \
496 ORDER BY s.created_at DESC LIMIT ?2"
497 );
498 let conn = self.conn();
499 let mut stmt = conn.prepare(&sql)?;
500 let rows = stmt.query_map(params![tool, count], row_to_local_session)?;
501 let mut result = Vec::new();
502 for row in rows {
503 result.push(row?);
504 }
505 Ok(result)
506 }
507
508 pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
510 let sql = format!(
511 "SELECT {LOCAL_SESSION_COLUMNS} \
512 {FROM_CLAUSE} \
513 ORDER BY s.created_at DESC LIMIT ?1"
514 );
515 let conn = self.conn();
516 let mut stmt = conn.prepare(&sql)?;
517 let rows = stmt.query_map(params![count], row_to_local_session)?;
518 let mut result = Vec::new();
519 for row in rows {
520 result.push(row?);
521 }
522 Ok(result)
523 }
524
525 pub fn get_session_by_tool_offset(
527 &self,
528 tool: &str,
529 offset: u32,
530 ) -> Result<Option<LocalSessionRow>> {
531 let sql = format!(
532 "SELECT {LOCAL_SESSION_COLUMNS} \
533 {FROM_CLAUSE} WHERE s.tool = ?1 \
534 ORDER BY s.created_at DESC LIMIT 1 OFFSET ?2"
535 );
536 let conn = self.conn();
537 let mut stmt = conn.prepare(&sql)?;
538 let row = stmt
539 .query_map(params![tool, offset], row_to_local_session)?
540 .next()
541 .transpose()?;
542 Ok(row)
543 }
544
545 pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
547 let sql = format!(
548 "SELECT {LOCAL_SESSION_COLUMNS} \
549 {FROM_CLAUSE} \
550 ORDER BY s.created_at DESC LIMIT 1 OFFSET ?1"
551 );
552 let conn = self.conn();
553 let mut stmt = conn.prepare(&sql)?;
554 let row = stmt
555 .query_map(params![offset], row_to_local_session)?
556 .next()
557 .transpose()?;
558 Ok(row)
559 }
560
561 pub fn session_count(&self) -> Result<i64> {
563 let count = self
564 .conn()
565 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
566 Ok(count)
567 }
568
569 pub fn delete_session(&self, session_id: &str) -> Result<()> {
572 let conn = self.conn();
573 conn.execute(
574 "DELETE FROM body_cache WHERE session_id = ?1",
575 params![session_id],
576 )?;
577 conn.execute(
578 "DELETE FROM session_sync WHERE session_id = ?1",
579 params![session_id],
580 )?;
581 conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
582 Ok(())
583 }
584
585 pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
588 let cursor = self
589 .conn()
590 .query_row(
591 "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
592 params![team_id],
593 |row| row.get(0),
594 )
595 .optional()?;
596 Ok(cursor)
597 }
598
599 pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
600 self.conn().execute(
601 "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
602 VALUES (?1, ?2, datetime('now')) \
603 ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
604 params![team_id, cursor],
605 )?;
606 Ok(())
607 }
608
609 pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
613 let sql = format!(
614 "SELECT {LOCAL_SESSION_COLUMNS} \
615 FROM sessions s \
616 INNER JOIN session_sync ss ON ss.session_id = s.id \
617 LEFT JOIN users u ON u.id = s.user_id \
618 WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
619 ORDER BY s.created_at ASC"
620 );
621 let conn = self.conn();
622 let mut stmt = conn.prepare(&sql)?;
623 let rows = stmt.query_map(params![team_id], row_to_local_session)?;
624 let mut result = Vec::new();
625 for row in rows {
626 result.push(row?);
627 }
628 Ok(result)
629 }
630
631 pub fn mark_synced(&self, session_id: &str) -> Result<()> {
632 self.conn().execute(
633 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
634 WHERE session_id = ?1",
635 params![session_id],
636 )?;
637 Ok(())
638 }
639
640 pub fn was_uploaded_after(
642 &self,
643 source_path: &str,
644 modified: &chrono::DateTime<chrono::Utc>,
645 ) -> Result<bool> {
646 let result: Option<String> = self
647 .conn()
648 .query_row(
649 "SELECT last_synced_at FROM session_sync \
650 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
651 params![source_path],
652 |row| row.get(0),
653 )
654 .optional()?;
655
656 if let Some(synced_at) = result {
657 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
658 return Ok(dt >= *modified);
659 }
660 }
661 Ok(false)
662 }
663
664 pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
667 self.conn().execute(
668 "INSERT INTO body_cache (session_id, body, cached_at) \
669 VALUES (?1, ?2, datetime('now')) \
670 ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
671 params![session_id, body],
672 )?;
673 Ok(())
674 }
675
676 pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
677 let body = self
678 .conn()
679 .query_row(
680 "SELECT body FROM body_cache WHERE session_id = ?1",
681 params![session_id],
682 |row| row.get(0),
683 )
684 .optional()?;
685 Ok(body)
686 }
687
688 pub fn migrate_from_state_json(
693 &self,
694 uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
695 ) -> Result<usize> {
696 let mut count = 0;
697 for (path, uploaded_at) in uploaded {
698 let exists: bool = self
699 .conn()
700 .query_row(
701 "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
702 params![path],
703 |row| row.get(0),
704 )
705 .unwrap_or(false);
706
707 if exists {
708 self.conn().execute(
709 "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
710 WHERE source_path = ?2 AND sync_status = 'local_only'",
711 params![uploaded_at.to_rfc3339(), path],
712 )?;
713 count += 1;
714 }
715 }
716 Ok(count)
717 }
718
719 pub fn link_commit_session(
723 &self,
724 commit_hash: &str,
725 session_id: &str,
726 repo_path: Option<&str>,
727 branch: Option<&str>,
728 ) -> Result<()> {
729 self.conn().execute(
730 "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
731 VALUES (?1, ?2, ?3, ?4) \
732 ON CONFLICT(commit_hash, session_id) DO NOTHING",
733 params![commit_hash, session_id, repo_path, branch],
734 )?;
735 Ok(())
736 }
737
738 pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
740 let sql = format!(
741 "SELECT {LOCAL_SESSION_COLUMNS} \
742 {FROM_CLAUSE} \
743 INNER JOIN commit_session_links csl ON csl.session_id = s.id \
744 WHERE csl.commit_hash = ?1 \
745 ORDER BY s.created_at DESC"
746 );
747 let conn = self.conn();
748 let mut stmt = conn.prepare(&sql)?;
749 let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
750 let mut result = Vec::new();
751 for row in rows {
752 result.push(row?);
753 }
754 Ok(result)
755 }
756
757 pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
759 let conn = self.conn();
760 let mut stmt = conn.prepare(
761 "SELECT commit_hash, session_id, repo_path, branch, created_at \
762 FROM commit_session_links WHERE session_id = ?1 \
763 ORDER BY created_at DESC",
764 )?;
765 let rows = stmt.query_map(params![session_id], |row| {
766 Ok(CommitLink {
767 commit_hash: row.get(0)?,
768 session_id: row.get(1)?,
769 repo_path: row.get(2)?,
770 branch: row.get(3)?,
771 created_at: row.get(4)?,
772 })
773 })?;
774 let mut result = Vec::new();
775 for row in rows {
776 result.push(row?);
777 }
778 Ok(result)
779 }
780
781 pub fn find_active_session_for_repo(
785 &self,
786 repo_path: &str,
787 since_minutes: u32,
788 ) -> Result<Option<LocalSessionRow>> {
789 let sql = format!(
790 "SELECT {LOCAL_SESSION_COLUMNS} \
791 {FROM_CLAUSE} \
792 WHERE s.working_directory LIKE ?1 \
793 AND s.created_at >= datetime('now', ?2) \
794 ORDER BY s.created_at DESC LIMIT 1"
795 );
796 let since = format!("-{since_minutes} minutes");
797 let like = format!("{repo_path}%");
798 let conn = self.conn();
799 let mut stmt = conn.prepare(&sql)?;
800 let row = stmt
801 .query_map(params![like, since], row_to_local_session)?
802 .next()
803 .transpose()?;
804 Ok(row)
805 }
806
807 pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
809 let conn = self.conn();
810 let mut stmt = conn
811 .prepare("SELECT id FROM sessions")
812 .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
813 let rows = stmt.query_map([], |row| row.get::<_, String>(0));
814 let mut set = std::collections::HashSet::new();
815 if let Ok(rows) = rows {
816 for row in rows.flatten() {
817 set.insert(row);
818 }
819 }
820 set
821 }
822
823 pub fn update_session_stats(&self, session: &Session) -> Result<()> {
825 let title = session.context.title.as_deref();
826 let description = session.context.description.as_deref();
827 let (files_modified, files_read, has_errors) =
828 opensession_core::extract::extract_file_metadata(session);
829
830 self.conn().execute(
831 "UPDATE sessions SET \
832 title=?2, description=?3, \
833 message_count=?4, user_message_count=?5, task_count=?6, \
834 event_count=?7, duration_seconds=?8, \
835 total_input_tokens=?9, total_output_tokens=?10, \
836 files_modified=?11, files_read=?12, has_errors=?13 \
837 WHERE id=?1",
838 params![
839 &session.session_id,
840 title,
841 description,
842 session.stats.message_count as i64,
843 session.stats.user_message_count as i64,
844 session.stats.task_count as i64,
845 session.stats.event_count as i64,
846 session.stats.duration_seconds as i64,
847 session.stats.total_input_tokens as i64,
848 session.stats.total_output_tokens as i64,
849 &files_modified,
850 &files_read,
851 has_errors,
852 ],
853 )?;
854 Ok(())
855 }
856
857 pub fn list_repos(&self) -> Result<Vec<String>> {
859 let conn = self.conn();
860 let mut stmt = conn.prepare(
861 "SELECT DISTINCT git_repo_name FROM sessions \
862 WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
863 )?;
864 let rows = stmt.query_map([], |row| row.get(0))?;
865 let mut result = Vec::new();
866 for row in rows {
867 result.push(row?);
868 }
869 Ok(result)
870 }
871}
872
/// SELECT list producing the 32 columns of a `LocalSessionRow`, in the exact
/// order `row_to_local_session` reads them.
///
/// Relies on the aliases `s` (sessions), `ss` (session_sync) and `u` (users).
/// A missing sync row yields the status `'unknown'`, and a NULL
/// `user_message_count` is read as 0.
pub const LOCAL_SESSION_COLUMNS: &str = concat!(
    "s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, ",
    "s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, ",
    "s.title, s.description, s.tags, s.created_at, s.uploaded_at, ",
    "s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, ",
    "s.total_input_tokens, s.total_output_tokens, ",
    "s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, ",
    "s.pr_number, s.pr_url, s.working_directory, ",
    "s.files_modified, s.files_read, s.has_errors",
);
883
884fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
885 Ok(LocalSessionRow {
886 id: row.get(0)?,
887 source_path: row.get(1)?,
888 sync_status: row.get(2)?,
889 last_synced_at: row.get(3)?,
890 user_id: row.get(4)?,
891 nickname: row.get(5)?,
892 team_id: row.get(6)?,
893 tool: row.get(7)?,
894 agent_provider: row.get(8)?,
895 agent_model: row.get(9)?,
896 title: row.get(10)?,
897 description: row.get(11)?,
898 tags: row.get(12)?,
899 created_at: row.get(13)?,
900 uploaded_at: row.get(14)?,
901 message_count: row.get(15)?,
902 user_message_count: row.get(16)?,
903 task_count: row.get(17)?,
904 event_count: row.get(18)?,
905 duration_seconds: row.get(19)?,
906 total_input_tokens: row.get(20)?,
907 total_output_tokens: row.get(21)?,
908 git_remote: row.get(22)?,
909 git_branch: row.get(23)?,
910 git_commit: row.get(24)?,
911 git_repo_name: row.get(25)?,
912 pr_number: row.get(26)?,
913 pr_url: row.get(27)?,
914 working_directory: row.get(28)?,
915 files_modified: row.get(29)?,
916 files_read: row.get(30)?,
917 has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
918 })
919}
920
921fn default_db_path() -> Result<PathBuf> {
922 let home = std::env::var("HOME")
923 .or_else(|_| std::env::var("USERPROFILE"))
924 .context("Could not determine home directory")?;
925 Ok(PathBuf::from(home)
926 .join(".local")
927 .join("share")
928 .join("opensession")
929 .join("local.db"))
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935
936 fn test_db() -> LocalDb {
937 let dir = tempfile::tempdir().unwrap();
938 let path = dir.keep().join("test.db");
939 LocalDb::open_path(&path).unwrap()
940 }
941
942 #[test]
943 fn test_open_and_schema() {
944 let _db = test_db();
945 }
946
947 #[test]
948 fn test_sync_cursor() {
949 let db = test_db();
950 assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
951 db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
952 assert_eq!(
953 db.get_sync_cursor("team1").unwrap(),
954 Some("2024-01-01T00:00:00Z".to_string())
955 );
956 db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
958 assert_eq!(
959 db.get_sync_cursor("team1").unwrap(),
960 Some("2024-06-01T00:00:00Z".to_string())
961 );
962 }
963
964 #[test]
965 fn test_body_cache() {
966 let db = test_db();
967 assert_eq!(db.get_cached_body("s1").unwrap(), None);
968 db.cache_body("s1", b"hello world").unwrap();
969 assert_eq!(
970 db.get_cached_body("s1").unwrap(),
971 Some(b"hello world".to_vec())
972 );
973 }
974
975 #[test]
976 fn test_upsert_remote_session() {
977 let db = test_db();
978 let summary = SessionSummary {
979 id: "remote-1".to_string(),
980 user_id: Some("u1".to_string()),
981 nickname: Some("alice".to_string()),
982 team_id: "t1".to_string(),
983 tool: "claude-code".to_string(),
984 agent_provider: None,
985 agent_model: None,
986 title: Some("Test session".to_string()),
987 description: None,
988 tags: None,
989 created_at: "2024-01-01T00:00:00Z".to_string(),
990 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
991 message_count: 10,
992 task_count: 2,
993 event_count: 20,
994 duration_seconds: 300,
995 total_input_tokens: 1000,
996 total_output_tokens: 500,
997 git_remote: None,
998 git_branch: None,
999 git_commit: None,
1000 git_repo_name: None,
1001 pr_number: None,
1002 pr_url: None,
1003 working_directory: None,
1004 files_modified: None,
1005 files_read: None,
1006 has_errors: false,
1007 };
1008 db.upsert_remote_session(&summary).unwrap();
1009
1010 let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1011 assert_eq!(sessions.len(), 1);
1012 assert_eq!(sessions[0].id, "remote-1");
1013 assert_eq!(sessions[0].sync_status, "remote_only");
1014 assert_eq!(sessions[0].nickname, None); }
1016
1017 #[test]
1018 fn test_list_filter_by_repo() {
1019 let db = test_db();
1020 let summary1 = SessionSummary {
1022 id: "s1".to_string(),
1023 user_id: None,
1024 nickname: None,
1025 team_id: "t1".to_string(),
1026 tool: "claude-code".to_string(),
1027 agent_provider: None,
1028 agent_model: None,
1029 title: Some("Session 1".to_string()),
1030 description: None,
1031 tags: None,
1032 created_at: "2024-01-01T00:00:00Z".to_string(),
1033 uploaded_at: "2024-01-01T01:00:00Z".to_string(),
1034 message_count: 5,
1035 task_count: 0,
1036 event_count: 10,
1037 duration_seconds: 60,
1038 total_input_tokens: 100,
1039 total_output_tokens: 50,
1040 git_remote: None,
1041 git_branch: None,
1042 git_commit: None,
1043 git_repo_name: None,
1044 pr_number: None,
1045 pr_url: None,
1046 working_directory: None,
1047 files_modified: None,
1048 files_read: None,
1049 has_errors: false,
1050 };
1051 db.upsert_remote_session(&summary1).unwrap();
1052
1053 let filter = LocalSessionFilter {
1055 team_id: Some("t1".to_string()),
1056 ..Default::default()
1057 };
1058 assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
1059
1060 let filter = LocalSessionFilter {
1061 team_id: Some("t999".to_string()),
1062 ..Default::default()
1063 };
1064 assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
1065 }
1066
1067 fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> SessionSummary {
1070 SessionSummary {
1071 id: id.to_string(),
1072 user_id: None,
1073 nickname: None,
1074 team_id: "t1".to_string(),
1075 tool: tool.to_string(),
1076 agent_provider: Some("anthropic".to_string()),
1077 agent_model: Some("claude-opus-4-6".to_string()),
1078 title: Some(title.to_string()),
1079 description: None,
1080 tags: None,
1081 created_at: created_at.to_string(),
1082 uploaded_at: created_at.to_string(),
1083 message_count: 5,
1084 task_count: 1,
1085 event_count: 10,
1086 duration_seconds: 300,
1087 total_input_tokens: 1000,
1088 total_output_tokens: 500,
1089 git_remote: None,
1090 git_branch: None,
1091 git_commit: None,
1092 git_repo_name: None,
1093 pr_number: None,
1094 pr_url: None,
1095 working_directory: None,
1096 files_modified: None,
1097 files_read: None,
1098 has_errors: false,
1099 }
1100 }
1101
1102 fn seed_sessions(db: &LocalDb) {
1103 db.upsert_remote_session(&make_summary(
1105 "s1",
1106 "claude-code",
1107 "First session",
1108 "2024-01-01T00:00:00Z",
1109 ))
1110 .unwrap();
1111 db.upsert_remote_session(&make_summary(
1112 "s2",
1113 "claude-code",
1114 "JWT auth work",
1115 "2024-01-02T00:00:00Z",
1116 ))
1117 .unwrap();
1118 db.upsert_remote_session(&make_summary(
1119 "s3",
1120 "gemini",
1121 "Gemini test",
1122 "2024-01-03T00:00:00Z",
1123 ))
1124 .unwrap();
1125 db.upsert_remote_session(&make_summary(
1126 "s4",
1127 "claude-code",
1128 "Error handling",
1129 "2024-01-04T00:00:00Z",
1130 ))
1131 .unwrap();
1132 db.upsert_remote_session(&make_summary(
1133 "s5",
1134 "claude-code",
1135 "Final polish",
1136 "2024-01-05T00:00:00Z",
1137 ))
1138 .unwrap();
1139 }
1140
/// A default (empty) filter is expected to return all five seeded sessions,
/// with `s5` (latest `created_at`) first and `s1` (earliest) last.
#[test]
fn test_log_no_filters() {
    let db = test_db();
    seed_sessions(&db);

    let rows = db.list_sessions_log(&LogFilter::default()).unwrap();

    assert_eq!(rows.len(), 5);
    // Check both ends of the listing.
    for &(index, expected_id) in &[(0usize, "s5"), (4usize, "s1")] {
        assert_eq!(rows[index].id, expected_id);
    }
}
1154
/// Filtering on `tool = "claude-code"` is expected to return exactly the
/// four seeded sessions that use that tool.
#[test]
fn test_log_filter_by_tool() {
    let db = test_db();
    seed_sessions(&db);

    let rows = db
        .list_sessions_log(&LogFilter {
            tool: Some(String::from("claude-code")),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(rows.len(), 4);
    // No row from another tool may slip through.
    for row in rows.iter() {
        assert!(row.tool == "claude-code");
    }
}
1167
/// A `claude*` model pattern is expected to match all five seeded sessions,
/// since every fixture is created with the model `claude-opus-4-6`.
#[test]
fn test_log_filter_by_model_wildcard() {
    let db = test_db();
    seed_sessions(&db);

    let wildcard = LogFilter {
        model: Some("claude*".to_owned()),
        ..LogFilter::default()
    };
    let matched = db.list_sessions_log(&wildcard).unwrap().len();

    assert_eq!(matched, 5);
}
1179
/// `since = 2024-01-03` is expected to yield three rows: the seeded sessions
/// dated on or after that cutoff are `s3`, `s4` and `s5`.
#[test]
fn test_log_filter_since() {
    let db = test_db();
    seed_sessions(&db);

    let cutoff = String::from("2024-01-03T00:00:00Z");
    let rows = db
        .list_sessions_log(&LogFilter {
            since: Some(cutoff),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(rows.len(), 3);
}
1191
/// `before = 2024-01-03` is expected to yield two rows: the seeded sessions
/// dated strictly before that cutoff are `s1` and `s2`.
#[test]
fn test_log_filter_before() {
    let db = test_db();
    seed_sessions(&db);

    let cutoff = String::from("2024-01-03T00:00:00Z");
    let rows = db
        .list_sessions_log(&LogFilter {
            before: Some(cutoff),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(rows.len(), 2);
}
1203
/// Combining `since = 2024-01-02` with `before = 2024-01-04` is expected to
/// yield two rows; that count matches a window that includes the `since`
/// date (`s2`) and excludes the `before` date (`s4`), leaving `s2` and `s3`.
#[test]
fn test_log_filter_since_and_before() {
    let db = test_db();
    seed_sessions(&db);

    let window = LogFilter {
        since: Some("2024-01-02T00:00:00Z".to_owned()),
        before: Some("2024-01-04T00:00:00Z".to_owned()),
        ..LogFilter::default()
    };
    let in_window = db.list_sessions_log(&window).unwrap().len();

    assert_eq!(in_window, 2);
}
1216
/// A `grep` for "JWT" is expected to return only `s2`, the one seeded
/// session whose title ("JWT auth work") contains that text.
#[test]
fn test_log_filter_grep() {
    let db = test_db();
    seed_sessions(&db);

    let hits = db
        .list_sessions_log(&LogFilter {
            grep: Some(String::from("JWT")),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(hits.len(), 1);
    let only_hit = &hits[0];
    assert_eq!(only_hit.id, "s2");
}
1229
/// With `limit = 2` and `offset = 1` the listing is expected to skip `s5`
/// and return the next two sessions, `s4` then `s3`.
#[test]
fn test_log_limit_and_offset() {
    let db = test_db();
    seed_sessions(&db);

    let page = db
        .list_sessions_log(&LogFilter {
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(page.len(), 2);
    // Length is verified above, so both positions exist.
    let (first, second) = (&page[0], &page[1]);
    assert_eq!(first.id, "s4");
    assert_eq!(second.id, "s3");
}
1244
/// `limit = 3` with no other constraint is expected to cap the listing of
/// the five seeded sessions at three rows.
#[test]
fn test_log_limit_only() {
    let db = test_db();
    seed_sessions(&db);

    let capped = LogFilter {
        limit: Some(3),
        ..LogFilter::default()
    };
    let returned = db.list_sessions_log(&capped).unwrap().len();

    assert_eq!(returned, 3);
}
1256
/// Tool, `since` and `limit` applied together: of the seeded `claude-code`
/// sessions dated on or after 2024-01-03 (`s4`, `s5`), a limit of one is
/// expected to keep only `s5`.
#[test]
fn test_log_combined_filters() {
    let db = test_db();
    seed_sessions(&db);

    let rows = db
        .list_sessions_log(&LogFilter {
            tool: Some(String::from("claude-code")),
            since: Some(String::from("2024-01-03T00:00:00Z")),
            limit: Some(1),
            ..Default::default()
        })
        .unwrap();

    assert_eq!(rows.len(), 1);
    let newest = &rows[0];
    assert_eq!(newest.id, "s5");
}
1271
/// `get_session_by_offset` is expected to count back from the newest seeded
/// session: offset 0 is `s5`, offset 2 is `s3`, and an offset past the five
/// seeded rows yields `None`.
#[test]
fn test_get_session_by_offset() {
    let db = test_db();
    seed_sessions(&db);

    // (offset, expected session id)
    let expectations = [(0, "s5"), (2, "s3")];
    for &(offset, expected_id) in &expectations {
        let row = db.get_session_by_offset(offset).unwrap().unwrap();
        assert_eq!(row.id, expected_id);
    }

    let past_the_end = db.get_session_by_offset(10).unwrap();
    assert!(past_the_end.is_none());
}
1284
/// `get_session_by_tool_offset` is expected to apply the offset within one
/// tool's sessions: `claude-code` gives `s5` then `s4`, `gemini` gives `s3`,
/// and a second `gemini` session does not exist.
#[test]
fn test_get_session_by_tool_offset() {
    let db = test_db();
    seed_sessions(&db);

    // (tool, offset, expected session id)
    let expectations = [
        ("claude-code", 0, "s5"),
        ("claude-code", 1, "s4"),
        ("gemini", 0, "s3"),
    ];
    for &(tool, offset, expected_id) in &expectations {
        let row = db
            .get_session_by_tool_offset(tool, offset)
            .unwrap()
            .unwrap();
        assert_eq!(row.id, expected_id);
    }

    // Only one gemini session was seeded, so offset 1 has nothing to return.
    let missing = db.get_session_by_tool_offset("gemini", 1).unwrap();
    assert!(missing.is_none());
}
1306
/// `get_sessions_latest(3)` is expected to return the three most recently
/// created seeded sessions in the order `s5`, `s4`, `s3`.
#[test]
fn test_get_sessions_latest() {
    let db = test_db();
    seed_sessions(&db);

    let latest = db.get_sessions_latest(3).unwrap();

    assert_eq!(latest.len(), 3);
    for (position, expected_id) in ["s5", "s4", "s3"].iter().enumerate() {
        assert_eq!(latest[position].id, *expected_id);
    }
}
1317
/// `get_sessions_by_tool_latest("claude-code", 2)` is expected to return the
/// two most recent `claude-code` sessions, `s5` then `s4`.
#[test]
fn test_get_sessions_by_tool_latest() {
    let db = test_db();
    seed_sessions(&db);

    let latest = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();

    assert_eq!(latest.len(), 2);
    for (position, expected_id) in ["s5", "s4"].iter().enumerate() {
        assert_eq!(latest[position].id, *expected_id);
    }
}
1327
/// Asking for ten `gemini` sessions when only one was seeded is expected to
/// return that single session rather than fail.
#[test]
fn test_get_sessions_latest_more_than_available() {
    let db = test_db();
    seed_sessions(&db);

    let available = db
        .get_sessions_by_tool_latest("gemini", 10)
        .unwrap()
        .len();

    assert_eq!(available, 1);
}
1335
/// `session_count` is expected to report 0 on a fresh test database and 5
/// once the five fixtures have been seeded.
#[test]
fn test_session_count() {
    let db = test_db();

    let before_seeding = db.session_count().unwrap();
    assert_eq!(before_seeding, 0);

    seed_sessions(&db);

    let after_seeding = db.session_count().unwrap();
    assert_eq!(after_seeding, 5);
}
1343
/// Links one commit to one session and checks that the link can be read
/// back from both directions with the stored repo path and branch.
#[test]
fn test_link_commit_session() {
    let db = test_db();
    seed_sessions(&db);

    let (hash, session_id) = ("abc123", "s1");
    db.link_commit_session(hash, session_id, Some("/tmp/repo"), Some("main"))
        .unwrap();

    // Session -> commits.
    let links = db.get_commits_by_session(session_id).unwrap();
    assert_eq!(links.len(), 1);
    let link = &links[0];
    assert_eq!(link.commit_hash, hash);
    assert_eq!(link.session_id, session_id);
    assert_eq!(link.repo_path.as_deref(), Some("/tmp/repo"));
    assert_eq!(link.branch.as_deref(), Some("main"));

    // Commit -> sessions.
    let linked_sessions = db.get_sessions_by_commit(hash).unwrap();
    assert_eq!(linked_sessions.len(), 1);
    assert_eq!(linked_sessions[0].id, session_id);
}
1364
/// Links the same commit to `s1`, `s2` and `s3` and expects the lookup by
/// commit to return all three, ordered `s3`, `s2`, `s1`.
#[test]
fn test_get_sessions_by_commit() {
    let db = test_db();
    seed_sessions(&db);

    let hash = "abc123";
    for &session_id in &["s1", "s2", "s3"] {
        db.link_commit_session(hash, session_id, None, None).unwrap();
    }

    let linked = db.get_sessions_by_commit(hash).unwrap();

    assert_eq!(linked.len(), 3);
    for (position, expected_id) in ["s3", "s2", "s1"].iter().enumerate() {
        assert_eq!(linked[position].id, *expected_id);
    }
}
1381
/// Links three distinct commits to `s1` and expects the lookup by session to
/// return three links, each of them referring to `s1`.
#[test]
fn test_get_commits_by_session() {
    let db = test_db();
    seed_sessions(&db);

    // (commit hash, branch); all links share the same repo path and session.
    let links_to_create = [("aaa111", "main"), ("bbb222", "main"), ("ccc333", "feat")];
    for &(hash, branch) in &links_to_create {
        db.link_commit_session(hash, "s1", Some("/repo"), Some(branch))
            .unwrap();
    }

    let commits = db.get_commits_by_session("s1").unwrap();

    assert_eq!(commits.len(), 3);
    for commit in commits.iter() {
        assert!(commit.session_id == "s1");
    }
}
1399
/// Creating the identical commit/session link twice must succeed both times
/// and is expected to leave a single stored link.
#[test]
fn test_duplicate_link_ignored() {
    let db = test_db();
    seed_sessions(&db);

    // Second iteration repeats the exact same link.
    for _attempt in 0..2 {
        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
            .unwrap();
    }

    let stored = db.get_commits_by_session("s1").unwrap().len();
    assert_eq!(stored, 1);
}
1413
/// The `commit` log filter is expected to return only sessions linked to the
/// given hash (`s4` then `s2` here) and nothing for an unknown hash.
#[test]
fn test_log_filter_by_commit() {
    let db = test_db();
    seed_sessions(&db);

    for &session_id in &["s2", "s4"] {
        db.link_commit_session("abc123", session_id, None, None)
            .unwrap();
    }

    // Runs the log query with only the `commit` filter set.
    let sessions_for = |hash: &str| {
        db.list_sessions_log(&LogFilter {
            commit: Some(hash.to_string()),
            ..Default::default()
        })
        .unwrap()
    };

    let linked = sessions_for("abc123");
    assert_eq!(linked.len(), 2);
    assert_eq!(linked[0].id, "s4");
    assert_eq!(linked[1].id, "s2");

    // A hash with no links matches no sessions.
    let unlinked = sessions_for("nonexistent");
    assert_eq!(unlinked.len(), 0);
}
1438}