Skip to main content

track_core/
review_dispatch_repository.rs

1use std::path::PathBuf;
2
3use sqlx::Row;
4
5use crate::database::DatabaseContext;
6use crate::errors::{ErrorCode, TrackError};
7use crate::path_component::validate_single_normal_path_component;
8use crate::time_utils::{format_iso_8601_millis, now_utc, parse_iso_8601_millis};
9use crate::types::{DispatchStatus, RemoteAgentPreferredTool, ReviewRecord, ReviewRunRecord};
10
11#[derive(Debug, Clone)]
12pub struct ReviewDispatchRepository {
13    database: DatabaseContext,
14}
15
16impl ReviewDispatchRepository {
17    pub fn new(database_path: Option<PathBuf>) -> Result<Self, TrackError> {
18        let database = DatabaseContext::new(database_path)?;
19        database.initialize()?;
20
21        Ok(Self { database })
22    }
23
24    pub fn create_dispatch(
25        &self,
26        review: &ReviewRecord,
27        remote_host: &str,
28        preferred_tool: RemoteAgentPreferredTool,
29    ) -> Result<ReviewRunRecord, TrackError> {
30        let timestamp = now_utc();
31        let record = ReviewRunRecord {
32            dispatch_id: format!("dispatch-{}", timestamp.unix_timestamp_nanos()),
33            review_id: review.id.clone(),
34            pull_request_url: review.pull_request_url.clone(),
35            repository_full_name: review.repository_full_name.clone(),
36            workspace_key: review.workspace_key.clone(),
37            preferred_tool,
38            status: DispatchStatus::Preparing,
39            created_at: timestamp,
40            updated_at: timestamp,
41            finished_at: None,
42            remote_host: remote_host.to_owned(),
43            branch_name: None,
44            worktree_path: None,
45            follow_up_request: None,
46            target_head_oid: None,
47            summary: None,
48            review_submitted: false,
49            github_review_id: None,
50            github_review_url: None,
51            notes: None,
52            error_message: None,
53        };
54
55        self.save_dispatch(&record)?;
56        Ok(record)
57    }
58
59    pub fn save_dispatch(&self, record: &ReviewRunRecord) -> Result<(), TrackError> {
60        let record = record.clone();
61        validate_single_normal_path_component(
62            &record.review_id,
63            "Review id",
64            ErrorCode::InvalidPathComponent,
65        )?;
66        validate_single_normal_path_component(
67            &record.dispatch_id,
68            "Dispatch id",
69            ErrorCode::InvalidPathComponent,
70        )?;
71
72        self.database.run(move |connection| {
73            Box::pin(async move {
74                sqlx::query(
75                    r#"
76                    INSERT INTO review_runs (
77                        dispatch_id, review_id, pull_request_url, repository_full_name,
78                        workspace_key, preferred_tool, status, created_at, updated_at,
79                        finished_at, remote_host, branch_name, worktree_path,
80                        follow_up_request, target_head_oid, summary, review_submitted,
81                        github_review_id, github_review_url, notes, error_message
82                    )
83                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
84                    ON CONFLICT(dispatch_id) DO UPDATE SET
85                        review_id = excluded.review_id,
86                        pull_request_url = excluded.pull_request_url,
87                        repository_full_name = excluded.repository_full_name,
88                        workspace_key = excluded.workspace_key,
89                        preferred_tool = excluded.preferred_tool,
90                        status = excluded.status,
91                        created_at = excluded.created_at,
92                        updated_at = excluded.updated_at,
93                        finished_at = excluded.finished_at,
94                        remote_host = excluded.remote_host,
95                        branch_name = excluded.branch_name,
96                        worktree_path = excluded.worktree_path,
97                        follow_up_request = excluded.follow_up_request,
98                        target_head_oid = excluded.target_head_oid,
99                        summary = excluded.summary,
100                        review_submitted = excluded.review_submitted,
101                        github_review_id = excluded.github_review_id,
102                        github_review_url = excluded.github_review_url,
103                        notes = excluded.notes,
104                        error_message = excluded.error_message
105                    "#,
106                )
107                .bind(&record.dispatch_id)
108                .bind(&record.review_id)
109                .bind(&record.pull_request_url)
110                .bind(&record.repository_full_name)
111                .bind(&record.workspace_key)
112                .bind(record.preferred_tool.as_str())
113                .bind(record.status.as_str())
114                .bind(format_iso_8601_millis(record.created_at))
115                .bind(format_iso_8601_millis(record.updated_at))
116                .bind(record.finished_at.map(format_iso_8601_millis))
117                .bind(&record.remote_host)
118                .bind(record.branch_name.as_deref())
119                .bind(record.worktree_path.as_deref())
120                .bind(record.follow_up_request.as_deref())
121                .bind(record.target_head_oid.as_deref())
122                .bind(record.summary.as_deref())
123                .bind(record.review_submitted as i64)
124                .bind(record.github_review_id.as_deref())
125                .bind(record.github_review_url.as_deref())
126                .bind(record.notes.as_deref())
127                .bind(record.error_message.as_deref())
128                .execute(&mut *connection)
129                .await
130                .map_err(|error| {
131                    TrackError::new(
132                        ErrorCode::DispatchWriteFailed,
133                        format!(
134                            "Could not save the review run record for review {}: {error}",
135                            record.review_id
136                        ),
137                    )
138                })?;
139
140                Ok(())
141            })
142        })
143    }
144
145    pub fn latest_dispatch_for_review(
146        &self,
147        review_id: &str,
148    ) -> Result<Option<ReviewRunRecord>, TrackError> {
149        Ok(self.dispatches_for_review(review_id)?.into_iter().next())
150    }
151
152    pub fn dispatches_for_review(
153        &self,
154        review_id: &str,
155    ) -> Result<Vec<ReviewRunRecord>, TrackError> {
156        let review_id = validate_single_normal_path_component(
157            review_id,
158            "Review id",
159            ErrorCode::InvalidPathComponent,
160        )?;
161
162        self.database.run(move |connection| {
163            Box::pin(async move {
164                let rows = sqlx::query(
165                    r#"
166                    SELECT *
167                    FROM review_runs
168                    WHERE review_id = ?1
169                    ORDER BY created_at DESC
170                    "#,
171                )
172                .bind(&review_id)
173                .fetch_all(&mut *connection)
174                .await
175                .map_err(|error| {
176                    TrackError::new(
177                        ErrorCode::DispatchWriteFailed,
178                        format!("Could not load review runs for {review_id}: {error}"),
179                    )
180                })?;
181
182                rows.into_iter().map(review_run_from_row).collect()
183            })
184        })
185    }
186
187    pub fn list_dispatches(
188        &self,
189        limit: Option<usize>,
190    ) -> Result<Vec<ReviewRunRecord>, TrackError> {
191        let limit = limit.map(|value| value as i64);
192        self.database.run(move |connection| {
193            Box::pin(async move {
194                let rows = if let Some(limit) = limit {
195                    sqlx::query(
196                        r#"
197                        SELECT *
198                        FROM review_runs
199                        ORDER BY created_at DESC
200                        LIMIT ?1
201                        "#,
202                    )
203                    .bind(limit)
204                    .fetch_all(&mut *connection)
205                    .await
206                } else {
207                    sqlx::query(
208                        r#"
209                        SELECT *
210                        FROM review_runs
211                        ORDER BY created_at DESC
212                        "#,
213                    )
214                    .fetch_all(&mut *connection)
215                    .await
216                }
217                .map_err(|error| {
218                    TrackError::new(
219                        ErrorCode::DispatchWriteFailed,
220                        format!("Could not list review run records: {error}"),
221                    )
222                })?;
223
224                rows.into_iter().map(review_run_from_row).collect()
225            })
226        })
227    }
228
229    pub fn review_ids_with_history(&self) -> Result<Vec<String>, TrackError> {
230        self.database.run(move |connection| {
231            Box::pin(async move {
232                let rows = sqlx::query(
233                    r#"
234                    SELECT DISTINCT review_id
235                    FROM review_runs
236                    ORDER BY review_id ASC
237                    "#,
238                )
239                .fetch_all(&mut *connection)
240                .await
241                .map_err(|error| {
242                    TrackError::new(
243                        ErrorCode::DispatchWriteFailed,
244                        format!("Could not load review ids with run history: {error}"),
245                    )
246                })?;
247
248                Ok(rows
249                    .into_iter()
250                    .map(|row| row.get::<String, _>("review_id"))
251                    .collect())
252            })
253        })
254    }
255
256    pub fn get_dispatch(
257        &self,
258        review_id: &str,
259        dispatch_id: &str,
260    ) -> Result<Option<ReviewRunRecord>, TrackError> {
261        let review_id = validate_single_normal_path_component(
262            review_id,
263            "Review id",
264            ErrorCode::InvalidPathComponent,
265        )?;
266        let dispatch_id = validate_single_normal_path_component(
267            dispatch_id,
268            "Dispatch id",
269            ErrorCode::InvalidPathComponent,
270        )?;
271
272        self.database.run(move |connection| {
273            Box::pin(async move {
274                let row = sqlx::query(
275                    r#"
276                    SELECT *
277                    FROM review_runs
278                    WHERE review_id = ?1 AND dispatch_id = ?2
279                    "#,
280                )
281                .bind(&review_id)
282                .bind(&dispatch_id)
283                .fetch_optional(&mut *connection)
284                .await
285                .map_err(|error| {
286                    TrackError::new(
287                        ErrorCode::DispatchWriteFailed,
288                        format!(
289                            "Could not load the review run {dispatch_id} for review {review_id}: {error}"
290                        ),
291                    )
292                })?;
293
294                row.map(review_run_from_row).transpose()
295            })
296        })
297    }
298
299    pub fn delete_dispatch_history_for_review(&self, review_id: &str) -> Result<(), TrackError> {
300        let review_id = validate_single_normal_path_component(
301            review_id,
302            "Review id",
303            ErrorCode::InvalidPathComponent,
304        )?;
305
306        self.database.run(move |connection| {
307            Box::pin(async move {
308                sqlx::query("DELETE FROM review_runs WHERE review_id = ?1")
309                    .bind(&review_id)
310                    .execute(&mut *connection)
311                    .await
312                    .map_err(|error| {
313                        TrackError::new(
314                            ErrorCode::DispatchWriteFailed,
315                            format!(
316                                "Could not remove the review dispatch history for {review_id}: {error}"
317                            ),
318                        )
319                    })?;
320
321                Ok(())
322            })
323        })
324    }
325}
326
327fn review_run_from_row(row: sqlx::sqlite::SqliteRow) -> Result<ReviewRunRecord, TrackError> {
328    let dispatch_id = row.get::<String, _>("dispatch_id");
329    let created_at =
330        parse_iso_8601_millis(&row.get::<String, _>("created_at")).map_err(|error| {
331            TrackError::new(
332                ErrorCode::DispatchWriteFailed,
333                format!("Review run {dispatch_id} has an invalid created_at timestamp: {error}"),
334            )
335        })?;
336    let updated_at =
337        parse_iso_8601_millis(&row.get::<String, _>("updated_at")).map_err(|error| {
338            TrackError::new(
339                ErrorCode::DispatchWriteFailed,
340                format!("Review run {dispatch_id} has an invalid updated_at timestamp: {error}"),
341            )
342        })?;
343    let finished_at = row
344        .get::<Option<String>, _>("finished_at")
345        .map(|value| parse_iso_8601_millis(&value))
346        .transpose()
347        .map_err(|error| {
348            TrackError::new(
349                ErrorCode::DispatchWriteFailed,
350                format!("Review run {dispatch_id} has an invalid finished_at timestamp: {error}"),
351            )
352        })?;
353
354    Ok(ReviewRunRecord {
355        dispatch_id,
356        review_id: row.get::<String, _>("review_id"),
357        pull_request_url: row.get::<String, _>("pull_request_url"),
358        repository_full_name: row.get::<String, _>("repository_full_name"),
359        workspace_key: row.get::<String, _>("workspace_key"),
360        preferred_tool: parse_preferred_tool(
361            row.try_get::<String, _>("preferred_tool")
362                .unwrap_or_else(|_| "codex".to_owned())
363                .as_str(),
364        )?,
365        status: parse_dispatch_status(row.get::<String, _>("status").as_str())?,
366        created_at,
367        updated_at,
368        finished_at,
369        remote_host: row.get::<String, _>("remote_host"),
370        branch_name: row.get::<Option<String>, _>("branch_name"),
371        worktree_path: row.get::<Option<String>, _>("worktree_path"),
372        follow_up_request: row.get::<Option<String>, _>("follow_up_request"),
373        target_head_oid: row.get::<Option<String>, _>("target_head_oid"),
374        summary: row.get::<Option<String>, _>("summary"),
375        review_submitted: row.get::<i64, _>("review_submitted") != 0,
376        github_review_id: row.get::<Option<String>, _>("github_review_id"),
377        github_review_url: row.get::<Option<String>, _>("github_review_url"),
378        notes: row.get::<Option<String>, _>("notes"),
379        error_message: row.get::<Option<String>, _>("error_message"),
380    })
381}
382
383fn parse_dispatch_status(value: &str) -> Result<DispatchStatus, TrackError> {
384    match value {
385        "preparing" => Ok(DispatchStatus::Preparing),
386        "running" => Ok(DispatchStatus::Running),
387        "succeeded" => Ok(DispatchStatus::Succeeded),
388        "canceled" => Ok(DispatchStatus::Canceled),
389        "failed" => Ok(DispatchStatus::Failed),
390        "blocked" => Ok(DispatchStatus::Blocked),
391        _ => Err(TrackError::new(
392            ErrorCode::DispatchWriteFailed,
393            format!("Dispatch status `{value}` is not valid."),
394        )),
395    }
396}
397
398fn parse_preferred_tool(value: &str) -> Result<RemoteAgentPreferredTool, TrackError> {
399    RemoteAgentPreferredTool::from_str(value).ok_or_else(|| {
400        TrackError::new(
401            ErrorCode::DispatchWriteFailed,
402            format!("Remote agent preferred tool `{value}` is not valid."),
403        )
404    })
405}