agentics_persistence/db/challenge_creation/
validation.rs1use sqlx::{PgPool, Postgres, Transaction};
2
3use agentics_domain::models::challenge_creation::{
4 ChallengeReviewRecordStatus, ChallengeReviewValidationStatus,
5};
6use agentics_domain::models::hashes::Sha256Digest;
7use agentics_domain::models::ids::{ChallengeReviewRecordId, ChallengeReviewValidationRecordId};
8use agentics_error::{Result, ServiceError};
9
10use super::rows::{ChallengeReviewValidationRecord, row_to_validation_record};
11use super::{
12 CreateChallengeReviewRecordAuditEventInput, clear_stale_active_validation_tx,
13 create_challenge_review_audit_event_tx, lock_quota_scope,
14};
15
16#[derive(Debug, Clone)]
18pub struct BeginChallengeReviewRecordValidationInput {
19 pub validation_record_id: ChallengeReviewValidationRecordId,
20 pub review_record_id: ChallengeReviewRecordId,
21 pub repository_path: String,
22 pub manifest_sha256: Sha256Digest,
23}
24
25#[derive(Debug, Clone)]
27pub struct FinishChallengeReviewRecordValidationInput {
28 pub validation_record_id: ChallengeReviewValidationRecordId,
29 pub review_record_id: ChallengeReviewRecordId,
30 pub status: ChallengeReviewValidationStatus,
31 pub message: String,
32 pub bundle_sha256: Option<Sha256Digest>,
33}
34
35pub async fn begin_challenge_review_record_validation(
37 pool: &PgPool,
38 input: &BeginChallengeReviewRecordValidationInput,
39 window_seconds: i64,
40 validation_limit: i64,
41 validation_timeout_minutes: i32,
42) -> Result<ChallengeReviewValidationRecord> {
43 let mut tx = pool.begin().await?;
44 let scope = format!(
45 "challenge-review-record:{}:validations",
46 input.review_record_id
47 );
48 lock_quota_scope(&mut tx, &scope).await?;
49
50 let status: Option<(String, Option<String>)> = sqlx::query_as(
51 r#"
52 SELECT status, active_validation_record_id::text AS active_validation_record_id
53 FROM challenge_review_records
54 WHERE id = $1::uuid
55 FOR UPDATE
56 "#,
57 )
58 .bind(input.review_record_id.as_str())
59 .fetch_optional(&mut *tx)
60 .await?;
61 let Some((status, active_validation_record_id)) = status else {
62 return Err(ServiceError::NotFound);
63 };
64 let status = ChallengeReviewRecordStatus::from_storage_value(&status).ok_or_else(|| {
65 ServiceError::Internal(format!("unknown challenge review record status `{status}`"))
66 })?;
67 if !matches!(
68 status,
69 ChallengeReviewRecordStatus::PendingReview | ChallengeReviewRecordStatus::Validated
70 ) {
71 return Err(ServiceError::Conflict);
72 }
73 let active_validation_record_id = if active_validation_record_id.is_some() {
74 clear_stale_active_validation_tx(
75 &mut tx,
76 input.review_record_id.as_str(),
77 validation_timeout_minutes,
78 )
79 .await?;
80 let refreshed_active: Option<String> = sqlx::query_scalar(
81 "SELECT active_validation_record_id::text FROM challenge_review_records WHERE id = $1::uuid",
82 )
83 .bind(input.review_record_id.as_str())
84 .fetch_one(&mut *tx)
85 .await?;
86 refreshed_active
87 } else {
88 active_validation_record_id
89 };
90 if active_validation_record_id.is_some() {
91 return Err(ServiceError::Conflict);
92 }
93
94 let recent_validations = count_recent_challenge_review_record_validations_tx(
95 &mut tx,
96 input.review_record_id.as_str(),
97 window_seconds,
98 )
99 .await?;
100 if recent_validations >= validation_limit {
101 return Err(ServiceError::TooManyRequests(format!(
102 "challenge review record validation quota exceeded for `{}`: {} of {} validations used in the last 24 hours",
103 input.review_record_id, recent_validations, validation_limit
104 )));
105 }
106
107 let row = sqlx::query(
108 r#"
109 INSERT INTO challenge_review_validation_records (
110 id, review_record_id, status, message, repository_path, manifest_sha256, bundle_sha256
111 )
112 VALUES ($1::uuid, $2::uuid, 'running', $3, $4, $5, NULL)
113 RETURNING *
114 "#,
115 )
116 .bind(input.validation_record_id.as_str())
117 .bind(input.review_record_id.as_str())
118 .bind("challenge review record validation is running")
119 .bind(&input.repository_path)
120 .bind(input.manifest_sha256.to_string())
121 .fetch_one(&mut *tx)
122 .await?;
123
124 let claim = sqlx::query(
125 r#"
126 UPDATE challenge_review_records
127 SET active_validation_record_id = $2::uuid,
128 updated_at = NOW()
129 WHERE id = $1::uuid
130 AND active_validation_record_id IS NULL
131 "#,
132 )
133 .bind(input.review_record_id.as_str())
134 .bind(input.validation_record_id.as_str())
135 .execute(&mut *tx)
136 .await?;
137 if claim.rows_affected() != 1 {
138 return Err(ServiceError::Conflict);
139 }
140
141 tx.commit().await?;
142 row_to_validation_record(row)
143}
144
145async fn count_recent_challenge_review_record_validations_tx(
147 tx: &mut Transaction<'_, Postgres>,
148 review_record_id: &str,
149 window_seconds: i64,
150) -> Result<i64> {
151 let count = sqlx::query_scalar::<_, i64>(
152 r#"
153 SELECT COUNT(*)::BIGINT
154 FROM challenge_review_validation_records
155 WHERE review_record_id = $1::uuid
156 AND created_at >= NOW() - ($2::TEXT || ' seconds')::INTERVAL
157 "#,
158 )
159 .bind(review_record_id)
160 .bind(window_seconds)
161 .fetch_one(&mut **tx)
162 .await?;
163
164 Ok(count)
165}
166
167pub async fn finish_challenge_review_record_validation(
169 pool: &PgPool,
170 input: &FinishChallengeReviewRecordValidationInput,
171 audit_event: &CreateChallengeReviewRecordAuditEventInput,
172) -> Result<ChallengeReviewValidationRecord> {
173 let mut tx = pool.begin().await?;
174 let next_status = match input.status {
175 ChallengeReviewValidationStatus::Passed => ChallengeReviewRecordStatus::Validated,
176 ChallengeReviewValidationStatus::Failed => ChallengeReviewRecordStatus::PendingReview,
177 ChallengeReviewValidationStatus::Running => {
178 return Err(ServiceError::Internal(
179 "running review record validation cannot finish as running".to_string(),
180 ));
181 }
182 };
183
184 let row = sqlx::query(
185 r#"
186 UPDATE challenge_review_validation_records
187 SET status = $3,
188 message = $4,
189 bundle_sha256 = $5
190 WHERE id = $1::uuid
191 AND review_record_id = $2::uuid
192 AND status = 'running'
193 RETURNING *
194 "#,
195 )
196 .bind(input.validation_record_id.as_str())
197 .bind(input.review_record_id.as_str())
198 .bind(input.status.as_str())
199 .bind(&input.message)
200 .bind(input.bundle_sha256.map(|digest| digest.to_string()))
201 .fetch_optional(&mut *tx)
202 .await?;
203 let Some(row) = row else {
204 return Err(ServiceError::Conflict);
205 };
206
207 let update = sqlx::query(
208 r#"
209 UPDATE challenge_review_records
210 SET status = $2,
211 validation_message = $3,
212 validation_repository_path = (
213 SELECT repository_path
214 FROM challenge_review_validation_records
215 WHERE id = $1::uuid
216 ),
217 validation_bundle_sha256 = $4,
218 active_validation_record_id = NULL,
219 updated_at = NOW()
220 WHERE id = $5::uuid
221 AND active_validation_record_id = $1::uuid
222 AND status IN ('pending_review', 'validated')
223 "#,
224 )
225 .bind(input.validation_record_id.as_str())
226 .bind(next_status.as_str())
227 .bind(&input.message)
228 .bind(input.bundle_sha256.map(|digest| digest.to_string()))
229 .bind(input.review_record_id.as_str())
230 .execute(&mut *tx)
231 .await?;
232 if update.rows_affected() == 0 {
233 tx.commit().await?;
234 return Err(ServiceError::Conflict);
235 }
236
237 create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
238
239 tx.commit().await?;
240 row_to_validation_record(row)
241}