Skip to main content

agentics_persistence/db/challenge_creation/
validation.rs

1use sqlx::{PgPool, Postgres, Transaction};
2
3use agentics_domain::models::challenge_creation::{
4    ChallengeReviewRecordStatus, ChallengeReviewValidationStatus,
5};
6use agentics_domain::models::hashes::Sha256Digest;
7use agentics_domain::models::ids::{ChallengeReviewRecordId, ChallengeReviewValidationRecordId};
8use agentics_error::{Result, ServiceError};
9
10use super::rows::{ChallengeReviewValidationRecord, row_to_validation_record};
11use super::{
12    CreateChallengeReviewRecordAuditEventInput, clear_stale_active_validation_tx,
13    create_challenge_review_audit_event_tx, lock_quota_scope,
14};
15
16/// Input for reserving one review_record validation admission slot before expensive work starts.
17#[derive(Debug, Clone)]
18pub struct BeginChallengeReviewRecordValidationInput {
19    pub validation_record_id: ChallengeReviewValidationRecordId,
20    pub review_record_id: ChallengeReviewRecordId,
21    pub repository_path: String,
22    pub manifest_sha256: Sha256Digest,
23}
24
25/// Input for completing a previously reserved review_record validation record.
26#[derive(Debug, Clone)]
27pub struct FinishChallengeReviewRecordValidationInput {
28    pub validation_record_id: ChallengeReviewValidationRecordId,
29    pub review_record_id: ChallengeReviewRecordId,
30    pub status: ChallengeReviewValidationStatus,
31    pub message: String,
32    pub bundle_sha256: Option<Sha256Digest>,
33}
34
35/// Reserve one validation quota slot and record a running validation attempt.
36pub async fn begin_challenge_review_record_validation(
37    pool: &PgPool,
38    input: &BeginChallengeReviewRecordValidationInput,
39    window_seconds: i64,
40    validation_limit: i64,
41    validation_timeout_minutes: i32,
42) -> Result<ChallengeReviewValidationRecord> {
43    let mut tx = pool.begin().await?;
44    let scope = format!(
45        "challenge-review-record:{}:validations",
46        input.review_record_id
47    );
48    lock_quota_scope(&mut tx, &scope).await?;
49
50    let status: Option<(String, Option<String>)> = sqlx::query_as(
51        r#"
52        SELECT status, active_validation_record_id::text AS active_validation_record_id
53        FROM challenge_review_records
54        WHERE id = $1::uuid
55        FOR UPDATE
56        "#,
57    )
58    .bind(input.review_record_id.as_str())
59    .fetch_optional(&mut *tx)
60    .await?;
61    let Some((status, active_validation_record_id)) = status else {
62        return Err(ServiceError::NotFound);
63    };
64    let status = ChallengeReviewRecordStatus::from_storage_value(&status).ok_or_else(|| {
65        ServiceError::Internal(format!("unknown challenge review record status `{status}`"))
66    })?;
67    if !matches!(
68        status,
69        ChallengeReviewRecordStatus::PendingReview | ChallengeReviewRecordStatus::Validated
70    ) {
71        return Err(ServiceError::Conflict);
72    }
73    let active_validation_record_id = if active_validation_record_id.is_some() {
74        clear_stale_active_validation_tx(
75            &mut tx,
76            input.review_record_id.as_str(),
77            validation_timeout_minutes,
78        )
79        .await?;
80        let refreshed_active: Option<String> = sqlx::query_scalar(
81            "SELECT active_validation_record_id::text FROM challenge_review_records WHERE id = $1::uuid",
82        )
83        .bind(input.review_record_id.as_str())
84        .fetch_one(&mut *tx)
85        .await?;
86        refreshed_active
87    } else {
88        active_validation_record_id
89    };
90    if active_validation_record_id.is_some() {
91        return Err(ServiceError::Conflict);
92    }
93
94    let recent_validations = count_recent_challenge_review_record_validations_tx(
95        &mut tx,
96        input.review_record_id.as_str(),
97        window_seconds,
98    )
99    .await?;
100    if recent_validations >= validation_limit {
101        return Err(ServiceError::TooManyRequests(format!(
102            "challenge review record validation quota exceeded for `{}`: {} of {} validations used in the last 24 hours",
103            input.review_record_id, recent_validations, validation_limit
104        )));
105    }
106
107    let row = sqlx::query(
108        r#"
109        INSERT INTO challenge_review_validation_records (
110            id, review_record_id, status, message, repository_path, manifest_sha256, bundle_sha256
111        )
112        VALUES ($1::uuid, $2::uuid, 'running', $3, $4, $5, NULL)
113        RETURNING *
114        "#,
115    )
116    .bind(input.validation_record_id.as_str())
117    .bind(input.review_record_id.as_str())
118    .bind("challenge review record validation is running")
119    .bind(&input.repository_path)
120    .bind(input.manifest_sha256.to_string())
121    .fetch_one(&mut *tx)
122    .await?;
123
124    let claim = sqlx::query(
125        r#"
126        UPDATE challenge_review_records
127        SET active_validation_record_id = $2::uuid,
128            updated_at = NOW()
129        WHERE id = $1::uuid
130          AND active_validation_record_id IS NULL
131        "#,
132    )
133    .bind(input.review_record_id.as_str())
134    .bind(input.validation_record_id.as_str())
135    .execute(&mut *tx)
136    .await?;
137    if claim.rows_affected() != 1 {
138        return Err(ServiceError::Conflict);
139    }
140
141    tx.commit().await?;
142    row_to_validation_record(row)
143}
144
145/// Count validation attempts for one review_record inside a rolling window under a quota lock.
146async fn count_recent_challenge_review_record_validations_tx(
147    tx: &mut Transaction<'_, Postgres>,
148    review_record_id: &str,
149    window_seconds: i64,
150) -> Result<i64> {
151    let count = sqlx::query_scalar::<_, i64>(
152        r#"
153        SELECT COUNT(*)::BIGINT
154        FROM challenge_review_validation_records
155        WHERE review_record_id = $1::uuid
156          AND created_at >= NOW() - ($2::TEXT || ' seconds')::INTERVAL
157        "#,
158    )
159    .bind(review_record_id)
160    .bind(window_seconds)
161    .fetch_one(&mut **tx)
162    .await?;
163
164    Ok(count)
165}
166
167/// Complete a reserved review_record validation record and transition the review_record status.
168pub async fn finish_challenge_review_record_validation(
169    pool: &PgPool,
170    input: &FinishChallengeReviewRecordValidationInput,
171    audit_event: &CreateChallengeReviewRecordAuditEventInput,
172) -> Result<ChallengeReviewValidationRecord> {
173    let mut tx = pool.begin().await?;
174    let next_status = match input.status {
175        ChallengeReviewValidationStatus::Passed => ChallengeReviewRecordStatus::Validated,
176        ChallengeReviewValidationStatus::Failed => ChallengeReviewRecordStatus::PendingReview,
177        ChallengeReviewValidationStatus::Running => {
178            return Err(ServiceError::Internal(
179                "running review record validation cannot finish as running".to_string(),
180            ));
181        }
182    };
183
184    let row = sqlx::query(
185        r#"
186        UPDATE challenge_review_validation_records
187        SET status = $3,
188            message = $4,
189            bundle_sha256 = $5
190        WHERE id = $1::uuid
191          AND review_record_id = $2::uuid
192          AND status = 'running'
193        RETURNING *
194        "#,
195    )
196    .bind(input.validation_record_id.as_str())
197    .bind(input.review_record_id.as_str())
198    .bind(input.status.as_str())
199    .bind(&input.message)
200    .bind(input.bundle_sha256.map(|digest| digest.to_string()))
201    .fetch_optional(&mut *tx)
202    .await?;
203    let Some(row) = row else {
204        return Err(ServiceError::Conflict);
205    };
206
207    let update = sqlx::query(
208        r#"
209        UPDATE challenge_review_records
210        SET status = $2,
211            validation_message = $3,
212            validation_repository_path = (
213                SELECT repository_path
214                FROM challenge_review_validation_records
215                WHERE id = $1::uuid
216            ),
217            validation_bundle_sha256 = $4,
218            active_validation_record_id = NULL,
219            updated_at = NOW()
220        WHERE id = $5::uuid
221          AND active_validation_record_id = $1::uuid
222          AND status IN ('pending_review', 'validated')
223        "#,
224    )
225    .bind(input.validation_record_id.as_str())
226    .bind(next_status.as_str())
227    .bind(&input.message)
228    .bind(input.bundle_sha256.map(|digest| digest.to_string()))
229    .bind(input.review_record_id.as_str())
230    .execute(&mut *tx)
231    .await?;
232    if update.rows_affected() == 0 {
233        tx.commit().await?;
234        return Err(ServiceError::Conflict);
235    }
236
237    create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
238
239    tx.commit().await?;
240    row_to_validation_record(row)
241}