1use std::path::Path;
4
5use anyhow::{Context, Result};
6use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
7use sqlx::{Row, SqlitePool};
8use tracing::debug;
9
10use cargowatch_core::{
11 ArtifactRecord, DetectedProcess, DiagnosticRecord, LogEntry, SessionFinished,
12 SessionHistoryEntry, SessionInfo, SessionMode, SessionState, SessionStatus, SummaryCounts,
13};
14
/// Migration set built by `sqlx::migrate!` from the `./migrations` directory.
///
/// `SessionStore::connect` runs it against the pool before handing out a store.
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
16
17#[derive(Clone, Debug)]
19pub struct SessionStore {
20 pool: SqlitePool,
21}
22
23impl SessionStore {
24 pub async fn connect(path: &Path) -> Result<Self> {
26 if let Some(parent) = path.parent() {
27 tokio::fs::create_dir_all(parent)
28 .await
29 .with_context(|| format!("failed to create {}", parent.display()))?;
30 }
31
32 let options = SqliteConnectOptions::new()
33 .filename(path)
34 .create_if_missing(true)
35 .journal_mode(SqliteJournalMode::Wal)
36 .foreign_keys(true);
37 let pool = SqlitePoolOptions::new()
38 .max_connections(4)
39 .connect_with(options)
40 .await
41 .context("failed to connect to sqlite")?;
42 MIGRATOR
43 .run(&pool)
44 .await
45 .context("failed to run migrations")?;
46 debug!(db_path = %path.display(), "sqlite store ready");
47 Ok(Self { pool })
48 }
49
50 pub async fn insert_session_start(&self, info: &SessionInfo) -> Result<()> {
52 sqlx::query(
53 r#"
54 INSERT INTO sessions (
55 session_id, mode, title, command_json, cwd, workspace_root, status,
56 external_pid, classification, started_at, updated_at
57 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
58 ON CONFLICT(session_id) DO UPDATE SET
59 mode = excluded.mode,
60 title = excluded.title,
61 command_json = excluded.command_json,
62 cwd = excluded.cwd,
63 workspace_root = excluded.workspace_root,
64 status = excluded.status,
65 external_pid = excluded.external_pid,
66 classification = excluded.classification,
67 started_at = excluded.started_at,
68 updated_at = excluded.updated_at
69 "#,
70 )
71 .bind(&info.session_id)
72 .bind(mode_to_str(info.mode))
73 .bind(&info.title)
74 .bind(serde_json::to_string(&info.command)?)
75 .bind(info.cwd.display().to_string())
76 .bind(
77 info.workspace_root
78 .as_ref()
79 .map(|path| path.display().to_string()),
80 )
81 .bind(status_to_str(info.status))
82 .bind(info.external_pid.map(i64::from))
83 .bind(
84 info.classification
85 .map(|classification| classification.label()),
86 )
87 .bind(info.started_at)
88 .bind(info.started_at)
89 .execute(&self.pool)
90 .await?;
91 Ok(())
92 }
93
94 pub async fn insert_log_line(&self, session_id: &str, entry: &LogEntry) -> Result<()> {
96 sqlx::query(
97 r#"
98 INSERT INTO session_logs (session_id, sequence, timestamp, stream, text, raw, severity)
99 VALUES (?, ?, ?, ?, ?, ?, ?)
100 "#,
101 )
102 .bind(session_id)
103 .bind(i64::try_from(entry.sequence).unwrap_or(i64::MAX))
104 .bind(entry.timestamp)
105 .bind(stream_to_str(entry.stream))
106 .bind(&entry.text)
107 .bind(&entry.raw)
108 .bind(entry.severity.map(severity_to_str))
109 .execute(&self.pool)
110 .await?;
111 Ok(())
112 }
113
114 pub async fn insert_diagnostic(
116 &self,
117 session_id: &str,
118 diagnostic: &DiagnosticRecord,
119 ) -> Result<()> {
120 sqlx::query(
121 r#"
122 INSERT INTO session_diagnostics (
123 session_id, diagnostic_id, timestamp, severity, message, rendered, code,
124 file, line, column_number, target, package_id
125 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
126 "#,
127 )
128 .bind(session_id)
129 .bind(&diagnostic.id)
130 .bind(diagnostic.timestamp)
131 .bind(severity_to_str(diagnostic.severity))
132 .bind(&diagnostic.message)
133 .bind(&diagnostic.rendered)
134 .bind(&diagnostic.code)
135 .bind(
136 diagnostic
137 .file
138 .as_ref()
139 .map(|path| path.display().to_string()),
140 )
141 .bind(diagnostic.line.map(i64::from))
142 .bind(diagnostic.column.map(i64::from))
143 .bind(&diagnostic.target)
144 .bind(&diagnostic.package_id)
145 .execute(&self.pool)
146 .await?;
147 Ok(())
148 }
149
150 pub async fn insert_artifact(&self, session_id: &str, artifact: &ArtifactRecord) -> Result<()> {
152 sqlx::query(
153 r#"
154 INSERT INTO session_artifacts (
155 session_id, sequence, timestamp, package_id, target,
156 filenames_json, executable, fresh
157 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
158 "#,
159 )
160 .bind(session_id)
161 .bind(i64::try_from(artifact.sequence).unwrap_or(i64::MAX))
162 .bind(artifact.timestamp)
163 .bind(&artifact.package_id)
164 .bind(&artifact.target)
165 .bind(serde_json::to_string(&artifact.filenames)?)
166 .bind(
167 artifact
168 .executable
169 .as_ref()
170 .map(|path| path.display().to_string()),
171 )
172 .bind(if artifact.fresh { 1_i64 } else { 0_i64 })
173 .execute(&self.pool)
174 .await?;
175 Ok(())
176 }
177
178 pub async fn finish_session(&self, finish: &SessionFinished) -> Result<()> {
180 sqlx::query(
181 r#"
182 UPDATE sessions
183 SET status = ?, finished_at = ?, duration_ms = ?, exit_code = ?,
184 errors = ?, warnings = ?, notes = ?, help = ?, info = ?, updated_at = ?
185 WHERE session_id = ?
186 "#,
187 )
188 .bind(status_to_str(finish.status))
189 .bind(finish.finished_at)
190 .bind(finish.duration_ms)
191 .bind(finish.exit_code)
192 .bind(i64::from(finish.summary.errors))
193 .bind(i64::from(finish.summary.warnings))
194 .bind(i64::from(finish.summary.notes))
195 .bind(i64::from(finish.summary.help))
196 .bind(i64::from(finish.summary.info))
197 .bind(finish.finished_at)
198 .bind(&finish.session_id)
199 .execute(&self.pool)
200 .await?;
201 Ok(())
202 }
203
204 pub async fn upsert_detected_process(&self, process: &DetectedProcess) -> Result<()> {
206 let info = SessionInfo {
207 session_id: process.session_id.clone(),
208 mode: SessionMode::Detected,
209 title: format!("{} ({})", process.classification.label(), process.pid),
210 command: process.command.clone(),
211 cwd: process.cwd.clone().unwrap_or_else(|| ".".into()),
212 workspace_root: process.workspace_root.clone(),
213 started_at: process.started_at,
214 status: SessionStatus::Running,
215 external_pid: Some(process.pid),
216 classification: Some(process.classification),
217 };
218 self.insert_session_start(&info).await?;
219 sqlx::query(
220 r#"
221 UPDATE sessions
222 SET duration_ms = ?, updated_at = ?
223 WHERE session_id = ?
224 "#,
225 )
226 .bind(process.elapsed_ms)
227 .bind(process.last_seen_at)
228 .bind(&process.session_id)
229 .execute(&self.pool)
230 .await?;
231 Ok(())
232 }
233
234 pub async fn mark_process_gone(
236 &self,
237 session_id: &str,
238 observed_at: time::OffsetDateTime,
239 ) -> Result<()> {
240 sqlx::query(
241 r#"
242 UPDATE sessions
243 SET status = ?, finished_at = ?, updated_at = ?
244 WHERE session_id = ?
245 "#,
246 )
247 .bind(status_to_str(SessionStatus::Lost))
248 .bind(observed_at)
249 .bind(observed_at)
250 .bind(session_id)
251 .execute(&self.pool)
252 .await?;
253 Ok(())
254 }
255
256 pub async fn recent_sessions(&self, limit: usize) -> Result<Vec<SessionHistoryEntry>> {
258 let rows = sqlx::query(
259 r#"
260 SELECT
261 session_id, mode, title, command_json, cwd, workspace_root, status,
262 external_pid, classification, started_at, finished_at, duration_ms, exit_code,
263 errors, warnings, notes, help, info
264 FROM sessions
265 ORDER BY started_at DESC
266 LIMIT ?
267 "#,
268 )
269 .bind(i64::try_from(limit).unwrap_or(i64::MAX))
270 .fetch_all(&self.pool)
271 .await?;
272
273 rows.into_iter().map(row_to_history).collect()
274 }
275
276 pub async fn load_session(
278 &self,
279 session_id: &str,
280 max_logs: usize,
281 ) -> Result<Option<SessionState>> {
282 let row = sqlx::query(
283 r#"
284 SELECT
285 session_id, mode, title, command_json, cwd, workspace_root, status,
286 external_pid, classification, started_at, finished_at, duration_ms, exit_code,
287 errors, warnings, notes, help, info
288 FROM sessions
289 WHERE session_id = ?
290 "#,
291 )
292 .bind(session_id)
293 .fetch_optional(&self.pool)
294 .await?;
295 let Some(row) = row else {
296 return Ok(None);
297 };
298 let history = row_to_history(row)?;
299 let mut state = SessionState::new(history.info, max_logs);
300 state.finished_at = history.finished_at;
301 state.duration_ms = history.duration_ms;
302 state.exit_code = history.exit_code;
303 state.summary = history.summary;
304
305 let log_rows = sqlx::query(
306 r#"
307 SELECT sequence, timestamp, stream, text, raw, severity
308 FROM session_logs
309 WHERE session_id = ?
310 ORDER BY sequence DESC
311 LIMIT ?
312 "#,
313 )
314 .bind(session_id)
315 .bind(i64::try_from(max_logs).unwrap_or(i64::MAX))
316 .fetch_all(&self.pool)
317 .await?;
318 let mut logs = log_rows
319 .into_iter()
320 .map(|row| {
321 Ok(LogEntry {
322 sequence: row.try_get::<i64, _>("sequence")?.max(0) as u64,
323 timestamp: row.try_get("timestamp")?,
324 stream: str_to_stream(row.try_get("stream")?),
325 text: row.try_get("text")?,
326 raw: row.try_get("raw")?,
327 severity: row
328 .try_get::<Option<String>, _>("severity")?
329 .as_deref()
330 .map(str_to_severity),
331 })
332 })
333 .collect::<Result<Vec<_>, sqlx::Error>>()?;
334 logs.reverse();
335 state.logs.extend(logs);
336
337 let diagnostics = sqlx::query(
338 r#"
339 SELECT diagnostic_id, timestamp, severity, message, rendered, code, file, line, column_number, target, package_id
340 FROM session_diagnostics
341 WHERE session_id = ?
342 ORDER BY timestamp ASC
343 "#,
344 )
345 .bind(session_id)
346 .fetch_all(&self.pool)
347 .await?;
348 state.diagnostics = diagnostics
349 .into_iter()
350 .map(|row| {
351 Ok(DiagnosticRecord {
352 id: row.try_get("diagnostic_id")?,
353 timestamp: row.try_get("timestamp")?,
354 severity: str_to_severity(row.try_get::<String, _>("severity")?.as_str()),
355 message: row.try_get("message")?,
356 rendered: row.try_get("rendered")?,
357 code: row.try_get("code")?,
358 file: row
359 .try_get::<Option<String>, _>("file")?
360 .map(std::path::PathBuf::from),
361 line: row
362 .try_get::<Option<i64>, _>("line")?
363 .map(|value| value as u32),
364 column: row
365 .try_get::<Option<i64>, _>("column_number")?
366 .map(|value| value as u32),
367 target: row.try_get("target")?,
368 package_id: row.try_get("package_id")?,
369 })
370 })
371 .collect::<Result<Vec<_>, sqlx::Error>>()?;
372
373 let artifacts = sqlx::query(
374 r#"
375 SELECT sequence, timestamp, package_id, target, filenames_json, executable, fresh
376 FROM session_artifacts
377 WHERE session_id = ?
378 ORDER BY sequence ASC, id ASC
379 "#,
380 )
381 .bind(session_id)
382 .fetch_all(&self.pool)
383 .await?;
384 state.artifacts = artifacts
385 .into_iter()
386 .map(|row| {
387 Ok(ArtifactRecord {
388 sequence: row.try_get::<i64, _>("sequence")?.max(0) as u64,
389 timestamp: row.try_get("timestamp")?,
390 package_id: row.try_get("package_id")?,
391 target: row.try_get("target")?,
392 filenames: serde_json::from_str(&row.try_get::<String, _>("filenames_json")?)?,
393 executable: row
394 .try_get::<Option<String>, _>("executable")?
395 .map(Into::into),
396 fresh: row.try_get::<i64, _>("fresh")? != 0,
397 })
398 })
399 .collect::<Result<Vec<_>>>()?;
400
401 Ok(Some(state))
402 }
403
404 pub async fn cleanup_old_sessions(&self, retention_days: u32) -> Result<u64> {
406 let cutoff =
407 time::OffsetDateTime::now_utc() - time::Duration::days(i64::from(retention_days));
408 let result = sqlx::query("DELETE FROM sessions WHERE started_at < ?")
409 .bind(cutoff)
410 .execute(&self.pool)
411 .await?;
412 Ok(result.rows_affected())
413 }
414}
415
416fn row_to_history(row: sqlx::sqlite::SqliteRow) -> Result<SessionHistoryEntry> {
417 let mode = str_to_mode(row.try_get::<String, _>("mode")?.as_str());
418 let status = str_to_status(row.try_get::<String, _>("status")?.as_str());
419 let info = SessionInfo {
420 session_id: row.try_get("session_id")?,
421 mode,
422 title: row.try_get("title")?,
423 command: serde_json::from_str(&row.try_get::<String, _>("command_json")?)?,
424 cwd: row.try_get::<String, _>("cwd")?.into(),
425 workspace_root: row
426 .try_get::<Option<String>, _>("workspace_root")?
427 .map(Into::into),
428 started_at: row.try_get("started_at")?,
429 status,
430 external_pid: row
431 .try_get::<Option<i64>, _>("external_pid")?
432 .map(|value| value as u32),
433 classification: None,
434 };
435 Ok(SessionHistoryEntry {
436 info,
437 finished_at: row.try_get("finished_at")?,
438 exit_code: row.try_get("exit_code")?,
439 duration_ms: row.try_get("duration_ms")?,
440 summary: SummaryCounts {
441 errors: row.try_get::<i64, _>("errors")?.max(0) as u32,
442 warnings: row.try_get::<i64, _>("warnings")?.max(0) as u32,
443 notes: row.try_get::<i64, _>("notes")?.max(0) as u32,
444 help: row.try_get::<i64, _>("help")?.max(0) as u32,
445 info: row.try_get::<i64, _>("info")?.max(0) as u32,
446 },
447 })
448}
449
450fn mode_to_str(mode: SessionMode) -> &'static str {
451 match mode {
452 SessionMode::Managed => "managed",
453 SessionMode::Detected => "detected",
454 }
455}
456
457fn str_to_mode(value: &str) -> SessionMode {
458 match value {
459 "detected" => SessionMode::Detected,
460 _ => SessionMode::Managed,
461 }
462}
463
464fn status_to_str(status: SessionStatus) -> &'static str {
465 match status {
466 SessionStatus::Running => "running",
467 SessionStatus::Succeeded => "succeeded",
468 SessionStatus::Failed => "failed",
469 SessionStatus::Cancelled => "cancelled",
470 SessionStatus::Lost => "lost",
471 }
472}
473
474fn str_to_status(value: &str) -> SessionStatus {
475 match value {
476 "succeeded" => SessionStatus::Succeeded,
477 "failed" => SessionStatus::Failed,
478 "cancelled" => SessionStatus::Cancelled,
479 "lost" => SessionStatus::Lost,
480 _ => SessionStatus::Running,
481 }
482}
483
484fn stream_to_str(stream: cargowatch_core::OutputStream) -> &'static str {
485 match stream {
486 cargowatch_core::OutputStream::Stdout => "stdout",
487 cargowatch_core::OutputStream::Stderr => "stderr",
488 cargowatch_core::OutputStream::System => "system",
489 }
490}
491
492fn str_to_stream(value: &str) -> cargowatch_core::OutputStream {
493 match value {
494 "stderr" => cargowatch_core::OutputStream::Stderr,
495 "system" => cargowatch_core::OutputStream::System,
496 _ => cargowatch_core::OutputStream::Stdout,
497 }
498}
499
500fn severity_to_str(severity: cargowatch_core::event::Severity) -> &'static str {
501 match severity {
502 cargowatch_core::event::Severity::Error => "error",
503 cargowatch_core::event::Severity::Warning => "warning",
504 cargowatch_core::event::Severity::Note => "note",
505 cargowatch_core::event::Severity::Help => "help",
506 cargowatch_core::event::Severity::Info => "info",
507 cargowatch_core::event::Severity::Success => "success",
508 }
509}
510
511fn str_to_severity(value: &str) -> cargowatch_core::event::Severity {
512 match value {
513 "error" => cargowatch_core::event::Severity::Error,
514 "warning" => cargowatch_core::event::Severity::Warning,
515 "note" => cargowatch_core::event::Severity::Note,
516 "help" => cargowatch_core::event::Severity::Help,
517 "success" => cargowatch_core::event::Severity::Success,
518 _ => cargowatch_core::event::Severity::Info,
519 }
520}
521
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;
    use cargowatch_core::{SessionMode, SessionStatus};

    /// Writes one session with a log line, a diagnostic and two artifacts,
    /// finishes it, then checks that both the history listing and the full
    /// session load return what was stored.
    #[tokio::test]
    async fn store_round_trips_history_and_session_details() {
        let temp = tempdir().expect("tempdir");
        let root = temp.path().to_path_buf();
        let db_path = root.join("cargowatch.db");
        let store = SessionStore::connect(&db_path).await.expect("store");

        // --- fixtures -----------------------------------------------------
        let started_at = time::OffsetDateTime::now_utc();
        let info = SessionInfo {
            session_id: "session-1".to_string(),
            mode: SessionMode::Managed,
            title: "cargo check".to_string(),
            command: vec!["cargo".into(), "check".into()],
            cwd: root.clone(),
            workspace_root: Some(root.clone()),
            started_at,
            status: SessionStatus::Running,
            external_pid: Some(42),
            classification: None,
        };
        let log_entry = LogEntry {
            sequence: 1,
            timestamp: started_at,
            stream: cargowatch_core::OutputStream::Stdout,
            text: "checking demo".to_string(),
            raw: None,
            severity: Some(cargowatch_core::event::Severity::Info),
        };
        let diagnostic = DiagnosticRecord {
            id: "diag-1".to_string(),
            timestamp: started_at,
            severity: cargowatch_core::event::Severity::Warning,
            message: "unused variable".to_string(),
            rendered: Some("warning: unused variable".to_string()),
            code: Some("unused_variables".to_string()),
            file: None,
            line: None,
            column: None,
            target: Some("demo".to_string()),
            package_id: Some("demo 0.1.0".to_string()),
        };
        let demo_binary = root.join("target/debug/demo");
        let binary_artifact = ArtifactRecord {
            sequence: 2,
            timestamp: started_at + time::Duration::milliseconds(250),
            package_id: Some("demo 0.1.0".to_string()),
            target: Some("demo".to_string()),
            filenames: vec![demo_binary.clone()],
            executable: Some(demo_binary),
            fresh: false,
        };
        let test_artifact = ArtifactRecord {
            sequence: 3,
            timestamp: started_at + time::Duration::milliseconds(500),
            package_id: Some("demo 0.1.0".to_string()),
            target: Some("demo-tests".to_string()),
            filenames: vec![root.join("target/debug/deps/demo_tests")],
            executable: None,
            fresh: true,
        };
        let finished = SessionFinished {
            session_id: info.session_id.clone(),
            finished_at: started_at + time::Duration::seconds(1),
            status: SessionStatus::Succeeded,
            exit_code: Some(0),
            duration_ms: 1_000,
            summary: SummaryCounts {
                errors: 0,
                warnings: 1,
                notes: 0,
                help: 0,
                info: 1,
            },
        };

        // --- writes ---------------------------------------------------------
        store.insert_session_start(&info).await.expect("start");
        store
            .insert_log_line(&info.session_id, &log_entry)
            .await
            .expect("log");
        store
            .insert_diagnostic(&info.session_id, &diagnostic)
            .await
            .expect("diagnostic");
        store
            .insert_artifact(&info.session_id, &binary_artifact)
            .await
            .expect("artifact");
        store
            .insert_artifact(&info.session_id, &test_artifact)
            .await
            .expect("artifact");
        store.finish_session(&finished).await.expect("finish");

        // --- reads ----------------------------------------------------------
        let history = store.recent_sessions(10).await.expect("history");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].summary.warnings, 1);

        let session = store
            .load_session(&info.session_id, 100)
            .await
            .expect("load")
            .expect("session");
        assert_eq!(session.logs.len(), 1);
        assert_eq!(session.diagnostics.len(), 1);
        assert_eq!(session.artifacts.len(), 2);
        // Artifacts come back in ascending sequence order.
        assert_eq!(session.artifacts[0].sequence, 2);
        assert_eq!(session.artifacts[1].sequence, 3);
        assert_eq!(session.artifacts[0].target.as_deref(), Some("demo"));
    }
}