1use std::path::PathBuf;
2
3use sqlx::Row;
4
5use crate::database::DatabaseContext;
6use crate::errors::{ErrorCode, TrackError};
7use crate::path_component::validate_single_normal_path_component;
8use crate::time_utils::{format_iso_8601_millis, now_utc, parse_iso_8601_millis};
9use crate::types::{DispatchStatus, RemoteAgentPreferredTool, ReviewRecord, ReviewRunRecord};
10
11#[derive(Debug, Clone)]
12pub struct ReviewDispatchRepository {
13 database: DatabaseContext,
14}
15
16impl ReviewDispatchRepository {
17 pub fn new(database_path: Option<PathBuf>) -> Result<Self, TrackError> {
18 let database = DatabaseContext::new(database_path)?;
19 database.initialize()?;
20
21 Ok(Self { database })
22 }
23
24 pub fn create_dispatch(
25 &self,
26 review: &ReviewRecord,
27 remote_host: &str,
28 preferred_tool: RemoteAgentPreferredTool,
29 ) -> Result<ReviewRunRecord, TrackError> {
30 let timestamp = now_utc();
31 let record = ReviewRunRecord {
32 dispatch_id: format!("dispatch-{}", timestamp.unix_timestamp_nanos()),
33 review_id: review.id.clone(),
34 pull_request_url: review.pull_request_url.clone(),
35 repository_full_name: review.repository_full_name.clone(),
36 workspace_key: review.workspace_key.clone(),
37 preferred_tool,
38 status: DispatchStatus::Preparing,
39 created_at: timestamp,
40 updated_at: timestamp,
41 finished_at: None,
42 remote_host: remote_host.to_owned(),
43 branch_name: None,
44 worktree_path: None,
45 follow_up_request: None,
46 target_head_oid: None,
47 summary: None,
48 review_submitted: false,
49 github_review_id: None,
50 github_review_url: None,
51 notes: None,
52 error_message: None,
53 };
54
55 self.save_dispatch(&record)?;
56 Ok(record)
57 }
58
59 pub fn save_dispatch(&self, record: &ReviewRunRecord) -> Result<(), TrackError> {
60 let record = record.clone();
61 validate_single_normal_path_component(
62 &record.review_id,
63 "Review id",
64 ErrorCode::InvalidPathComponent,
65 )?;
66 validate_single_normal_path_component(
67 &record.dispatch_id,
68 "Dispatch id",
69 ErrorCode::InvalidPathComponent,
70 )?;
71
72 self.database.run(move |connection| {
73 Box::pin(async move {
74 sqlx::query(
75 r#"
76 INSERT INTO review_runs (
77 dispatch_id, review_id, pull_request_url, repository_full_name,
78 workspace_key, preferred_tool, status, created_at, updated_at,
79 finished_at, remote_host, branch_name, worktree_path,
80 follow_up_request, target_head_oid, summary, review_submitted,
81 github_review_id, github_review_url, notes, error_message
82 )
83 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
84 ON CONFLICT(dispatch_id) DO UPDATE SET
85 review_id = excluded.review_id,
86 pull_request_url = excluded.pull_request_url,
87 repository_full_name = excluded.repository_full_name,
88 workspace_key = excluded.workspace_key,
89 preferred_tool = excluded.preferred_tool,
90 status = excluded.status,
91 created_at = excluded.created_at,
92 updated_at = excluded.updated_at,
93 finished_at = excluded.finished_at,
94 remote_host = excluded.remote_host,
95 branch_name = excluded.branch_name,
96 worktree_path = excluded.worktree_path,
97 follow_up_request = excluded.follow_up_request,
98 target_head_oid = excluded.target_head_oid,
99 summary = excluded.summary,
100 review_submitted = excluded.review_submitted,
101 github_review_id = excluded.github_review_id,
102 github_review_url = excluded.github_review_url,
103 notes = excluded.notes,
104 error_message = excluded.error_message
105 "#,
106 )
107 .bind(&record.dispatch_id)
108 .bind(&record.review_id)
109 .bind(&record.pull_request_url)
110 .bind(&record.repository_full_name)
111 .bind(&record.workspace_key)
112 .bind(record.preferred_tool.as_str())
113 .bind(record.status.as_str())
114 .bind(format_iso_8601_millis(record.created_at))
115 .bind(format_iso_8601_millis(record.updated_at))
116 .bind(record.finished_at.map(format_iso_8601_millis))
117 .bind(&record.remote_host)
118 .bind(record.branch_name.as_deref())
119 .bind(record.worktree_path.as_deref())
120 .bind(record.follow_up_request.as_deref())
121 .bind(record.target_head_oid.as_deref())
122 .bind(record.summary.as_deref())
123 .bind(record.review_submitted as i64)
124 .bind(record.github_review_id.as_deref())
125 .bind(record.github_review_url.as_deref())
126 .bind(record.notes.as_deref())
127 .bind(record.error_message.as_deref())
128 .execute(&mut *connection)
129 .await
130 .map_err(|error| {
131 TrackError::new(
132 ErrorCode::DispatchWriteFailed,
133 format!(
134 "Could not save the review run record for review {}: {error}",
135 record.review_id
136 ),
137 )
138 })?;
139
140 Ok(())
141 })
142 })
143 }
144
145 pub fn latest_dispatch_for_review(
146 &self,
147 review_id: &str,
148 ) -> Result<Option<ReviewRunRecord>, TrackError> {
149 Ok(self.dispatches_for_review(review_id)?.into_iter().next())
150 }
151
152 pub fn dispatches_for_review(
153 &self,
154 review_id: &str,
155 ) -> Result<Vec<ReviewRunRecord>, TrackError> {
156 let review_id = validate_single_normal_path_component(
157 review_id,
158 "Review id",
159 ErrorCode::InvalidPathComponent,
160 )?;
161
162 self.database.run(move |connection| {
163 Box::pin(async move {
164 let rows = sqlx::query(
165 r#"
166 SELECT *
167 FROM review_runs
168 WHERE review_id = ?1
169 ORDER BY created_at DESC
170 "#,
171 )
172 .bind(&review_id)
173 .fetch_all(&mut *connection)
174 .await
175 .map_err(|error| {
176 TrackError::new(
177 ErrorCode::DispatchWriteFailed,
178 format!("Could not load review runs for {review_id}: {error}"),
179 )
180 })?;
181
182 rows.into_iter().map(review_run_from_row).collect()
183 })
184 })
185 }
186
187 pub fn list_dispatches(
188 &self,
189 limit: Option<usize>,
190 ) -> Result<Vec<ReviewRunRecord>, TrackError> {
191 let limit = limit.map(|value| value as i64);
192 self.database.run(move |connection| {
193 Box::pin(async move {
194 let rows = if let Some(limit) = limit {
195 sqlx::query(
196 r#"
197 SELECT *
198 FROM review_runs
199 ORDER BY created_at DESC
200 LIMIT ?1
201 "#,
202 )
203 .bind(limit)
204 .fetch_all(&mut *connection)
205 .await
206 } else {
207 sqlx::query(
208 r#"
209 SELECT *
210 FROM review_runs
211 ORDER BY created_at DESC
212 "#,
213 )
214 .fetch_all(&mut *connection)
215 .await
216 }
217 .map_err(|error| {
218 TrackError::new(
219 ErrorCode::DispatchWriteFailed,
220 format!("Could not list review run records: {error}"),
221 )
222 })?;
223
224 rows.into_iter().map(review_run_from_row).collect()
225 })
226 })
227 }
228
229 pub fn review_ids_with_history(&self) -> Result<Vec<String>, TrackError> {
230 self.database.run(move |connection| {
231 Box::pin(async move {
232 let rows = sqlx::query(
233 r#"
234 SELECT DISTINCT review_id
235 FROM review_runs
236 ORDER BY review_id ASC
237 "#,
238 )
239 .fetch_all(&mut *connection)
240 .await
241 .map_err(|error| {
242 TrackError::new(
243 ErrorCode::DispatchWriteFailed,
244 format!("Could not load review ids with run history: {error}"),
245 )
246 })?;
247
248 Ok(rows
249 .into_iter()
250 .map(|row| row.get::<String, _>("review_id"))
251 .collect())
252 })
253 })
254 }
255
256 pub fn get_dispatch(
257 &self,
258 review_id: &str,
259 dispatch_id: &str,
260 ) -> Result<Option<ReviewRunRecord>, TrackError> {
261 let review_id = validate_single_normal_path_component(
262 review_id,
263 "Review id",
264 ErrorCode::InvalidPathComponent,
265 )?;
266 let dispatch_id = validate_single_normal_path_component(
267 dispatch_id,
268 "Dispatch id",
269 ErrorCode::InvalidPathComponent,
270 )?;
271
272 self.database.run(move |connection| {
273 Box::pin(async move {
274 let row = sqlx::query(
275 r#"
276 SELECT *
277 FROM review_runs
278 WHERE review_id = ?1 AND dispatch_id = ?2
279 "#,
280 )
281 .bind(&review_id)
282 .bind(&dispatch_id)
283 .fetch_optional(&mut *connection)
284 .await
285 .map_err(|error| {
286 TrackError::new(
287 ErrorCode::DispatchWriteFailed,
288 format!(
289 "Could not load the review run {dispatch_id} for review {review_id}: {error}"
290 ),
291 )
292 })?;
293
294 row.map(review_run_from_row).transpose()
295 })
296 })
297 }
298
299 pub fn delete_dispatch_history_for_review(&self, review_id: &str) -> Result<(), TrackError> {
300 let review_id = validate_single_normal_path_component(
301 review_id,
302 "Review id",
303 ErrorCode::InvalidPathComponent,
304 )?;
305
306 self.database.run(move |connection| {
307 Box::pin(async move {
308 sqlx::query("DELETE FROM review_runs WHERE review_id = ?1")
309 .bind(&review_id)
310 .execute(&mut *connection)
311 .await
312 .map_err(|error| {
313 TrackError::new(
314 ErrorCode::DispatchWriteFailed,
315 format!(
316 "Could not remove the review dispatch history for {review_id}: {error}"
317 ),
318 )
319 })?;
320
321 Ok(())
322 })
323 })
324 }
325}
326
327fn review_run_from_row(row: sqlx::sqlite::SqliteRow) -> Result<ReviewRunRecord, TrackError> {
328 let dispatch_id = row.get::<String, _>("dispatch_id");
329 let created_at =
330 parse_iso_8601_millis(&row.get::<String, _>("created_at")).map_err(|error| {
331 TrackError::new(
332 ErrorCode::DispatchWriteFailed,
333 format!("Review run {dispatch_id} has an invalid created_at timestamp: {error}"),
334 )
335 })?;
336 let updated_at =
337 parse_iso_8601_millis(&row.get::<String, _>("updated_at")).map_err(|error| {
338 TrackError::new(
339 ErrorCode::DispatchWriteFailed,
340 format!("Review run {dispatch_id} has an invalid updated_at timestamp: {error}"),
341 )
342 })?;
343 let finished_at = row
344 .get::<Option<String>, _>("finished_at")
345 .map(|value| parse_iso_8601_millis(&value))
346 .transpose()
347 .map_err(|error| {
348 TrackError::new(
349 ErrorCode::DispatchWriteFailed,
350 format!("Review run {dispatch_id} has an invalid finished_at timestamp: {error}"),
351 )
352 })?;
353
354 Ok(ReviewRunRecord {
355 dispatch_id,
356 review_id: row.get::<String, _>("review_id"),
357 pull_request_url: row.get::<String, _>("pull_request_url"),
358 repository_full_name: row.get::<String, _>("repository_full_name"),
359 workspace_key: row.get::<String, _>("workspace_key"),
360 preferred_tool: parse_preferred_tool(
361 row.try_get::<String, _>("preferred_tool")
362 .unwrap_or_else(|_| "codex".to_owned())
363 .as_str(),
364 )?,
365 status: parse_dispatch_status(row.get::<String, _>("status").as_str())?,
366 created_at,
367 updated_at,
368 finished_at,
369 remote_host: row.get::<String, _>("remote_host"),
370 branch_name: row.get::<Option<String>, _>("branch_name"),
371 worktree_path: row.get::<Option<String>, _>("worktree_path"),
372 follow_up_request: row.get::<Option<String>, _>("follow_up_request"),
373 target_head_oid: row.get::<Option<String>, _>("target_head_oid"),
374 summary: row.get::<Option<String>, _>("summary"),
375 review_submitted: row.get::<i64, _>("review_submitted") != 0,
376 github_review_id: row.get::<Option<String>, _>("github_review_id"),
377 github_review_url: row.get::<Option<String>, _>("github_review_url"),
378 notes: row.get::<Option<String>, _>("notes"),
379 error_message: row.get::<Option<String>, _>("error_message"),
380 })
381}
382
383fn parse_dispatch_status(value: &str) -> Result<DispatchStatus, TrackError> {
384 match value {
385 "preparing" => Ok(DispatchStatus::Preparing),
386 "running" => Ok(DispatchStatus::Running),
387 "succeeded" => Ok(DispatchStatus::Succeeded),
388 "canceled" => Ok(DispatchStatus::Canceled),
389 "failed" => Ok(DispatchStatus::Failed),
390 "blocked" => Ok(DispatchStatus::Blocked),
391 _ => Err(TrackError::new(
392 ErrorCode::DispatchWriteFailed,
393 format!("Dispatch status `{value}` is not valid."),
394 )),
395 }
396}
397
398fn parse_preferred_tool(value: &str) -> Result<RemoteAgentPreferredTool, TrackError> {
399 RemoteAgentPreferredTool::from_str(value).ok_or_else(|| {
400 TrackError::new(
401 ErrorCode::DispatchWriteFailed,
402 format!("Remote agent preferred tool `{value}` is not valid."),
403 )
404 })
405}