1use std::fs;
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5
6use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
7use sqlx::{Connection, Row, SqliteConnection};
8
9use crate::errors::{ErrorCode, TrackError};
10use crate::paths::{get_backend_database_path, path_to_string};
11
12type BoxDbFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, TrackError>> + Send + 'a>>;
13
/// SQL statements that create the backend schema.
///
/// `DatabaseContext::initialize` executes these one by one, in the order
/// listed. Every statement uses `IF NOT EXISTS`, so running the list against
/// a database that already has the objects leaves them untouched.
const SCHEMA_STATEMENTS: &[&str] = &[
    // Projects, keyed by their canonical name.
    r#"
    CREATE TABLE IF NOT EXISTS projects (
        canonical_name TEXT PRIMARY KEY,
        repo_url TEXT NOT NULL DEFAULT '',
        git_url TEXT NOT NULL DEFAULT '',
        base_branch TEXT NOT NULL DEFAULT 'main',
        description TEXT
    )
    "#,
    // Alternative names for a project. An alias is unique across all
    // projects and is deleted together with the project it points at.
    r#"
    CREATE TABLE IF NOT EXISTS project_aliases (
        canonical_name TEXT NOT NULL,
        alias TEXT NOT NULL,
        PRIMARY KEY (canonical_name, alias),
        UNIQUE (alias),
        FOREIGN KEY (canonical_name) REFERENCES projects(canonical_name) ON DELETE CASCADE
    )
    "#,
    // Tasks. `ON DELETE RESTRICT` blocks deleting a project that still has tasks.
    r#"
    CREATE TABLE IF NOT EXISTS tasks (
        id TEXT PRIMARY KEY,
        project TEXT NOT NULL,
        priority TEXT NOT NULL,
        status TEXT NOT NULL,
        description TEXT NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        source TEXT,
        FOREIGN KEY (project) REFERENCES projects(canonical_name) ON DELETE RESTRICT
    )
    "#,
    // Dispatch records for a task; removed together with the task. Note that
    // `project` is plain text here: only `task_id` carries a foreign key.
    r#"
    CREATE TABLE IF NOT EXISTS task_dispatches (
        dispatch_id TEXT PRIMARY KEY,
        task_id TEXT NOT NULL,
        project TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        finished_at TEXT,
        remote_host TEXT NOT NULL,
        branch_name TEXT,
        worktree_path TEXT,
        pull_request_url TEXT,
        preferred_tool TEXT NOT NULL DEFAULT 'codex',
        follow_up_request TEXT,
        summary TEXT,
        notes TEXT,
        error_message TEXT,
        review_request_head_oid TEXT,
        review_request_user TEXT,
        FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
    )
    "#,
    // Index on (task_id, created_at DESC) for dispatch lookups by task.
    r#"
    CREATE INDEX IF NOT EXISTS idx_task_dispatches_task_id_created_at
    ON task_dispatches(task_id, created_at DESC)
    "#,
    // Pull-request reviews. `project` is optional and has no foreign key.
    r#"
    CREATE TABLE IF NOT EXISTS reviews (
        id TEXT PRIMARY KEY,
        pull_request_url TEXT NOT NULL,
        pull_request_number INTEGER NOT NULL,
        pull_request_title TEXT NOT NULL,
        repository_full_name TEXT NOT NULL,
        repo_url TEXT NOT NULL,
        git_url TEXT NOT NULL,
        base_branch TEXT NOT NULL,
        workspace_key TEXT NOT NULL,
        preferred_tool TEXT NOT NULL DEFAULT 'codex',
        project TEXT,
        main_user TEXT NOT NULL,
        default_review_prompt TEXT,
        extra_instructions TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )
    "#,
    // Individual runs of a review; removed together with the review.
    r#"
    CREATE TABLE IF NOT EXISTS review_runs (
        dispatch_id TEXT PRIMARY KEY,
        review_id TEXT NOT NULL,
        pull_request_url TEXT NOT NULL,
        repository_full_name TEXT NOT NULL,
        workspace_key TEXT NOT NULL,
        preferred_tool TEXT NOT NULL DEFAULT 'codex',
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        finished_at TEXT,
        remote_host TEXT NOT NULL,
        branch_name TEXT,
        worktree_path TEXT,
        follow_up_request TEXT,
        target_head_oid TEXT,
        summary TEXT,
        review_submitted INTEGER NOT NULL DEFAULT 0,
        github_review_id TEXT,
        github_review_url TEXT,
        notes TEXT,
        error_message TEXT,
        FOREIGN KEY (review_id) REFERENCES reviews(id) ON DELETE CASCADE
    )
    "#,
    // Index on (review_id, created_at DESC) for run lookups by review.
    r#"
    CREATE INDEX IF NOT EXISTS idx_review_runs_review_id_created_at
    ON review_runs(review_id, created_at DESC)
    "#,
    // Key/value settings store; the column name suggests the value is JSON
    // text, but nothing in the schema enforces that.
    r#"
    CREATE TABLE IF NOT EXISTS backend_settings (
        setting_key TEXT PRIMARY KEY,
        setting_json TEXT NOT NULL
    )
    "#,
];
130
/// Columns that must exist on top of whatever the tables already contain.
///
/// Each entry is `(table name, column name, column definition)`.
/// `apply_additive_schema_updates` issues
/// `ALTER TABLE <table> ADD COLUMN <column> <definition>` for every entry
/// whose column is not reported by `PRAGMA table_info`.
///
/// The three parts are spliced into SQL text with `format!`, so entries must
/// remain trusted compile-time constants.
///
/// NOTE(review): presumably these cover databases created before the columns
/// were added to the `CREATE TABLE` statements, since `CREATE TABLE IF NOT
/// EXISTS` does not modify an existing table; confirm against project history.
const ADDITIVE_SCHEMA_UPDATES: &[(&str, &str, &str)] = &[
    (
        "task_dispatches",
        "preferred_tool",
        "TEXT NOT NULL DEFAULT 'codex'",
    ),
    ("reviews", "preferred_tool", "TEXT NOT NULL DEFAULT 'codex'"),
    (
        "review_runs",
        "preferred_tool",
        "TEXT NOT NULL DEFAULT 'codex'",
    ),
];
144
/// Handle to the backend SQLite database, identified only by its file path.
///
/// The struct stores no open connection; cloning it copies the path.
#[derive(Debug, Clone)]
pub struct DatabaseContext {
    /// Location of the SQLite database file.
    database_path: PathBuf,
}
149
150impl DatabaseContext {
151 pub fn new(database_path: Option<PathBuf>) -> Result<Self, TrackError> {
152 let database_path = match database_path {
153 Some(database_path)
154 if database_path.extension().and_then(|value| value.to_str()) == Some("sqlite") =>
155 {
156 database_path
157 }
158 Some(database_path) => database_path.join(crate::paths::DATABASE_FILE_NAME),
159 None => get_backend_database_path()?,
160 };
161
162 Ok(Self { database_path })
163 }
164
165 pub fn database_path(&self) -> &Path {
166 &self.database_path
167 }
168
169 pub fn initialize(&self) -> Result<(), TrackError> {
170 self.run(|connection| {
171 Box::pin(async move {
172 for statement in SCHEMA_STATEMENTS {
173 sqlx::query(statement)
174 .execute(&mut *connection)
175 .await
176 .map_err(|error| {
177 TrackError::new(
178 ErrorCode::TaskWriteFailed,
179 format!("Could not initialize the SQLite schema: {error}"),
180 )
181 })?;
182 }
183
184 apply_additive_schema_updates(connection).await?;
185
186 Ok(())
187 })
188 })
189 }
190
191 pub fn run<T>(
192 &self,
193 operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T> + Send + 'static,
194 ) -> Result<T, TrackError>
195 where
196 T: Send + 'static,
197 {
198 let connect_options = self.connect_options()?;
199 let database_path = self.database_path.clone();
200
201 if tokio::runtime::Handle::try_current().is_ok() {
202 return std::thread::spawn(move || {
203 run_database_operation(connect_options, database_path, operation)
204 })
205 .join()
206 .map_err(|_| {
207 TrackError::new(
208 ErrorCode::TaskWriteFailed,
209 "The SQLite worker thread panicked.",
210 )
211 })?;
212 }
213
214 run_database_operation(connect_options, database_path, operation)
215 }
216
217 pub fn transaction<T>(
218 &self,
219 operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T> + Send + 'static,
220 ) -> Result<T, TrackError>
221 where
222 T: Send + 'static,
223 {
224 self.run(move |connection| {
225 Box::pin(async move {
226 begin_transaction(connection).await?;
227
228 match operation(connection).await {
229 Ok(value) => {
230 commit_transaction(connection).await?;
231 Ok(value)
232 }
233 Err(error) => {
234 rollback_transaction(connection)
235 .await
236 .map_err(|rollback_error| {
237 TrackError::new(
238 error.code,
239 format!(
240 "{} The SQLite rollback also failed: {}",
241 error, rollback_error
242 ),
243 )
244 })?;
245 Err(error)
246 }
247 }
248 })
249 })
250 }
251
252 fn connect_options(&self) -> Result<SqliteConnectOptions, TrackError> {
253 if let Some(parent) = self.database_path.parent() {
254 fs::create_dir_all(parent).map_err(|error| {
255 TrackError::new(
256 ErrorCode::TaskWriteFailed,
257 format!(
258 "Could not create the backend state directory at {}: {error}",
259 path_to_string(parent)
260 ),
261 )
262 })?;
263 }
264
265 Ok(SqliteConnectOptions::new()
266 .filename(&self.database_path)
267 .create_if_missing(true)
268 .foreign_keys(true)
269 .journal_mode(SqliteJournalMode::Wal)
270 .synchronous(SqliteSynchronous::Normal))
271 }
272}
273
274async fn apply_additive_schema_updates(
283 connection: &mut SqliteConnection,
284) -> Result<(), TrackError> {
285 for (table_name, column_name, column_definition) in ADDITIVE_SCHEMA_UPDATES {
286 if sqlite_column_exists(connection, table_name, column_name).await? {
287 continue;
288 }
289
290 let alter_statement =
291 format!("ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition}");
292 if let Err(error) = sqlx::query(&alter_statement)
293 .execute(&mut *connection)
294 .await
295 {
296 if sqlite_duplicate_column_error(&error, column_name) {
303 continue;
304 }
305
306 return Err(TrackError::new(
307 ErrorCode::TaskWriteFailed,
308 format!("Could not add the SQLite column {table_name}.{column_name}: {error}"),
309 ));
310 }
311 }
312
313 Ok(())
314}
315
316async fn sqlite_column_exists(
317 connection: &mut SqliteConnection,
318 table_name: &str,
319 column_name: &str,
320) -> Result<bool, TrackError> {
321 let rows = sqlx::query(&format!("PRAGMA table_info({table_name})"))
322 .fetch_all(&mut *connection)
323 .await
324 .map_err(|error| {
325 TrackError::new(
326 ErrorCode::TaskWriteFailed,
327 format!("Could not inspect the SQLite schema for table {table_name}: {error}"),
328 )
329 })?;
330
331 Ok(rows
332 .into_iter()
333 .any(|row| row.get::<String, _>("name") == column_name))
334}
335
336fn sqlite_duplicate_column_error(error: &sqlx::Error, column_name: &str) -> bool {
337 let message = error.to_string().to_ascii_lowercase();
338 let column_name = column_name.to_ascii_lowercase();
339
340 message.contains("duplicate column name") && message.contains(&column_name)
341}
342
343async fn begin_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
344 sqlx::query("BEGIN")
348 .execute(&mut *connection)
349 .await
350 .map_err(|error| {
351 TrackError::new(
352 ErrorCode::TaskWriteFailed,
353 format!("Could not begin the SQLite transaction: {error}"),
354 )
355 })?;
356
357 Ok(())
358}
359
360async fn commit_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
361 sqlx::query("COMMIT")
362 .execute(&mut *connection)
363 .await
364 .map_err(|error| {
365 TrackError::new(
366 ErrorCode::TaskWriteFailed,
367 format!("Could not commit the SQLite transaction: {error}"),
368 )
369 })?;
370
371 Ok(())
372}
373
374async fn rollback_transaction(connection: &mut SqliteConnection) -> Result<(), TrackError> {
375 sqlx::query("ROLLBACK")
376 .execute(&mut *connection)
377 .await
378 .map_err(|error| {
379 TrackError::new(
380 ErrorCode::TaskWriteFailed,
381 format!("Could not roll back the SQLite transaction: {error}"),
382 )
383 })?;
384
385 Ok(())
386}
387
388fn run_database_operation<T>(
389 connect_options: SqliteConnectOptions,
390 database_path: PathBuf,
391 operation: impl for<'a> FnOnce(&'a mut SqliteConnection) -> BoxDbFuture<'a, T>,
392) -> Result<T, TrackError> {
393 let runtime = tokio::runtime::Builder::new_current_thread()
394 .enable_all()
395 .build()
396 .map_err(|error| {
397 TrackError::new(
398 ErrorCode::TaskWriteFailed,
399 format!("Could not start the SQLite runtime: {error}"),
400 )
401 })?;
402
403 runtime.block_on(async move {
404 let mut connection = SqliteConnection::connect_with(&connect_options)
405 .await
406 .map_err(|error| {
407 TrackError::new(
408 ErrorCode::TaskWriteFailed,
409 format!(
410 "Could not open the SQLite database at {}: {error}",
411 path_to_string(&database_path)
412 ),
413 )
414 })?;
415
416 operation(&mut connection).await
417 })
418}