Skip to main content

agentics_persistence/db/
challenge_creation.rs

1//! Challenge review record, GitHub identity, private asset, and review lifecycle queries.
2
3use sqlx::{PgPool, Postgres, Row, Transaction};
4
5use agentics_domain::models::auth::GithubUserId;
6use agentics_domain::models::challenge_creation::{
7    ChallengeCreationManifest, ChallengePrivateAssetKind, ChallengeReviewRecordStatus,
8};
9use agentics_domain::models::github::GithubPullRequestNumber;
10use agentics_domain::models::hashes::GitCommitSha;
11use agentics_domain::models::hashes::Sha256Digest;
12use agentics_domain::models::ids::{ChallengePrivateAssetId, ChallengeReviewRecordId, HumanId};
13use agentics_domain::models::names::AssetName;
14use agentics_domain::models::paths::RepoRelativePath;
15use agentics_domain::models::urls::{GithubPullRequestUrl, GithubRepoRemote};
16use agentics_domain::storage::StorageKey;
17use agentics_error::{Result, ServiceError};
18
19mod assets;
20mod audit;
21mod publishing;
22mod rows;
23mod validation;
24
25pub use assets::{
26    activate_challenge_private_asset, activate_challenge_private_asset_with_audit,
27    fail_challenge_private_asset, private_asset_storage_key_has_active_reference,
28    reserve_challenge_private_asset,
29};
30pub use audit::CreateChallengeReviewRecordAuditEventInput;
31use audit::create_challenge_review_audit_event_tx;
32pub use publishing::{
33    ClaimedChallengeReviewRecordForPublish, PublishArchiveChallengeReviewRecordInput,
34    PublishNewChallengeReviewRecordInput, claim_challenge_review_record_for_publish,
35    fail_challenge_review_record_publish, mark_challenge_review_record_published,
36    publish_archive_challenge_review_record, publish_new_challenge_review_record,
37};
38pub use rows::{
39    AdminChallengePrivateAssetRecord, ChallengePrivateAssetRecord, ChallengeReviewRecordRecord,
40    ChallengeReviewValidationRecord, list_challenge_private_asset_states,
41};
42use rows::{
43    list_private_assets_for_review_record, list_validation_records_for_review_record,
44    optional_storage_key_from_row, row_to_review_record, storage_key_from_row,
45};
46pub use validation::{
47    BeginChallengeReviewRecordValidationInput, FinishChallengeReviewRecordValidationInput,
48    begin_challenge_review_record_validation, finish_challenge_review_record_validation,
49};
50
51use super::ids::{challenge_private_asset_id_from_row, challenge_review_record_id_from_row};
52
53/// Input for inserting one GitHub PR-backed challenge review record.
54#[derive(Debug, Clone)]
55pub struct CreateChallengeReviewRecordInput {
56    pub review_record_id: ChallengeReviewRecordId,
57    pub creator_human_id: HumanId,
58    pub max_active_review_records: i64,
59    pub creator_github_user_id: GithubUserId,
60    pub creator_github_login: String,
61    pub repo_url: GithubRepoRemote,
62    pub pr_number: GithubPullRequestNumber,
63    pub pr_url: GithubPullRequestUrl,
64    pub commit_sha: GitCommitSha,
65    pub challenge_path: RepoRelativePath,
66    pub manifest_sha256: Sha256Digest,
67    pub manifest: ChallengeCreationManifest,
68}
69
70/// Input for persisting one private benchmark asset.
71#[derive(Debug, Clone)]
72pub struct CreateChallengePrivateAssetInput {
73    pub asset_row_id: ChallengePrivateAssetId,
74    pub review_record_id: ChallengeReviewRecordId,
75    pub asset_name: AssetName,
76    pub kind: ChallengePrivateAssetKind,
77    pub required: bool,
78    pub size_bytes: i64,
79    pub sha256: Sha256Digest,
80    pub storage_key: StorageKey,
81    pub temporary_storage_key: StorageKey,
82    pub uploader_human_id: HumanId,
83}
84
85/// Internal private asset cleanup candidate.
86#[derive(Debug, Clone)]
87pub struct ChallengePrivateAssetPurgeRecord {
88    pub id: ChallengePrivateAssetId,
89    pub storage_key: StorageKey,
90    pub temporary_storage_key: Option<StorageKey>,
91}
92
93/// Insert a new challenge review record bound to a GitHub PR.
94pub async fn create_challenge_review_record(
95    pool: &PgPool,
96    input: &CreateChallengeReviewRecordInput,
97    audit_event: &CreateChallengeReviewRecordAuditEventInput,
98) -> Result<ChallengeReviewRecordRecord> {
99    if audit_event.review_record_id != input.review_record_id {
100        return Err(ServiceError::Internal(
101            "review record creation audit event targets a different review record".to_string(),
102        ));
103    }
104    let manifest_json =
105        serde_json::to_value(&input.manifest).map_err(|e| ServiceError::Internal(e.to_string()))?;
106    let mut tx = pool.begin().await?;
107    let scope = format!("challenge-review-records:human:{}", input.creator_human_id);
108    lock_quota_scope(&mut tx, &scope).await?;
109    let active_review_records =
110        count_active_challenge_review_records_for_human_tx(&mut tx, &input.creator_human_id)
111            .await?;
112    if active_review_records >= input.max_active_review_records {
113        return Err(ServiceError::TooManyRequests(format!(
114            "challenge review record quota exceeded: {active_review_records} of {} active review records are already open",
115            input.max_active_review_records
116        )));
117    }
118    let row = sqlx::query(
119        r#"
120        INSERT INTO challenge_review_records (
121            id,
122            challenge_name,
123            request_kind,
124            status,
125            creator_human_id,
126            creator_github_user_id,
127            creator_github_login,
128            repo_url,
129            repo_key,
130            pr_number,
131            pr_url,
132            commit_sha,
133            challenge_path,
134            manifest_sha256,
135            manifest_json
136        )
137        VALUES ($1::uuid, $2, $3, 'pending_review', $4::uuid, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
138        RETURNING *
139        "#,
140    )
141    .bind(input.review_record_id.as_str())
142    .bind(input.manifest.challenge_name.as_str())
143    .bind(input.manifest.request.as_str())
144    .bind(input.creator_human_id.as_str())
145    .bind(input.creator_github_user_id.as_i64())
146    .bind(&input.creator_github_login)
147    .bind(input.repo_url.as_str())
148    .bind(input.repo_url.repository_key().as_str())
149    .bind(input.pr_number.as_i32().map_err(|e| {
150        ServiceError::Internal(format!(
151            "invalid typed pull request number reached database: {e}"
152        ))
153    })?)
154    .bind(input.pr_url.as_str())
155    .bind(input.commit_sha.to_string())
156    .bind(input.challenge_path.as_str())
157    .bind(input.manifest_sha256.to_string())
158    .bind(&manifest_json)
159    .fetch_one(&mut *tx)
160    .await?;
161
162    create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
163    tx.commit().await?;
164
165    row_to_review_record(row, Vec::new(), Vec::new())
166}
167
168/// Get one review_record with its private assets and validation records.
169pub async fn get_challenge_review_record(
170    pool: &PgPool,
171    review_record_id: &ChallengeReviewRecordId,
172) -> Result<Option<ChallengeReviewRecordRecord>> {
173    let row = sqlx::query("SELECT * FROM challenge_review_records WHERE id = $1::uuid")
174        .bind(review_record_id.as_str())
175        .fetch_optional(pool)
176        .await?;
177
178    let Some(row) = row else {
179        return Ok(None);
180    };
181
182    let assets = list_private_assets_for_review_record(pool, review_record_id).await?;
183    let validation_records =
184        list_validation_records_for_review_record(pool, review_record_id).await?;
185    row_to_review_record(row, assets, validation_records).map(Some)
186}
187
188/// List recent review_records for admin review.
189pub async fn list_challenge_review_records(
190    pool: &PgPool,
191    limit: i64,
192) -> Result<Vec<ChallengeReviewRecordRecord>> {
193    let rows = sqlx::query(
194        r#"
195        SELECT *
196        FROM challenge_review_records
197        ORDER BY updated_at DESC, created_at DESC
198        LIMIT $1
199        "#,
200    )
201    .bind(limit)
202    .fetch_all(pool)
203    .await?;
204
205    let mut review_records = Vec::with_capacity(rows.len());
206    for row in rows {
207        let review_record_id = challenge_review_record_id_from_row(&row, "id")?;
208        let assets = list_private_assets_for_review_record(pool, &review_record_id).await?;
209        let validation_records =
210            list_validation_records_for_review_record(pool, &review_record_id).await?;
211        review_records.push(row_to_review_record(row, assets, validation_records)?);
212    }
213    Ok(review_records)
214}
215
216/// Count non-terminal review_records owned by a human for creator quota enforcement.
217async fn count_active_challenge_review_records_for_human_tx(
218    tx: &mut Transaction<'_, Postgres>,
219    human_id: &HumanId,
220) -> Result<i64> {
221    let count = sqlx::query_scalar::<_, i64>(
222        r#"
223        SELECT COUNT(*)::BIGINT
224        FROM challenge_review_records
225        WHERE creator_human_id = $1::uuid
226          AND status IN ('pending_review', 'validated', 'approved', 'publishing')
227        "#,
228    )
229    .bind(human_id.as_str())
230    .fetch_one(&mut **tx)
231    .await?;
232
233    Ok(count)
234}
235
236/// Fail and clear active validation leases that exceeded the configured timeout.
237pub(super) async fn clear_stale_active_validation_tx(
238    tx: &mut Transaction<'_, Postgres>,
239    review_record_id: &str,
240    timeout_minutes: i32,
241) -> Result<()> {
242    let stale_validation_id: Option<String> = sqlx::query_scalar(
243        r#"
244        SELECT v.id::text
245        FROM challenge_review_records d
246        JOIN challenge_review_validation_records v ON v.id = d.active_validation_record_id
247        WHERE d.id = $1::uuid
248          AND (
249            v.status <> 'running'
250            OR v.created_at < NOW() - INTERVAL '1 minute' * $2
251          )
252        "#,
253    )
254    .bind(review_record_id)
255    .bind(timeout_minutes.max(1))
256    .fetch_optional(&mut **tx)
257    .await?;
258    let Some(stale_validation_id) = stale_validation_id else {
259        return Ok(());
260    };
261
262    sqlx::query(
263        r#"
264        UPDATE challenge_review_validation_records
265        SET status = 'failed',
266            message = 'challenge review record validation lease expired'
267        WHERE id = $1::uuid
268          AND status = 'running'
269        "#,
270    )
271    .bind(&stale_validation_id)
272    .execute(&mut **tx)
273    .await?;
274
275    sqlx::query(
276        r#"
277        UPDATE challenge_review_records
278        SET active_validation_record_id = NULL,
279            validation_message = 'challenge review record validation lease expired',
280            updated_at = NOW()
281        WHERE id = $1::uuid
282          AND active_validation_record_id = $2::uuid
283        "#,
284    )
285    .bind(review_record_id)
286    .bind(&stale_validation_id)
287    .execute(&mut **tx)
288    .await?;
289
290    Ok(())
291}
292
293/// Handles lock quota scope for this module.
294pub(super) async fn lock_quota_scope(
295    tx: &mut Transaction<'_, Postgres>,
296    scope: &str,
297) -> Result<()> {
298    sqlx::query(
299        r#"
300        INSERT INTO quota_admission_locks (scope)
301        VALUES ($1)
302        ON CONFLICT (scope) DO NOTHING
303        "#,
304    )
305    .bind(scope)
306    .execute(&mut **tx)
307    .await?;
308
309    sqlx::query(
310        r#"
311        SELECT scope
312        FROM quota_admission_locks
313        WHERE scope = $1
314        FOR UPDATE
315        "#,
316    )
317    .bind(scope)
318    .fetch_one(&mut **tx)
319    .await?;
320
321    Ok(())
322}
323
324/// Approve the latest validated review_record content and freeze its review digest.
325pub async fn approve_validated_challenge_review_record(
326    pool: &PgPool,
327    review_record_id: &str,
328    message: Option<&str>,
329) -> Result<()> {
330    let result = sqlx::query(
331        r#"
332        UPDATE challenge_review_records
333        SET status = 'approved',
334            validation_message = COALESCE($2, validation_message),
335            approved_bundle_sha256 = validation_bundle_sha256,
336            updated_at = NOW()
337        WHERE id = $1::uuid
338          AND status = 'validated'
339          AND validation_bundle_sha256 IS NOT NULL
340          AND active_validation_record_id IS NULL
341        "#,
342    )
343    .bind(review_record_id)
344    .bind(message)
345    .execute(pool)
346    .await?;
347
348    if result.rows_affected() == 0 {
349        return Err(ServiceError::Conflict);
350    }
351    Ok(())
352}
353
354/// Approve a validated review_record and audit the exact digest approved in one transaction.
355pub async fn approve_validated_challenge_review_record_with_audit(
356    pool: &PgPool,
357    review_record_id: &ChallengeReviewRecordId,
358    expected_validation_bundle_sha256: &Sha256Digest,
359    message: Option<&str>,
360    audit_event: &CreateChallengeReviewRecordAuditEventInput,
361) -> Result<()> {
362    if audit_event.review_record_id != *review_record_id {
363        return Err(ServiceError::Internal(
364            "review record approval audit event targets a different review record".to_string(),
365        ));
366    }
367    let mut tx = pool.begin().await?;
368    let row = sqlx::query(
369        r#"
370        UPDATE challenge_review_records
371        SET status = 'approved',
372            validation_message = COALESCE($2, validation_message),
373            approved_bundle_sha256 = validation_bundle_sha256,
374            updated_at = NOW()
375        WHERE id = $1::uuid
376          AND status = 'validated'
377          AND validation_bundle_sha256 IS NOT NULL
378          AND validation_bundle_sha256 = $3
379          AND active_validation_record_id IS NULL
380        RETURNING approved_bundle_sha256
381        "#,
382    )
383    .bind(review_record_id.as_str())
384    .bind(message)
385    .bind(expected_validation_bundle_sha256.to_string())
386    .fetch_optional(&mut *tx)
387    .await?;
388
389    let Some(row) = row else {
390        return Err(ServiceError::Conflict);
391    };
392    let approved_bundle_sha256: Option<String> = row.try_get("approved_bundle_sha256")?;
393    create_challenge_review_audit_event_tx(
394        &mut tx,
395        &CreateChallengeReviewRecordAuditEventInput {
396            event_id: audit_event.event_id.clone(),
397            review_record_id: review_record_id.clone(),
398            actor_human_id: audit_event.actor_human_id.clone(),
399            actor_admin_service_token_id: audit_event.actor_admin_service_token_id.clone(),
400            actor_display: audit_event.actor_display.clone(),
401            action: "review_record_approved".to_string(),
402            message: message.unwrap_or_default().to_string(),
403            metadata: serde_json::json!({ "approved_bundle_sha256": approved_bundle_sha256 }),
404        },
405    )
406    .await?;
407
408    tx.commit().await?;
409    Ok(())
410}
411
412/// Move a review_record to a review status.
413pub async fn update_challenge_review_record_status(
414    pool: &PgPool,
415    review_record_id: &str,
416    status: ChallengeReviewRecordStatus,
417    message: Option<&str>,
418) -> Result<()> {
419    let result = sqlx::query(
420        r#"
421        UPDATE challenge_review_records
422        SET status = $2,
423            validation_message = COALESCE($3, validation_message),
424            updated_at = NOW()
425        WHERE id = $1::uuid
426          AND status IN ('pending_review', 'validated', 'approved')
427          AND active_validation_record_id IS NULL
428        "#,
429    )
430    .bind(review_record_id)
431    .bind(status.as_str())
432    .bind(message)
433    .execute(pool)
434    .await?;
435
436    if result.rows_affected() == 0 {
437        return Err(ServiceError::Conflict);
438    }
439    Ok(())
440}
441
442/// Move a review_record to a review status and append its audit event atomically.
443pub async fn update_challenge_review_record_status_with_audit(
444    pool: &PgPool,
445    review_record_id: &ChallengeReviewRecordId,
446    status: ChallengeReviewRecordStatus,
447    message: Option<&str>,
448    audit_event: &CreateChallengeReviewRecordAuditEventInput,
449) -> Result<()> {
450    let mut tx = pool.begin().await?;
451    let result = sqlx::query(
452        r#"
453        UPDATE challenge_review_records
454        SET status = $2,
455            validation_message = COALESCE($3, validation_message),
456            updated_at = NOW()
457        WHERE id = $1::uuid
458          AND status IN ('pending_review', 'validated', 'approved')
459          AND active_validation_record_id IS NULL
460        "#,
461    )
462    .bind(review_record_id.as_str())
463    .bind(status.as_str())
464    .bind(message)
465    .execute(&mut *tx)
466    .await?;
467
468    if result.rows_affected() == 0 {
469        return Err(ServiceError::Conflict);
470    }
471    create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
472    tx.commit().await?;
473    Ok(())
474}
475
476/// Mark one review_record abandoned unless it has already been published.
477pub async fn abandon_challenge_review_record(
478    pool: &PgPool,
479    review_record_id: &str,
480    message: Option<&str>,
481) -> Result<()> {
482    let result = sqlx::query(
483        r#"
484        UPDATE challenge_review_records
485        SET status = 'abandoned',
486            validation_message = COALESCE($2, validation_message),
487            updated_at = NOW()
488        WHERE id = $1::uuid
489          AND status IN ('pending_review', 'validated', 'approved', 'rejected')
490          AND active_validation_record_id IS NULL
491        "#,
492    )
493    .bind(review_record_id)
494    .bind(message)
495    .execute(pool)
496    .await?;
497
498    if result.rows_affected() == 0 {
499        return Err(ServiceError::Conflict);
500    }
501    Ok(())
502}
503
504/// Mark one review_record abandoned and append its audit event atomically.
505pub async fn abandon_challenge_review_record_with_audit(
506    pool: &PgPool,
507    review_record_id: &ChallengeReviewRecordId,
508    message: Option<&str>,
509    audit_event: &CreateChallengeReviewRecordAuditEventInput,
510) -> Result<()> {
511    let mut tx = pool.begin().await?;
512    let result = sqlx::query(
513        r#"
514        UPDATE challenge_review_records
515        SET status = 'abandoned',
516            validation_message = COALESCE($2, validation_message),
517            updated_at = NOW()
518        WHERE id = $1::uuid
519          AND status IN ('pending_review', 'validated', 'approved', 'rejected')
520          AND active_validation_record_id IS NULL
521        "#,
522    )
523    .bind(review_record_id.as_str())
524    .bind(message)
525    .execute(&mut *tx)
526    .await?;
527
528    if result.rows_affected() == 0 {
529        return Err(ServiceError::Conflict);
530    }
531    create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
532    tx.commit().await?;
533    Ok(())
534}
535
536/// Mark inactive unpublished review_records abandoned after the configured TTL.
537pub async fn abandon_stale_challenge_review_records(pool: &PgPool, ttl_days: i64) -> Result<i64> {
538    let result = sqlx::query(
539        r#"
540        UPDATE challenge_review_records
541        SET status = 'abandoned',
542            validation_message = COALESCE(validation_message, 'review record abandoned due to inactivity'),
543            updated_at = NOW()
544        WHERE status IN ('pending_review', 'validated', 'approved')
545          AND updated_at < NOW() - ($1::TEXT || ' days')::INTERVAL
546        "#,
547    )
548    .bind(ttl_days)
549    .execute(pool)
550    .await?;
551
552    i64::try_from(result.rows_affected()).map_err(|_| {
553        ServiceError::Internal("abandoned review record count exceeds supported range".to_string())
554    })
555}
556
557/// List private assets eligible for cleanup because their review_record did not publish.
558pub async fn list_unpublished_private_assets_for_purge(
559    pool: &PgPool,
560    grace_days: i64,
561) -> Result<Vec<ChallengePrivateAssetPurgeRecord>> {
562    let rows = sqlx::query(
563        r#"
564        SELECT a.id, a.storage_key, a.temporary_storage_key
565        FROM challenge_private_assets a
566        JOIN challenge_review_records d ON d.id = a.review_record_id
567        WHERE d.status IN ('abandoned', 'rejected')
568          AND d.updated_at < NOW() - ($1::TEXT || ' days')::INTERVAL
569          AND a.status IN ('pending', 'active', 'failed', 'purging')
570        ORDER BY a.created_at ASC
571        "#,
572    )
573    .bind(grace_days)
574    .fetch_all(pool)
575    .await?;
576
577    rows.into_iter()
578        .map(|row| {
579            Ok(ChallengePrivateAssetPurgeRecord {
580                id: challenge_private_asset_id_from_row(&row, "id")?,
581                storage_key: storage_key_from_row(&row, "storage_key")?,
582                temporary_storage_key: optional_storage_key_from_row(
583                    &row,
584                    "temporary_storage_key",
585                )?,
586            })
587        })
588        .collect()
589}
590
591/// Mark a purge-eligible private asset as purging before deleting storage.
592pub async fn mark_challenge_private_asset_purging(
593    pool: &PgPool,
594    asset_row_id: &ChallengePrivateAssetId,
595) -> Result<Option<ChallengePrivateAssetPurgeRecord>> {
596    let row = sqlx::query(
597        r#"
598        UPDATE challenge_private_assets
599        SET status = 'purging'
600        WHERE id = $1::uuid
601          AND status IN ('pending', 'active', 'failed', 'purging')
602        RETURNING id, storage_key, temporary_storage_key
603        "#,
604    )
605    .bind(asset_row_id.as_str())
606    .fetch_optional(pool)
607    .await?;
608
609    row.map(|row| {
610        Ok(ChallengePrivateAssetPurgeRecord {
611            id: challenge_private_asset_id_from_row(&row, "id")?,
612            storage_key: storage_key_from_row(&row, "storage_key")?,
613            temporary_storage_key: optional_storage_key_from_row(&row, "temporary_storage_key")?,
614        })
615    })
616    .transpose()
617}
618
619/// Delete a private asset record after its object has been removed.
620pub async fn delete_challenge_private_asset(
621    pool: &PgPool,
622    asset_row_id: &ChallengePrivateAssetId,
623) -> Result<()> {
624    sqlx::query(
625        r#"
626        WITH deleted AS (
627            DELETE FROM challenge_private_assets
628            WHERE id = $1::uuid
629            RETURNING review_record_id
630        )
631        UPDATE challenge_review_records d
632        SET updated_at = NOW()
633        WHERE d.id IN (SELECT review_record_id FROM deleted)
634        "#,
635    )
636    .bind(asset_row_id.as_str())
637    .execute(pool)
638    .await?;
639
640    Ok(())
641}