Skip to main content

track_core/
database.rs

1use std::fs;
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5
6use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
7use sqlx::{Connection, Row, SqliteConnection};
8
9use crate::errors::{ErrorCode, TrackError};
10use crate::paths::{get_backend_database_path, path_to_string};
11
/// Boxed, pinned, `Send` future resolving to `Result<T, TrackError>`.
///
/// This is the return type of the closures accepted by `DatabaseContext::run`,
/// `DatabaseContext::transaction`, and `run_database_operation`; `'a` is the
/// lifetime of the `&'a mut SqliteConnection` those closures receive.
12type BoxDbFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, TrackError>> + Send + 'a>>;
13
/// SQL statements that create the backend tables and indexes.
///
/// Every statement uses `IF NOT EXISTS`. `DatabaseContext::initialize`
/// executes the entries one at a time, in array order, before applying
/// `ADDITIVE_SCHEMA_UPDATES`.
14const SCHEMA_STATEMENTS: &[&str] = &[
15    r#"
16    CREATE TABLE IF NOT EXISTS projects (
17        canonical_name TEXT PRIMARY KEY,
18        repo_url TEXT NOT NULL DEFAULT '',
19        git_url TEXT NOT NULL DEFAULT '',
20        base_branch TEXT NOT NULL DEFAULT 'main',
21        description TEXT
22    )
23    "#,
24    r#"
25    CREATE TABLE IF NOT EXISTS project_aliases (
26        canonical_name TEXT NOT NULL,
27        alias TEXT NOT NULL,
28        PRIMARY KEY (canonical_name, alias),
29        UNIQUE (alias),
30        FOREIGN KEY (canonical_name) REFERENCES projects(canonical_name) ON DELETE CASCADE
31    )
32    "#,
33    r#"
34    CREATE TABLE IF NOT EXISTS tasks (
35        id TEXT PRIMARY KEY,
36        project TEXT NOT NULL,
37        priority TEXT NOT NULL,
38        status TEXT NOT NULL,
39        description TEXT NOT NULL,
40        created_at TEXT NOT NULL,
41        updated_at TEXT NOT NULL,
42        source TEXT,
43        FOREIGN KEY (project) REFERENCES projects(canonical_name) ON DELETE RESTRICT
44    )
45    "#,
46    r#"
47    CREATE TABLE IF NOT EXISTS task_dispatches (
48        dispatch_id TEXT PRIMARY KEY,
49        task_id TEXT NOT NULL,
50        project TEXT NOT NULL,
51        status TEXT NOT NULL,
52        created_at TEXT NOT NULL,
53        updated_at TEXT NOT NULL,
54        finished_at TEXT,
55        remote_host TEXT NOT NULL,
56        branch_name TEXT,
57        worktree_path TEXT,
58        pull_request_url TEXT,
59        preferred_tool TEXT NOT NULL DEFAULT 'codex',
60        follow_up_request TEXT,
61        summary TEXT,
62        notes TEXT,
63        error_message TEXT,
64        review_request_head_oid TEXT,
65        review_request_user TEXT,
66        FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
67    )
68    "#,
69    r#"
70    CREATE INDEX IF NOT EXISTS idx_task_dispatches_task_id_created_at
71    ON task_dispatches(task_id, created_at DESC)
72    "#,
73    r#"
74    CREATE TABLE IF NOT EXISTS reviews (
75        id TEXT PRIMARY KEY,
76        pull_request_url TEXT NOT NULL,
77        pull_request_number INTEGER NOT NULL,
78        pull_request_title TEXT NOT NULL,
79        repository_full_name TEXT NOT NULL,
80        repo_url TEXT NOT NULL,
81        git_url TEXT NOT NULL,
82        base_branch TEXT NOT NULL,
83        workspace_key TEXT NOT NULL,
84        preferred_tool TEXT NOT NULL DEFAULT 'codex',
85        project TEXT,
86        main_user TEXT NOT NULL,
87        default_review_prompt TEXT,
88        extra_instructions TEXT,
89        created_at TEXT NOT NULL,
90        updated_at TEXT NOT NULL
91    )
92    "#,
93    r#"
94    CREATE TABLE IF NOT EXISTS review_runs (
95        dispatch_id TEXT PRIMARY KEY,
96        review_id TEXT NOT NULL,
97        pull_request_url TEXT NOT NULL,
98        repository_full_name TEXT NOT NULL,
99        workspace_key TEXT NOT NULL,
100        preferred_tool TEXT NOT NULL DEFAULT 'codex',
101        status TEXT NOT NULL,
102        created_at TEXT NOT NULL,
103        updated_at TEXT NOT NULL,
104        finished_at TEXT,
105        remote_host TEXT NOT NULL,
106        branch_name TEXT,
107        worktree_path TEXT,
108        follow_up_request TEXT,
109        target_head_oid TEXT,
110        summary TEXT,
111        review_submitted INTEGER NOT NULL DEFAULT 0,
112        github_review_id TEXT,
113        github_review_url TEXT,
114        notes TEXT,
115        error_message TEXT,
116        FOREIGN KEY (review_id) REFERENCES reviews(id) ON DELETE CASCADE
117    )
118    "#,
119    r#"
120    CREATE INDEX IF NOT EXISTS idx_review_runs_review_id_created_at
121    ON review_runs(review_id, created_at DESC)
122    "#,
123    r#"
124    CREATE TABLE IF NOT EXISTS backend_settings (
125        setting_key TEXT PRIMARY KEY,
126        setting_json TEXT NOT NULL
127    )
128    "#,
129];
130
/// `(table name, column name, column definition)` triples describing columns
/// that may be missing from databases created before the column was added to
/// `SCHEMA_STATEMENTS`.
///
/// `apply_additive_schema_updates` walks this list and issues
/// `ALTER TABLE <table> ADD COLUMN <column> <definition>` for every column
/// that `sqlite_column_exists` reports as absent.
131const ADDITIVE_SCHEMA_UPDATES: &[(&str, &str, &str)] = &[
132    (
133        "task_dispatches",
134        "preferred_tool",
135        "TEXT NOT NULL DEFAULT 'codex'",
136    ),
137    ("reviews", "preferred_tool", "TEXT NOT NULL DEFAULT 'codex'"),
138    (
139        "review_runs",
140        "preferred_tool",
141        "TEXT NOT NULL DEFAULT 'codex'",
142    ),
143];
144
145#[derive(Debug, Clone)]
146pub struct DatabaseContext {
147    database_path: PathBuf,
148}
149
150impl DatabaseContext {
151    pub fn new(database_path: Option<PathBuf>) -> Result<Self, TrackError> {
152        let database_path = match database_path {
153            Some(database_path)
154                if database_path.extension().and_then(|value| value.to_str()) == Some("sqlite") =>
155            {
156                database_path
157            }
158            Some(database_path) => database_path.join(crate::paths::DATABASE_FILE_NAME),
159            None => get_backend_database_path()?,
160        };
161
162        Ok(Self { database_path })
163    }
164
165    pub fn database_path(&self) -> &Path {
166        &self.database_path
167    }
168
169    pub fn initialize(&self) -> Result<(), TrackError> {
170        self.run(|connection| {
171            Box::pin(async move {
172                for statement in SCHEMA_STATEMENTS {
173                    sqlx::query(statement)
174                        .execute(&mut *connection)
175                        .await
176                        .map_err(|error| {
177                            TrackError::new(
178                                ErrorCode::TaskWriteFailed,
179                                format!("Could not initialize the SQLite schema: {error}"),
180                            )
181                        })?;
182                }
183
184                apply_additive_schema_updates(connection).await?;
185
186                Ok(())
187            })
188        })
189    }
190
191    pub fn run<T>(
192        &self,
193        operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T> + Send + 'static,
194    ) -> Result<T, TrackError>
195    where
196        T: Send + 'static,
197    {
198        let connect_options = self.connect_options()?;
199        let database_path = self.database_path.clone();
200
201        if tokio::runtime::Handle::try_current().is_ok() {
202            return std::thread::spawn(move || {
203                run_database_operation(connect_options, database_path, operation)
204            })
205            .join()
206            .map_err(|_| {
207                TrackError::new(
208                    ErrorCode::TaskWriteFailed,
209                    "The SQLite worker thread panicked.",
210                )
211            })?;
212        }
213
214        run_database_operation(connect_options, database_path, operation)
215    }
216
217    pub fn transaction<T>(
218        &self,
219        operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T> + Send + 'static,
220    ) -> Result<T, TrackError>
221    where
222        T: Send + 'static,
223    {
224        self.run(move |connection| {
225            Box::pin(async move {
226                begin_transaction(connection).await?;
227
228                match operation(connection).await {
229                    Ok(value) => {
230                        commit_transaction(connection).await?;
231                        Ok(value)
232                    }
233                    Err(error) => {
234                        rollback_transaction(connection)
235                            .await
236                            .map_err(|rollback_error| {
237                                TrackError::new(
238                                    error.code,
239                                    format!(
240                                        "{} The SQLite rollback also failed: {}",
241                                        error, rollback_error
242                                    ),
243                                )
244                            })?;
245                        Err(error)
246                    }
247                }
248            })
249        })
250    }
251
252    fn connect_options(&self) -> Result<SqliteConnectOptions, TrackError> {
253        if let Some(parent) = self.database_path.parent() {
254            fs::create_dir_all(parent).map_err(|error| {
255                TrackError::new(
256                    ErrorCode::TaskWriteFailed,
257                    format!(
258                        "Could not create the backend state directory at {}: {error}",
259                        path_to_string(parent)
260                    ),
261                )
262            })?;
263        }
264
265        Ok(SqliteConnectOptions::new()
266            .filename(&self.database_path)
267            .create_if_missing(true)
268            .foreign_keys(true)
269            .journal_mode(SqliteJournalMode::Wal)
270            .synchronous(SqliteSynchronous::Normal))
271    }
272}
273
274// =============================================================================
275// SQLite Additive Migrations
276// =============================================================================
277//
278// The backend still keeps schema setup intentionally lightweight, but new
279// releases now need to add columns to already-existing local databases. We only
280// support additive updates here because they are safe to apply opportunistically
281// during startup without introducing a full migration framework yet.
282async fn apply_additive_schema_updates(
283    connection: &mut SqliteConnection,
284) -> Result<(), TrackError> {
285    for (table_name, column_name, column_definition) in ADDITIVE_SCHEMA_UPDATES {
286        if sqlite_column_exists(connection, table_name, column_name).await? {
287            continue;
288        }
289
290        let alter_statement =
291            format!("ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition}");
292        if let Err(error) = sqlx::query(&alter_statement)
293            .execute(&mut *connection)
294            .await
295        {
296            // Two track processes can start against the same SQLite file at the
297            // same time. If both observe the old schema, one `ADD COLUMN`
298            // finishes first and the other sees SQLite's duplicate-column
299            // error. That outcome still means the schema is now correct, so we
300            // treat it as a successful concurrent upgrade instead of aborting
301            // startup.
302            if sqlite_duplicate_column_error(&error, column_name) {
303                continue;
304            }
305
306            return Err(TrackError::new(
307                ErrorCode::TaskWriteFailed,
308                format!("Could not add the SQLite column {table_name}.{column_name}: {error}"),
309            ));
310        }
311    }
312
313    Ok(())
314}
315
316async fn sqlite_column_exists(
317    connection: &mut SqliteConnection,
318    table_name: &str,
319    column_name: &str,
320) -> Result<bool, TrackError> {
321    let rows = sqlx::query(&format!("PRAGMA table_info({table_name})"))
322        .fetch_all(&mut *connection)
323        .await
324        .map_err(|error| {
325            TrackError::new(
326                ErrorCode::TaskWriteFailed,
327                format!("Could not inspect the SQLite schema for table {table_name}: {error}"),
328            )
329        })?;
330
331    Ok(rows
332        .into_iter()
333        .any(|row| row.get::<String, _>("name") == column_name))
334}
335
336fn sqlite_duplicate_column_error(error: &sqlx::Error, column_name: &str) -> bool {
337    let message = error.to_string().to_ascii_lowercase();
338    let column_name = column_name.to_ascii_lowercase();
339
340    message.contains("duplicate column name") && message.contains(&column_name)
341}
342
343async fn begin_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
344    // Migration imports need all-or-nothing behavior. A plain BEGIN keeps the
345    // implementation simple while still ensuring the new SQLite state either
346    // fully replaces the legacy files or stays empty enough to retry safely.
347    sqlx::query("BEGIN")
348        .execute(&mut *connection)
349        .await
350        .map_err(|error| {
351            TrackError::new(
352                ErrorCode::TaskWriteFailed,
353                format!("Could not begin the SQLite transaction: {error}"),
354            )
355        })?;
356
357    Ok(())
358}
359
360async fn commit_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
361    sqlx::query("COMMIT")
362        .execute(&mut *connection)
363        .await
364        .map_err(|error| {
365            TrackError::new(
366                ErrorCode::TaskWriteFailed,
367                format!("Could not commit the SQLite transaction: {error}"),
368            )
369        })?;
370
371    Ok(())
372}
373
374async fn rollback_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
375    sqlx::query("ROLLBACK")
376        .execute(&mut *connection)
377        .await
378        .map_err(|error| {
379            TrackError::new(
380                ErrorCode::TaskWriteFailed,
381                format!("Could not roll back the SQLite transaction: {error}"),
382            )
383        })?;
384
385    Ok(())
386}
387
388fn run_database_operation<T>(
389    connect_options: SqliteConnectOptions,
390    database_path: PathBuf,
391    operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T>,
392) -> Result<T, TrackError> {
393    let runtime = tokio::runtime::Builder::new_current_thread()
394        .enable_all()
395        .build()
396        .map_err(|error| {
397            TrackError::new(
398                ErrorCode::TaskWriteFailed,
399                format!("Could not start the SQLite runtime: {error}"),
400            )
401        })?;
402
403    runtime.block_on(async move {
404        let mut connection = SqliteConnection::connect_with(&connect_options)
405            .await
406            .map_err(|error| {
407                TrackError::new(
408                    ErrorCode::TaskWriteFailed,
409                    format!(
410                        "Could not open the SQLite database at {}: {error}",
411                        path_to_string(&database_path)
412                    ),
413                )
414            })?;
415
416        operation(&mut connection).await
417    })
418}