1use sqlx::{PgPool, Postgres, Row, Transaction};
4
5use agentics_domain::models::auth::GithubUserId;
6use agentics_domain::models::challenge_creation::{
7 ChallengeCreationManifest, ChallengePrivateAssetKind, ChallengeReviewRecordStatus,
8};
9use agentics_domain::models::github::GithubPullRequestNumber;
10use agentics_domain::models::hashes::GitCommitSha;
11use agentics_domain::models::hashes::Sha256Digest;
12use agentics_domain::models::ids::{ChallengePrivateAssetId, ChallengeReviewRecordId, HumanId};
13use agentics_domain::models::names::AssetName;
14use agentics_domain::models::paths::RepoRelativePath;
15use agentics_domain::models::urls::{GithubPullRequestUrl, GithubRepoRemote};
16use agentics_domain::storage::StorageKey;
17use agentics_error::{Result, ServiceError};
18
19mod assets;
20mod audit;
21mod publishing;
22mod rows;
23mod validation;
24
25pub use assets::{
26 activate_challenge_private_asset, activate_challenge_private_asset_with_audit,
27 fail_challenge_private_asset, private_asset_storage_key_has_active_reference,
28 reserve_challenge_private_asset,
29};
30pub use audit::CreateChallengeReviewRecordAuditEventInput;
31use audit::create_challenge_review_audit_event_tx;
32pub use publishing::{
33 ClaimedChallengeReviewRecordForPublish, PublishArchiveChallengeReviewRecordInput,
34 PublishNewChallengeReviewRecordInput, claim_challenge_review_record_for_publish,
35 fail_challenge_review_record_publish, mark_challenge_review_record_published,
36 publish_archive_challenge_review_record, publish_new_challenge_review_record,
37};
38pub use rows::{
39 AdminChallengePrivateAssetRecord, ChallengePrivateAssetRecord, ChallengeReviewRecordRecord,
40 ChallengeReviewValidationRecord, list_challenge_private_asset_states,
41};
42use rows::{
43 list_private_assets_for_review_record, list_validation_records_for_review_record,
44 optional_storage_key_from_row, row_to_review_record, storage_key_from_row,
45};
46pub use validation::{
47 BeginChallengeReviewRecordValidationInput, FinishChallengeReviewRecordValidationInput,
48 begin_challenge_review_record_validation, finish_challenge_review_record_validation,
49};
50
51use super::ids::{challenge_private_asset_id_from_row, challenge_review_record_id_from_row};
52
53#[derive(Debug, Clone)]
55pub struct CreateChallengeReviewRecordInput {
56 pub review_record_id: ChallengeReviewRecordId,
57 pub creator_human_id: HumanId,
58 pub max_active_review_records: i64,
59 pub creator_github_user_id: GithubUserId,
60 pub creator_github_login: String,
61 pub repo_url: GithubRepoRemote,
62 pub pr_number: GithubPullRequestNumber,
63 pub pr_url: GithubPullRequestUrl,
64 pub commit_sha: GitCommitSha,
65 pub challenge_path: RepoRelativePath,
66 pub manifest_sha256: Sha256Digest,
67 pub manifest: ChallengeCreationManifest,
68}
69
70#[derive(Debug, Clone)]
72pub struct CreateChallengePrivateAssetInput {
73 pub asset_row_id: ChallengePrivateAssetId,
74 pub review_record_id: ChallengeReviewRecordId,
75 pub asset_name: AssetName,
76 pub kind: ChallengePrivateAssetKind,
77 pub required: bool,
78 pub size_bytes: i64,
79 pub sha256: Sha256Digest,
80 pub storage_key: StorageKey,
81 pub temporary_storage_key: StorageKey,
82 pub uploader_human_id: HumanId,
83}
84
85#[derive(Debug, Clone)]
87pub struct ChallengePrivateAssetPurgeRecord {
88 pub id: ChallengePrivateAssetId,
89 pub storage_key: StorageKey,
90 pub temporary_storage_key: Option<StorageKey>,
91}
92
93pub async fn create_challenge_review_record(
95 pool: &PgPool,
96 input: &CreateChallengeReviewRecordInput,
97 audit_event: &CreateChallengeReviewRecordAuditEventInput,
98) -> Result<ChallengeReviewRecordRecord> {
99 if audit_event.review_record_id != input.review_record_id {
100 return Err(ServiceError::Internal(
101 "review record creation audit event targets a different review record".to_string(),
102 ));
103 }
104 let manifest_json =
105 serde_json::to_value(&input.manifest).map_err(|e| ServiceError::Internal(e.to_string()))?;
106 let mut tx = pool.begin().await?;
107 let scope = format!("challenge-review-records:human:{}", input.creator_human_id);
108 lock_quota_scope(&mut tx, &scope).await?;
109 let active_review_records =
110 count_active_challenge_review_records_for_human_tx(&mut tx, &input.creator_human_id)
111 .await?;
112 if active_review_records >= input.max_active_review_records {
113 return Err(ServiceError::TooManyRequests(format!(
114 "challenge review record quota exceeded: {active_review_records} of {} active review records are already open",
115 input.max_active_review_records
116 )));
117 }
118 let row = sqlx::query(
119 r#"
120 INSERT INTO challenge_review_records (
121 id,
122 challenge_name,
123 request_kind,
124 status,
125 creator_human_id,
126 creator_github_user_id,
127 creator_github_login,
128 repo_url,
129 repo_key,
130 pr_number,
131 pr_url,
132 commit_sha,
133 challenge_path,
134 manifest_sha256,
135 manifest_json
136 )
137 VALUES ($1::uuid, $2, $3, 'pending_review', $4::uuid, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
138 RETURNING *
139 "#,
140 )
141 .bind(input.review_record_id.as_str())
142 .bind(input.manifest.challenge_name.as_str())
143 .bind(input.manifest.request.as_str())
144 .bind(input.creator_human_id.as_str())
145 .bind(input.creator_github_user_id.as_i64())
146 .bind(&input.creator_github_login)
147 .bind(input.repo_url.as_str())
148 .bind(input.repo_url.repository_key().as_str())
149 .bind(input.pr_number.as_i32().map_err(|e| {
150 ServiceError::Internal(format!(
151 "invalid typed pull request number reached database: {e}"
152 ))
153 })?)
154 .bind(input.pr_url.as_str())
155 .bind(input.commit_sha.to_string())
156 .bind(input.challenge_path.as_str())
157 .bind(input.manifest_sha256.to_string())
158 .bind(&manifest_json)
159 .fetch_one(&mut *tx)
160 .await?;
161
162 create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
163 tx.commit().await?;
164
165 row_to_review_record(row, Vec::new(), Vec::new())
166}
167
168pub async fn get_challenge_review_record(
170 pool: &PgPool,
171 review_record_id: &ChallengeReviewRecordId,
172) -> Result<Option<ChallengeReviewRecordRecord>> {
173 let row = sqlx::query("SELECT * FROM challenge_review_records WHERE id = $1::uuid")
174 .bind(review_record_id.as_str())
175 .fetch_optional(pool)
176 .await?;
177
178 let Some(row) = row else {
179 return Ok(None);
180 };
181
182 let assets = list_private_assets_for_review_record(pool, review_record_id).await?;
183 let validation_records =
184 list_validation_records_for_review_record(pool, review_record_id).await?;
185 row_to_review_record(row, assets, validation_records).map(Some)
186}
187
188pub async fn list_challenge_review_records(
190 pool: &PgPool,
191 limit: i64,
192) -> Result<Vec<ChallengeReviewRecordRecord>> {
193 let rows = sqlx::query(
194 r#"
195 SELECT *
196 FROM challenge_review_records
197 ORDER BY updated_at DESC, created_at DESC
198 LIMIT $1
199 "#,
200 )
201 .bind(limit)
202 .fetch_all(pool)
203 .await?;
204
205 let mut review_records = Vec::with_capacity(rows.len());
206 for row in rows {
207 let review_record_id = challenge_review_record_id_from_row(&row, "id")?;
208 let assets = list_private_assets_for_review_record(pool, &review_record_id).await?;
209 let validation_records =
210 list_validation_records_for_review_record(pool, &review_record_id).await?;
211 review_records.push(row_to_review_record(row, assets, validation_records)?);
212 }
213 Ok(review_records)
214}
215
216async fn count_active_challenge_review_records_for_human_tx(
218 tx: &mut Transaction<'_, Postgres>,
219 human_id: &HumanId,
220) -> Result<i64> {
221 let count = sqlx::query_scalar::<_, i64>(
222 r#"
223 SELECT COUNT(*)::BIGINT
224 FROM challenge_review_records
225 WHERE creator_human_id = $1::uuid
226 AND status IN ('pending_review', 'validated', 'approved', 'publishing')
227 "#,
228 )
229 .bind(human_id.as_str())
230 .fetch_one(&mut **tx)
231 .await?;
232
233 Ok(count)
234}
235
236pub(super) async fn clear_stale_active_validation_tx(
238 tx: &mut Transaction<'_, Postgres>,
239 review_record_id: &str,
240 timeout_minutes: i32,
241) -> Result<()> {
242 let stale_validation_id: Option<String> = sqlx::query_scalar(
243 r#"
244 SELECT v.id::text
245 FROM challenge_review_records d
246 JOIN challenge_review_validation_records v ON v.id = d.active_validation_record_id
247 WHERE d.id = $1::uuid
248 AND (
249 v.status <> 'running'
250 OR v.created_at < NOW() - INTERVAL '1 minute' * $2
251 )
252 "#,
253 )
254 .bind(review_record_id)
255 .bind(timeout_minutes.max(1))
256 .fetch_optional(&mut **tx)
257 .await?;
258 let Some(stale_validation_id) = stale_validation_id else {
259 return Ok(());
260 };
261
262 sqlx::query(
263 r#"
264 UPDATE challenge_review_validation_records
265 SET status = 'failed',
266 message = 'challenge review record validation lease expired'
267 WHERE id = $1::uuid
268 AND status = 'running'
269 "#,
270 )
271 .bind(&stale_validation_id)
272 .execute(&mut **tx)
273 .await?;
274
275 sqlx::query(
276 r#"
277 UPDATE challenge_review_records
278 SET active_validation_record_id = NULL,
279 validation_message = 'challenge review record validation lease expired',
280 updated_at = NOW()
281 WHERE id = $1::uuid
282 AND active_validation_record_id = $2::uuid
283 "#,
284 )
285 .bind(review_record_id)
286 .bind(&stale_validation_id)
287 .execute(&mut **tx)
288 .await?;
289
290 Ok(())
291}
292
293pub(super) async fn lock_quota_scope(
295 tx: &mut Transaction<'_, Postgres>,
296 scope: &str,
297) -> Result<()> {
298 sqlx::query(
299 r#"
300 INSERT INTO quota_admission_locks (scope)
301 VALUES ($1)
302 ON CONFLICT (scope) DO NOTHING
303 "#,
304 )
305 .bind(scope)
306 .execute(&mut **tx)
307 .await?;
308
309 sqlx::query(
310 r#"
311 SELECT scope
312 FROM quota_admission_locks
313 WHERE scope = $1
314 FOR UPDATE
315 "#,
316 )
317 .bind(scope)
318 .fetch_one(&mut **tx)
319 .await?;
320
321 Ok(())
322}
323
324pub async fn approve_validated_challenge_review_record(
326 pool: &PgPool,
327 review_record_id: &str,
328 message: Option<&str>,
329) -> Result<()> {
330 let result = sqlx::query(
331 r#"
332 UPDATE challenge_review_records
333 SET status = 'approved',
334 validation_message = COALESCE($2, validation_message),
335 approved_bundle_sha256 = validation_bundle_sha256,
336 updated_at = NOW()
337 WHERE id = $1::uuid
338 AND status = 'validated'
339 AND validation_bundle_sha256 IS NOT NULL
340 AND active_validation_record_id IS NULL
341 "#,
342 )
343 .bind(review_record_id)
344 .bind(message)
345 .execute(pool)
346 .await?;
347
348 if result.rows_affected() == 0 {
349 return Err(ServiceError::Conflict);
350 }
351 Ok(())
352}
353
354pub async fn approve_validated_challenge_review_record_with_audit(
356 pool: &PgPool,
357 review_record_id: &ChallengeReviewRecordId,
358 expected_validation_bundle_sha256: &Sha256Digest,
359 message: Option<&str>,
360 audit_event: &CreateChallengeReviewRecordAuditEventInput,
361) -> Result<()> {
362 if audit_event.review_record_id != *review_record_id {
363 return Err(ServiceError::Internal(
364 "review record approval audit event targets a different review record".to_string(),
365 ));
366 }
367 let mut tx = pool.begin().await?;
368 let row = sqlx::query(
369 r#"
370 UPDATE challenge_review_records
371 SET status = 'approved',
372 validation_message = COALESCE($2, validation_message),
373 approved_bundle_sha256 = validation_bundle_sha256,
374 updated_at = NOW()
375 WHERE id = $1::uuid
376 AND status = 'validated'
377 AND validation_bundle_sha256 IS NOT NULL
378 AND validation_bundle_sha256 = $3
379 AND active_validation_record_id IS NULL
380 RETURNING approved_bundle_sha256
381 "#,
382 )
383 .bind(review_record_id.as_str())
384 .bind(message)
385 .bind(expected_validation_bundle_sha256.to_string())
386 .fetch_optional(&mut *tx)
387 .await?;
388
389 let Some(row) = row else {
390 return Err(ServiceError::Conflict);
391 };
392 let approved_bundle_sha256: Option<String> = row.try_get("approved_bundle_sha256")?;
393 create_challenge_review_audit_event_tx(
394 &mut tx,
395 &CreateChallengeReviewRecordAuditEventInput {
396 event_id: audit_event.event_id.clone(),
397 review_record_id: review_record_id.clone(),
398 actor_human_id: audit_event.actor_human_id.clone(),
399 actor_admin_service_token_id: audit_event.actor_admin_service_token_id.clone(),
400 actor_display: audit_event.actor_display.clone(),
401 action: "review_record_approved".to_string(),
402 message: message.unwrap_or_default().to_string(),
403 metadata: serde_json::json!({ "approved_bundle_sha256": approved_bundle_sha256 }),
404 },
405 )
406 .await?;
407
408 tx.commit().await?;
409 Ok(())
410}
411
412pub async fn update_challenge_review_record_status(
414 pool: &PgPool,
415 review_record_id: &str,
416 status: ChallengeReviewRecordStatus,
417 message: Option<&str>,
418) -> Result<()> {
419 let result = sqlx::query(
420 r#"
421 UPDATE challenge_review_records
422 SET status = $2,
423 validation_message = COALESCE($3, validation_message),
424 updated_at = NOW()
425 WHERE id = $1::uuid
426 AND status IN ('pending_review', 'validated', 'approved')
427 AND active_validation_record_id IS NULL
428 "#,
429 )
430 .bind(review_record_id)
431 .bind(status.as_str())
432 .bind(message)
433 .execute(pool)
434 .await?;
435
436 if result.rows_affected() == 0 {
437 return Err(ServiceError::Conflict);
438 }
439 Ok(())
440}
441
442pub async fn update_challenge_review_record_status_with_audit(
444 pool: &PgPool,
445 review_record_id: &ChallengeReviewRecordId,
446 status: ChallengeReviewRecordStatus,
447 message: Option<&str>,
448 audit_event: &CreateChallengeReviewRecordAuditEventInput,
449) -> Result<()> {
450 let mut tx = pool.begin().await?;
451 let result = sqlx::query(
452 r#"
453 UPDATE challenge_review_records
454 SET status = $2,
455 validation_message = COALESCE($3, validation_message),
456 updated_at = NOW()
457 WHERE id = $1::uuid
458 AND status IN ('pending_review', 'validated', 'approved')
459 AND active_validation_record_id IS NULL
460 "#,
461 )
462 .bind(review_record_id.as_str())
463 .bind(status.as_str())
464 .bind(message)
465 .execute(&mut *tx)
466 .await?;
467
468 if result.rows_affected() == 0 {
469 return Err(ServiceError::Conflict);
470 }
471 create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
472 tx.commit().await?;
473 Ok(())
474}
475
476pub async fn abandon_challenge_review_record(
478 pool: &PgPool,
479 review_record_id: &str,
480 message: Option<&str>,
481) -> Result<()> {
482 let result = sqlx::query(
483 r#"
484 UPDATE challenge_review_records
485 SET status = 'abandoned',
486 validation_message = COALESCE($2, validation_message),
487 updated_at = NOW()
488 WHERE id = $1::uuid
489 AND status IN ('pending_review', 'validated', 'approved', 'rejected')
490 AND active_validation_record_id IS NULL
491 "#,
492 )
493 .bind(review_record_id)
494 .bind(message)
495 .execute(pool)
496 .await?;
497
498 if result.rows_affected() == 0 {
499 return Err(ServiceError::Conflict);
500 }
501 Ok(())
502}
503
504pub async fn abandon_challenge_review_record_with_audit(
506 pool: &PgPool,
507 review_record_id: &ChallengeReviewRecordId,
508 message: Option<&str>,
509 audit_event: &CreateChallengeReviewRecordAuditEventInput,
510) -> Result<()> {
511 let mut tx = pool.begin().await?;
512 let result = sqlx::query(
513 r#"
514 UPDATE challenge_review_records
515 SET status = 'abandoned',
516 validation_message = COALESCE($2, validation_message),
517 updated_at = NOW()
518 WHERE id = $1::uuid
519 AND status IN ('pending_review', 'validated', 'approved', 'rejected')
520 AND active_validation_record_id IS NULL
521 "#,
522 )
523 .bind(review_record_id.as_str())
524 .bind(message)
525 .execute(&mut *tx)
526 .await?;
527
528 if result.rows_affected() == 0 {
529 return Err(ServiceError::Conflict);
530 }
531 create_challenge_review_audit_event_tx(&mut tx, audit_event).await?;
532 tx.commit().await?;
533 Ok(())
534}
535
536pub async fn abandon_stale_challenge_review_records(pool: &PgPool, ttl_days: i64) -> Result<i64> {
538 let result = sqlx::query(
539 r#"
540 UPDATE challenge_review_records
541 SET status = 'abandoned',
542 validation_message = COALESCE(validation_message, 'review record abandoned due to inactivity'),
543 updated_at = NOW()
544 WHERE status IN ('pending_review', 'validated', 'approved')
545 AND updated_at < NOW() - ($1::TEXT || ' days')::INTERVAL
546 "#,
547 )
548 .bind(ttl_days)
549 .execute(pool)
550 .await?;
551
552 i64::try_from(result.rows_affected()).map_err(|_| {
553 ServiceError::Internal("abandoned review record count exceeds supported range".to_string())
554 })
555}
556
557pub async fn list_unpublished_private_assets_for_purge(
559 pool: &PgPool,
560 grace_days: i64,
561) -> Result<Vec<ChallengePrivateAssetPurgeRecord>> {
562 let rows = sqlx::query(
563 r#"
564 SELECT a.id, a.storage_key, a.temporary_storage_key
565 FROM challenge_private_assets a
566 JOIN challenge_review_records d ON d.id = a.review_record_id
567 WHERE d.status IN ('abandoned', 'rejected')
568 AND d.updated_at < NOW() - ($1::TEXT || ' days')::INTERVAL
569 AND a.status IN ('pending', 'active', 'failed', 'purging')
570 ORDER BY a.created_at ASC
571 "#,
572 )
573 .bind(grace_days)
574 .fetch_all(pool)
575 .await?;
576
577 rows.into_iter()
578 .map(|row| {
579 Ok(ChallengePrivateAssetPurgeRecord {
580 id: challenge_private_asset_id_from_row(&row, "id")?,
581 storage_key: storage_key_from_row(&row, "storage_key")?,
582 temporary_storage_key: optional_storage_key_from_row(
583 &row,
584 "temporary_storage_key",
585 )?,
586 })
587 })
588 .collect()
589}
590
591pub async fn mark_challenge_private_asset_purging(
593 pool: &PgPool,
594 asset_row_id: &ChallengePrivateAssetId,
595) -> Result<Option<ChallengePrivateAssetPurgeRecord>> {
596 let row = sqlx::query(
597 r#"
598 UPDATE challenge_private_assets
599 SET status = 'purging'
600 WHERE id = $1::uuid
601 AND status IN ('pending', 'active', 'failed', 'purging')
602 RETURNING id, storage_key, temporary_storage_key
603 "#,
604 )
605 .bind(asset_row_id.as_str())
606 .fetch_optional(pool)
607 .await?;
608
609 row.map(|row| {
610 Ok(ChallengePrivateAssetPurgeRecord {
611 id: challenge_private_asset_id_from_row(&row, "id")?,
612 storage_key: storage_key_from_row(&row, "storage_key")?,
613 temporary_storage_key: optional_storage_key_from_row(&row, "temporary_storage_key")?,
614 })
615 })
616 .transpose()
617}
618
619pub async fn delete_challenge_private_asset(
621 pool: &PgPool,
622 asset_row_id: &ChallengePrivateAssetId,
623) -> Result<()> {
624 sqlx::query(
625 r#"
626 WITH deleted AS (
627 DELETE FROM challenge_private_assets
628 WHERE id = $1::uuid
629 RETURNING review_record_id
630 )
631 UPDATE challenge_review_records d
632 SET updated_at = NOW()
633 WHERE d.id IN (SELECT review_record_id FROM deleted)
634 "#,
635 )
636 .bind(asset_row_id.as_str())
637 .execute(pool)
638 .await?;
639
640 Ok(())
641}