Skip to main content

cargowatch_store/
lib.rs

1//! SQLite persistence for CargoWatch sessions.
2
3use std::path::Path;
4
5use anyhow::{Context, Result};
6use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
7use sqlx::{Row, SqlitePool};
8use tracing::debug;
9
10use cargowatch_core::{
11    ArtifactRecord, DetectedProcess, DiagnosticRecord, LogEntry, SessionFinished,
12    SessionHistoryEntry, SessionInfo, SessionMode, SessionState, SessionStatus, SummaryCounts,
13};
14
/// Schema migrations loaded from the `./migrations` directory by `sqlx::migrate!`;
/// `SessionStore::connect` runs them against the pool each time a store is opened.
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
16
17/// Shared SQLite-backed session store.
18#[derive(Clone, Debug)]
19pub struct SessionStore {
20    pool: SqlitePool,
21}
22
23impl SessionStore {
24    /// Open the SQLite database, creating it and applying migrations if needed.
25    pub async fn connect(path: &Path) -> Result<Self> {
26        if let Some(parent) = path.parent() {
27            tokio::fs::create_dir_all(parent)
28                .await
29                .with_context(|| format!("failed to create {}", parent.display()))?;
30        }
31
32        let options = SqliteConnectOptions::new()
33            .filename(path)
34            .create_if_missing(true)
35            .journal_mode(SqliteJournalMode::Wal)
36            .foreign_keys(true);
37        let pool = SqlitePoolOptions::new()
38            .max_connections(4)
39            .connect_with(options)
40            .await
41            .context("failed to connect to sqlite")?;
42        MIGRATOR
43            .run(&pool)
44            .await
45            .context("failed to run migrations")?;
46        debug!(db_path = %path.display(), "sqlite store ready");
47        Ok(Self { pool })
48    }
49
50    /// Persist the start of a session.
51    pub async fn insert_session_start(&self, info: &SessionInfo) -> Result<()> {
52        sqlx::query(
53            r#"
54            INSERT INTO sessions (
55                session_id, mode, title, command_json, cwd, workspace_root, status,
56                external_pid, classification, started_at, updated_at
57            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
58            ON CONFLICT(session_id) DO UPDATE SET
59                mode = excluded.mode,
60                title = excluded.title,
61                command_json = excluded.command_json,
62                cwd = excluded.cwd,
63                workspace_root = excluded.workspace_root,
64                status = excluded.status,
65                external_pid = excluded.external_pid,
66                classification = excluded.classification,
67                started_at = excluded.started_at,
68                updated_at = excluded.updated_at
69            "#,
70        )
71        .bind(&info.session_id)
72        .bind(mode_to_str(info.mode))
73        .bind(&info.title)
74        .bind(serde_json::to_string(&info.command)?)
75        .bind(info.cwd.display().to_string())
76        .bind(
77            info.workspace_root
78                .as_ref()
79                .map(|path| path.display().to_string()),
80        )
81        .bind(status_to_str(info.status))
82        .bind(info.external_pid.map(i64::from))
83        .bind(
84            info.classification
85                .map(|classification| classification.label()),
86        )
87        .bind(info.started_at)
88        .bind(info.started_at)
89        .execute(&self.pool)
90        .await?;
91        Ok(())
92    }
93
94    /// Persist a log line for a session.
95    pub async fn insert_log_line(&self, session_id: &str, entry: &LogEntry) -> Result<()> {
96        sqlx::query(
97            r#"
98            INSERT INTO session_logs (session_id, sequence, timestamp, stream, text, raw, severity)
99            VALUES (?, ?, ?, ?, ?, ?, ?)
100            "#,
101        )
102        .bind(session_id)
103        .bind(i64::try_from(entry.sequence).unwrap_or(i64::MAX))
104        .bind(entry.timestamp)
105        .bind(stream_to_str(entry.stream))
106        .bind(&entry.text)
107        .bind(&entry.raw)
108        .bind(entry.severity.map(severity_to_str))
109        .execute(&self.pool)
110        .await?;
111        Ok(())
112    }
113
114    /// Persist a structured diagnostic.
115    pub async fn insert_diagnostic(
116        &self,
117        session_id: &str,
118        diagnostic: &DiagnosticRecord,
119    ) -> Result<()> {
120        sqlx::query(
121            r#"
122            INSERT INTO session_diagnostics (
123                session_id, diagnostic_id, timestamp, severity, message, rendered, code,
124                file, line, column_number, target, package_id
125            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
126            "#,
127        )
128        .bind(session_id)
129        .bind(&diagnostic.id)
130        .bind(diagnostic.timestamp)
131        .bind(severity_to_str(diagnostic.severity))
132        .bind(&diagnostic.message)
133        .bind(&diagnostic.rendered)
134        .bind(&diagnostic.code)
135        .bind(
136            diagnostic
137                .file
138                .as_ref()
139                .map(|path| path.display().to_string()),
140        )
141        .bind(diagnostic.line.map(i64::from))
142        .bind(diagnostic.column.map(i64::from))
143        .bind(&diagnostic.target)
144        .bind(&diagnostic.package_id)
145        .execute(&self.pool)
146        .await?;
147        Ok(())
148    }
149
150    /// Persist a built artifact in explicit sequence order.
151    pub async fn insert_artifact(&self, session_id: &str, artifact: &ArtifactRecord) -> Result<()> {
152        sqlx::query(
153            r#"
154            INSERT INTO session_artifacts (
155                session_id, sequence, timestamp, package_id, target,
156                filenames_json, executable, fresh
157            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
158            "#,
159        )
160        .bind(session_id)
161        .bind(i64::try_from(artifact.sequence).unwrap_or(i64::MAX))
162        .bind(artifact.timestamp)
163        .bind(&artifact.package_id)
164        .bind(&artifact.target)
165        .bind(serde_json::to_string(&artifact.filenames)?)
166        .bind(
167            artifact
168                .executable
169                .as_ref()
170                .map(|path| path.display().to_string()),
171        )
172        .bind(if artifact.fresh { 1_i64 } else { 0_i64 })
173        .execute(&self.pool)
174        .await?;
175        Ok(())
176    }
177
178    /// Persist session completion data.
179    pub async fn finish_session(&self, finish: &SessionFinished) -> Result<()> {
180        sqlx::query(
181            r#"
182            UPDATE sessions
183            SET status = ?, finished_at = ?, duration_ms = ?, exit_code = ?,
184                errors = ?, warnings = ?, notes = ?, help = ?, info = ?, updated_at = ?
185            WHERE session_id = ?
186            "#,
187        )
188        .bind(status_to_str(finish.status))
189        .bind(finish.finished_at)
190        .bind(finish.duration_ms)
191        .bind(finish.exit_code)
192        .bind(i64::from(finish.summary.errors))
193        .bind(i64::from(finish.summary.warnings))
194        .bind(i64::from(finish.summary.notes))
195        .bind(i64::from(finish.summary.help))
196        .bind(i64::from(finish.summary.info))
197        .bind(finish.finished_at)
198        .bind(&finish.session_id)
199        .execute(&self.pool)
200        .await?;
201        Ok(())
202    }
203
204    /// Upsert a passively detected external process snapshot.
205    pub async fn upsert_detected_process(&self, process: &DetectedProcess) -> Result<()> {
206        let info = SessionInfo {
207            session_id: process.session_id.clone(),
208            mode: SessionMode::Detected,
209            title: format!("{} ({})", process.classification.label(), process.pid),
210            command: process.command.clone(),
211            cwd: process.cwd.clone().unwrap_or_else(|| ".".into()),
212            workspace_root: process.workspace_root.clone(),
213            started_at: process.started_at,
214            status: SessionStatus::Running,
215            external_pid: Some(process.pid),
216            classification: Some(process.classification),
217        };
218        self.insert_session_start(&info).await?;
219        sqlx::query(
220            r#"
221            UPDATE sessions
222            SET duration_ms = ?, updated_at = ?
223            WHERE session_id = ?
224            "#,
225        )
226        .bind(process.elapsed_ms)
227        .bind(process.last_seen_at)
228        .bind(&process.session_id)
229        .execute(&self.pool)
230        .await?;
231        Ok(())
232    }
233
234    /// Mark a previously detected process as gone.
235    pub async fn mark_process_gone(
236        &self,
237        session_id: &str,
238        observed_at: time::OffsetDateTime,
239    ) -> Result<()> {
240        sqlx::query(
241            r#"
242            UPDATE sessions
243            SET status = ?, finished_at = ?, updated_at = ?
244            WHERE session_id = ?
245            "#,
246        )
247        .bind(status_to_str(SessionStatus::Lost))
248        .bind(observed_at)
249        .bind(observed_at)
250        .bind(session_id)
251        .execute(&self.pool)
252        .await?;
253        Ok(())
254    }
255
256    /// List the most recent sessions for the history view.
257    pub async fn recent_sessions(&self, limit: usize) -> Result<Vec<SessionHistoryEntry>> {
258        let rows = sqlx::query(
259            r#"
260            SELECT
261                session_id, mode, title, command_json, cwd, workspace_root, status,
262                external_pid, classification, started_at, finished_at, duration_ms, exit_code,
263                errors, warnings, notes, help, info
264            FROM sessions
265            ORDER BY started_at DESC
266            LIMIT ?
267            "#,
268        )
269        .bind(i64::try_from(limit).unwrap_or(i64::MAX))
270        .fetch_all(&self.pool)
271        .await?;
272
273        rows.into_iter().map(row_to_history).collect()
274    }
275
276    /// Load a full session including persisted logs and diagnostics.
277    pub async fn load_session(
278        &self,
279        session_id: &str,
280        max_logs: usize,
281    ) -> Result<Option<SessionState>> {
282        let row = sqlx::query(
283            r#"
284            SELECT
285                session_id, mode, title, command_json, cwd, workspace_root, status,
286                external_pid, classification, started_at, finished_at, duration_ms, exit_code,
287                errors, warnings, notes, help, info
288            FROM sessions
289            WHERE session_id = ?
290            "#,
291        )
292        .bind(session_id)
293        .fetch_optional(&self.pool)
294        .await?;
295        let Some(row) = row else {
296            return Ok(None);
297        };
298        let history = row_to_history(row)?;
299        let mut state = SessionState::new(history.info, max_logs);
300        state.finished_at = history.finished_at;
301        state.duration_ms = history.duration_ms;
302        state.exit_code = history.exit_code;
303        state.summary = history.summary;
304
305        let log_rows = sqlx::query(
306            r#"
307            SELECT sequence, timestamp, stream, text, raw, severity
308            FROM session_logs
309            WHERE session_id = ?
310            ORDER BY sequence DESC
311            LIMIT ?
312            "#,
313        )
314        .bind(session_id)
315        .bind(i64::try_from(max_logs).unwrap_or(i64::MAX))
316        .fetch_all(&self.pool)
317        .await?;
318        let mut logs = log_rows
319            .into_iter()
320            .map(|row| {
321                Ok(LogEntry {
322                    sequence: row.try_get::<i64, _>("sequence")?.max(0) as u64,
323                    timestamp: row.try_get("timestamp")?,
324                    stream: str_to_stream(row.try_get("stream")?),
325                    text: row.try_get("text")?,
326                    raw: row.try_get("raw")?,
327                    severity: row
328                        .try_get::<Option<String>, _>("severity")?
329                        .as_deref()
330                        .map(str_to_severity),
331                })
332            })
333            .collect::<Result<Vec<_>, sqlx::Error>>()?;
334        logs.reverse();
335        state.logs.extend(logs);
336
337        let diagnostics = sqlx::query(
338            r#"
339            SELECT diagnostic_id, timestamp, severity, message, rendered, code, file, line, column_number, target, package_id
340            FROM session_diagnostics
341            WHERE session_id = ?
342            ORDER BY timestamp ASC
343            "#,
344        )
345        .bind(session_id)
346        .fetch_all(&self.pool)
347        .await?;
348        state.diagnostics = diagnostics
349            .into_iter()
350            .map(|row| {
351                Ok(DiagnosticRecord {
352                    id: row.try_get("diagnostic_id")?,
353                    timestamp: row.try_get("timestamp")?,
354                    severity: str_to_severity(row.try_get::<String, _>("severity")?.as_str()),
355                    message: row.try_get("message")?,
356                    rendered: row.try_get("rendered")?,
357                    code: row.try_get("code")?,
358                    file: row
359                        .try_get::<Option<String>, _>("file")?
360                        .map(std::path::PathBuf::from),
361                    line: row
362                        .try_get::<Option<i64>, _>("line")?
363                        .map(|value| value as u32),
364                    column: row
365                        .try_get::<Option<i64>, _>("column_number")?
366                        .map(|value| value as u32),
367                    target: row.try_get("target")?,
368                    package_id: row.try_get("package_id")?,
369                })
370            })
371            .collect::<Result<Vec<_>, sqlx::Error>>()?;
372
373        let artifacts = sqlx::query(
374            r#"
375            SELECT sequence, timestamp, package_id, target, filenames_json, executable, fresh
376            FROM session_artifacts
377            WHERE session_id = ?
378            ORDER BY sequence ASC, id ASC
379            "#,
380        )
381        .bind(session_id)
382        .fetch_all(&self.pool)
383        .await?;
384        state.artifacts = artifacts
385            .into_iter()
386            .map(|row| {
387                Ok(ArtifactRecord {
388                    sequence: row.try_get::<i64, _>("sequence")?.max(0) as u64,
389                    timestamp: row.try_get("timestamp")?,
390                    package_id: row.try_get("package_id")?,
391                    target: row.try_get("target")?,
392                    filenames: serde_json::from_str(&row.try_get::<String, _>("filenames_json")?)?,
393                    executable: row
394                        .try_get::<Option<String>, _>("executable")?
395                        .map(Into::into),
396                    fresh: row.try_get::<i64, _>("fresh")? != 0,
397                })
398            })
399            .collect::<Result<Vec<_>>>()?;
400
401        Ok(Some(state))
402    }
403
404    /// Delete sessions older than the configured retention window.
405    pub async fn cleanup_old_sessions(&self, retention_days: u32) -> Result<u64> {
406        let cutoff =
407            time::OffsetDateTime::now_utc() - time::Duration::days(i64::from(retention_days));
408        let result = sqlx::query("DELETE FROM sessions WHERE started_at < ?")
409            .bind(cutoff)
410            .execute(&self.pool)
411            .await?;
412        Ok(result.rows_affected())
413    }
414}
415
416fn row_to_history(row: sqlx::sqlite::SqliteRow) -> Result<SessionHistoryEntry> {
417    let mode = str_to_mode(row.try_get::<String, _>("mode")?.as_str());
418    let status = str_to_status(row.try_get::<String, _>("status")?.as_str());
419    let info = SessionInfo {
420        session_id: row.try_get("session_id")?,
421        mode,
422        title: row.try_get("title")?,
423        command: serde_json::from_str(&row.try_get::<String, _>("command_json")?)?,
424        cwd: row.try_get::<String, _>("cwd")?.into(),
425        workspace_root: row
426            .try_get::<Option<String>, _>("workspace_root")?
427            .map(Into::into),
428        started_at: row.try_get("started_at")?,
429        status,
430        external_pid: row
431            .try_get::<Option<i64>, _>("external_pid")?
432            .map(|value| value as u32),
433        classification: None,
434    };
435    Ok(SessionHistoryEntry {
436        info,
437        finished_at: row.try_get("finished_at")?,
438        exit_code: row.try_get("exit_code")?,
439        duration_ms: row.try_get("duration_ms")?,
440        summary: SummaryCounts {
441            errors: row.try_get::<i64, _>("errors")?.max(0) as u32,
442            warnings: row.try_get::<i64, _>("warnings")?.max(0) as u32,
443            notes: row.try_get::<i64, _>("notes")?.max(0) as u32,
444            help: row.try_get::<i64, _>("help")?.max(0) as u32,
445            info: row.try_get::<i64, _>("info")?.max(0) as u32,
446        },
447    })
448}
449
450fn mode_to_str(mode: SessionMode) -> &'static str {
451    match mode {
452        SessionMode::Managed => "managed",
453        SessionMode::Detected => "detected",
454    }
455}
456
457fn str_to_mode(value: &str) -> SessionMode {
458    match value {
459        "detected" => SessionMode::Detected,
460        _ => SessionMode::Managed,
461    }
462}
463
464fn status_to_str(status: SessionStatus) -> &'static str {
465    match status {
466        SessionStatus::Running => "running",
467        SessionStatus::Succeeded => "succeeded",
468        SessionStatus::Failed => "failed",
469        SessionStatus::Cancelled => "cancelled",
470        SessionStatus::Lost => "lost",
471    }
472}
473
474fn str_to_status(value: &str) -> SessionStatus {
475    match value {
476        "succeeded" => SessionStatus::Succeeded,
477        "failed" => SessionStatus::Failed,
478        "cancelled" => SessionStatus::Cancelled,
479        "lost" => SessionStatus::Lost,
480        _ => SessionStatus::Running,
481    }
482}
483
484fn stream_to_str(stream: cargowatch_core::OutputStream) -> &'static str {
485    match stream {
486        cargowatch_core::OutputStream::Stdout => "stdout",
487        cargowatch_core::OutputStream::Stderr => "stderr",
488        cargowatch_core::OutputStream::System => "system",
489    }
490}
491
492fn str_to_stream(value: &str) -> cargowatch_core::OutputStream {
493    match value {
494        "stderr" => cargowatch_core::OutputStream::Stderr,
495        "system" => cargowatch_core::OutputStream::System,
496        _ => cargowatch_core::OutputStream::Stdout,
497    }
498}
499
500fn severity_to_str(severity: cargowatch_core::event::Severity) -> &'static str {
501    match severity {
502        cargowatch_core::event::Severity::Error => "error",
503        cargowatch_core::event::Severity::Warning => "warning",
504        cargowatch_core::event::Severity::Note => "note",
505        cargowatch_core::event::Severity::Help => "help",
506        cargowatch_core::event::Severity::Info => "info",
507        cargowatch_core::event::Severity::Success => "success",
508    }
509}
510
511fn str_to_severity(value: &str) -> cargowatch_core::event::Severity {
512    match value {
513        "error" => cargowatch_core::event::Severity::Error,
514        "warning" => cargowatch_core::event::Severity::Warning,
515        "note" => cargowatch_core::event::Severity::Note,
516        "help" => cargowatch_core::event::Severity::Help,
517        "success" => cargowatch_core::event::Severity::Success,
518        _ => cargowatch_core::event::Severity::Info,
519    }
520}
521
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;
    use cargowatch_core::{SessionMode, SessionStatus};

    /// Writes one session with a log line, a diagnostic and two artifacts,
    /// finishes it, and checks that both the history listing and the full
    /// session load return what was stored.
    #[tokio::test]
    async fn store_round_trips_history_and_session_details() {
        let scratch = tempdir().expect("tempdir");
        let root = scratch.path();
        let db_path = root.join("cargowatch.db");
        let store = SessionStore::connect(&db_path).await.expect("store");

        let started_at = time::OffsetDateTime::now_utc();
        let info = SessionInfo {
            session_id: "session-1".to_string(),
            mode: SessionMode::Managed,
            title: "cargo check".to_string(),
            command: vec!["cargo".into(), "check".into()],
            cwd: root.to_path_buf(),
            workspace_root: Some(root.to_path_buf()),
            started_at,
            status: SessionStatus::Running,
            external_pid: Some(42),
            classification: None,
        };
        store.insert_session_start(&info).await.expect("start");

        let log_line = LogEntry {
            sequence: 1,
            timestamp: started_at,
            stream: cargowatch_core::OutputStream::Stdout,
            text: "checking demo".to_string(),
            raw: None,
            severity: Some(cargowatch_core::event::Severity::Info),
        };
        store
            .insert_log_line(&info.session_id, &log_line)
            .await
            .expect("log");

        let warning = DiagnosticRecord {
            id: "diag-1".to_string(),
            timestamp: started_at,
            severity: cargowatch_core::event::Severity::Warning,
            message: "unused variable".to_string(),
            rendered: Some("warning: unused variable".to_string()),
            code: Some("unused_variables".to_string()),
            file: None,
            line: None,
            column: None,
            target: Some("demo".to_string()),
            package_id: Some("demo 0.1.0".to_string()),
        };
        store
            .insert_diagnostic(&info.session_id, &warning)
            .await
            .expect("diagnostic");

        let binary_artifact = ArtifactRecord {
            sequence: 2,
            timestamp: started_at + time::Duration::milliseconds(250),
            package_id: Some("demo 0.1.0".to_string()),
            target: Some("demo".to_string()),
            filenames: vec![root.join("target/debug/demo")],
            executable: Some(root.join("target/debug/demo")),
            fresh: false,
        };
        let test_artifact = ArtifactRecord {
            sequence: 3,
            timestamp: started_at + time::Duration::milliseconds(500),
            package_id: Some("demo 0.1.0".to_string()),
            target: Some("demo-tests".to_string()),
            filenames: vec![root.join("target/debug/deps/demo_tests")],
            executable: None,
            fresh: true,
        };
        for artifact in [&binary_artifact, &test_artifact] {
            store
                .insert_artifact(&info.session_id, artifact)
                .await
                .expect("artifact");
        }

        let finished = SessionFinished {
            session_id: info.session_id.clone(),
            finished_at: started_at + time::Duration::seconds(1),
            status: SessionStatus::Succeeded,
            exit_code: Some(0),
            duration_ms: 1_000,
            summary: SummaryCounts {
                errors: 0,
                warnings: 1,
                notes: 0,
                help: 0,
                info: 1,
            },
        };
        store.finish_session(&finished).await.expect("finish");

        // The history view reports the single session with its warning count.
        let history = store.recent_sessions(10).await.expect("history");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].summary.warnings, 1);

        // The full load returns every child record, artifacts in sequence order.
        let session = store
            .load_session(&info.session_id, 100)
            .await
            .expect("load")
            .expect("session");
        assert_eq!(session.logs.len(), 1);
        assert_eq!(session.diagnostics.len(), 1);
        assert_eq!(session.artifacts.len(), 2);
        assert_eq!(session.artifacts[0].sequence, 2);
        assert_eq!(session.artifacts[1].sequence, 3);
        assert_eq!(session.artifacts[0].target.as_deref(), Some("demo"));
    }
}