use sqlx::{PgPool, Postgres, Row, Transaction};
use super::rows::{
ChallengePrivateAssetRecord, review_record_status_from_row, row_to_private_asset_record,
};
use super::{
CreateChallengeReviewRecordAuditEventInput, clear_stale_active_validation_tx,
create_challenge_review_audit_event_tx, lock_quota_scope,
};
use crate::db::ids::human_id_from_row;
use agentics_domain::models::challenge_creation::ChallengeReviewRecordStatus;
use agentics_domain::models::ids::{ChallengePrivateAssetId, ChallengeReviewAuditEventId, HumanId};
use agentics_domain::storage::StorageKey;
use agentics_error::{Result, ServiceError};
use super::CreateChallengePrivateAssetInput;
/// Reserves a `pending` private-asset row for a challenge review record while enforcing
/// the per-record byte quota.
///
/// All work happens in a single transaction on `pool`:
/// 1. `lock_quota_scope` is called with a scope key derived from the review record id
///    (presumably to serialize concurrent reservations for that record — confirm against
///    the helper).
/// 2. Pending assets older than `pending_timeout_minutes` are marked `failed`.
/// 3. The review record is checked for upload eligibility (ownership, no active
///    validation, acceptable status).
/// 4. The bytes of the record's `pending` and `active` assets plus `input.size_bytes`
///    are compared against `max_bytes_per_review_record`.
/// 5. The new asset row is inserted with status `pending`, and a `validated` review record
///    is moved back to `pending_review` with its validation fields cleared.
///
/// # Errors
/// - `ServiceError::BadRequest` if `input.size_bytes` is negative or the running total
///   overflows `i64`.
/// - `ServiceError::Internal` if `max_bytes_per_review_record` does not fit in `i64`.
/// - `ServiceError::TooManyRequests` if the quota would be exceeded.
/// - Any error returned by the eligibility check, the helpers, or the database.
pub async fn reserve_challenge_private_asset(
    pool: &PgPool,
    input: &CreateChallengePrivateAssetInput,
    max_bytes_per_review_record: u64,
    validation_timeout_minutes: i32,
    pending_timeout_minutes: i32,
) -> Result<ChallengePrivateAssetRecord> {
    // A negative size would shrink the computed total, letting this and later uploads
    // slip past the quota check below, so reject it before touching the database.
    if input.size_bytes < 0 {
        return Err(ServiceError::BadRequest(
            "private asset size must not be negative".to_string(),
        ));
    }
    let max_bytes_per_review_record = i64::try_from(max_bytes_per_review_record).map_err(|_| {
        ServiceError::Internal("private asset quota limit exceeds supported range".to_string())
    })?;
    let mut tx = pool.begin().await?;
    let scope = format!(
        "challenge-review-record:{}:private-assets",
        input.review_record_id
    );
    lock_quota_scope(&mut tx, &scope).await?;
    // Expire abandoned reservations first so they stop counting towards the quota.
    auto_fail_stale_pending_private_assets_tx(
        &mut tx,
        input.review_record_id.as_str(),
        pending_timeout_minutes,
    )
    .await?;
    ensure_private_asset_upload_allowed_tx(&mut tx, input, validation_timeout_minutes).await?;
    let existing_bytes =
        sum_private_asset_bytes_for_review_record_tx(&mut tx, input.review_record_id.as_str())
            .await?;
    let next_total = existing_bytes
        .checked_add(input.size_bytes)
        .ok_or_else(|| ServiceError::BadRequest("private asset size overflow".to_string()))?;
    if next_total > max_bytes_per_review_record {
        return Err(ServiceError::TooManyRequests(format!(
            "private asset quota exceeded for review record `{}`: {} of {} bytes would be used",
            input.review_record_id, next_total, max_bytes_per_review_record
        )));
    }
    let row = sqlx::query(
        r#"
INSERT INTO challenge_private_assets (
id,
review_record_id,
asset_name,
kind,
required,
size_bytes,
sha256,
storage_key,
temporary_storage_key,
status,
uploader_human_id
)
VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, 'pending', $10::uuid)
RETURNING *
"#,
    )
    .bind(input.asset_row_id.as_str())
    .bind(input.review_record_id.as_str())
    .bind(input.asset_name.as_str())
    .bind(input.kind.as_str())
    .bind(input.required)
    .bind(input.size_bytes)
    .bind(input.sha256.to_string())
    .bind(input.storage_key.as_str())
    .bind(input.temporary_storage_key.as_str())
    .bind(input.uploader_human_id.as_str())
    .fetch_one(&mut *tx)
    .await?;
    // A new asset invalidates an earlier validation: a `validated` record goes back to
    // `pending_review` and its validation/approval fields are cleared. A record already in
    // `pending_review` only has `updated_at` bumped.
    sqlx::query(
        r#"
UPDATE challenge_review_records
SET status = CASE WHEN status = 'validated' THEN 'pending_review' ELSE status END,
validation_message = CASE WHEN status = 'validated' THEN NULL ELSE validation_message END,
validation_repository_path = CASE WHEN status = 'validated' THEN NULL ELSE validation_repository_path END,
validation_bundle_sha256 = CASE WHEN status = 'validated' THEN NULL ELSE validation_bundle_sha256 END,
approved_bundle_sha256 = CASE WHEN status = 'validated' THEN NULL ELSE approved_bundle_sha256 END,
updated_at = NOW()
WHERE id = $1::uuid
AND status IN ('pending_review', 'validated')
"#,
    )
    .bind(input.review_record_id.as_str())
    .execute(&mut *tx)
    .await?;
    // Decode before committing so a row-mapping failure rolls the reservation back.
    let response = row_to_private_asset_record(row)?;
    tx.commit().await?;
    Ok(response)
}
/// Activates a pending private asset in its own transaction.
///
/// Begins a transaction on `pool`, delegates the state change to
/// `activate_challenge_private_asset_tx`, and commits once that succeeds. If the helper
/// fails, its error is returned and the transaction is dropped without being committed.
pub async fn activate_challenge_private_asset(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
) -> Result<ChallengePrivateAssetRecord> {
    let mut txn = pool.begin().await?;
    let activated = activate_challenge_private_asset_tx(&mut txn, asset_row_id).await?;
    txn.commit().await?;
    Ok(activated)
}
/// Activates a pending private asset and records a `private_asset_uploaded` audit event
/// in the same transaction.
///
/// The audit event is attributed to `actor_human_id` and carries the asset's name, kind,
/// size, and SHA-256 as metadata. The transaction is committed only after both the
/// activation and the audit insert succeed.
pub async fn activate_challenge_private_asset_with_audit(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
    audit_event_id: ChallengeReviewAuditEventId,
    actor_human_id: &HumanId,
) -> Result<ChallengePrivateAssetRecord> {
    let mut txn = pool.begin().await?;
    let activated = activate_challenge_private_asset_tx(&mut txn, asset_row_id).await?;

    // Build the audit payload from the freshly activated record.
    let audit_input = CreateChallengeReviewRecordAuditEventInput {
        event_id: audit_event_id,
        review_record_id: activated.review_record_id.clone(),
        actor_human_id: Some(actor_human_id.clone()),
        actor_admin_service_token_id: None,
        actor_display: None,
        action: "private_asset_uploaded".to_string(),
        message: "private benchmark asset uploaded".to_string(),
        metadata: serde_json::json!({
            "asset_name": &activated.asset_name,
            "kind": activated.kind,
            "size_bytes": activated.size_bytes,
            "sha256": &activated.sha256
        }),
    };
    create_challenge_review_audit_event_tx(&mut txn, &audit_input).await?;

    txn.commit().await?;
    Ok(activated)
}
/// Flips a `pending` private asset to `active` within the caller's transaction.
///
/// The asset's temporary storage key and failure fields are cleared and `activated_at` is
/// stamped. The owning review record's `updated_at` is then bumped.
///
/// # Errors
/// Returns `ServiceError::Conflict` when no row with the given id is currently `pending`.
async fn activate_challenge_private_asset_tx(
    tx: &mut Transaction<'_, Postgres>,
    asset_row_id: &ChallengePrivateAssetId,
) -> Result<ChallengePrivateAssetRecord> {
    const ACTIVATE_ASSET_SQL: &str = r#"
UPDATE challenge_private_assets
SET status = 'active',
temporary_storage_key = NULL,
activated_at = NOW(),
failed_at = NULL,
failure_message = NULL
WHERE id = $1::uuid
AND status = 'pending'
RETURNING *
"#;
    const TOUCH_REVIEW_RECORD_SQL: &str = r#"
UPDATE challenge_review_records d
SET updated_at = NOW()
FROM challenge_private_assets a
WHERE a.id = $1::uuid
AND d.id = a.review_record_id
"#;

    let asset_id = asset_row_id.as_str();

    // No matching pending row means the asset is missing or already transitioned.
    let activated_row = sqlx::query(ACTIVATE_ASSET_SQL)
        .bind(asset_id)
        .fetch_optional(&mut **tx)
        .await?
        .ok_or(ServiceError::Conflict)?;

    sqlx::query(TOUCH_REVIEW_RECORD_SQL)
        .bind(asset_id)
        .execute(&mut **tx)
        .await?;

    row_to_private_asset_record(activated_row)
}
/// Marks a `pending` private asset as `failed` with the given failure message and bumps
/// the owning review record's `updated_at`.
///
/// Both updates run as one statement directly against `pool`. If the asset is not
/// currently `pending`, the statement affects no rows and the call still returns `Ok(())`.
pub async fn fail_challenge_private_asset(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
    message: &str,
) -> Result<()> {
    const FAIL_ASSET_SQL: &str = r#"
WITH failed AS (
UPDATE challenge_private_assets
SET status = 'failed',
failed_at = NOW(),
failure_message = $2
WHERE id = $1::uuid
AND status = 'pending'
RETURNING review_record_id
)
UPDATE challenge_review_records d
SET updated_at = NOW()
WHERE d.id IN (SELECT review_record_id FROM failed)
"#;

    let statement = sqlx::query(FAIL_ASSET_SQL)
        .bind(asset_row_id.as_str())
        .bind(message);
    // The affected-row count is intentionally not inspected.
    statement.execute(pool).await?;
    Ok(())
}
/// Reports whether any `active` private asset row references `storage_key`.
///
/// Runs a single `SELECT EXISTS (...)` against `pool` and returns its boolean result.
pub async fn private_asset_storage_key_has_active_reference(
    pool: &PgPool,
    storage_key: &StorageKey,
) -> Result<bool> {
    const ACTIVE_REFERENCE_SQL: &str = r#"
SELECT EXISTS (
SELECT 1
FROM challenge_private_assets
WHERE storage_key = $1
AND status = 'active'
)
"#;

    let is_referenced: bool = sqlx::query_scalar(ACTIVE_REFERENCE_SQL)
        .bind(storage_key.as_str())
        .fetch_one(pool)
        .await?;
    Ok(is_referenced)
}
/// Verifies that the uploader may attach a private asset to the review record.
///
/// The review record row is selected `FOR UPDATE` inside the caller's transaction, then:
/// 1. The record must exist and its creator must equal `input.uploader_human_id`.
///    Both failures return `NotFound`, so a non-owner cannot tell them apart.
/// 2. If an active validation is recorded, `clear_stale_active_validation_tx` is given
///    the chance to clear it (using `validation_timeout_minutes`). The row is then
///    re-read; a validation that is still active is a `Conflict`.
/// 3. The record's status must be `PendingReview` or `Validated`.
///
/// The status check uses the row as it looks *after* the stale-validation cleanup, so a
/// status changed by that cleanup is honoured rather than the pre-cleanup snapshot.
///
/// # Errors
/// - `ServiceError::NotFound` if the record is missing or owned by someone else.
/// - `ServiceError::Conflict` if a validation is still active or the status does not
///   permit uploads.
/// - Any error from row decoding, the cleanup helper, or the database.
async fn ensure_private_asset_upload_allowed_tx(
    tx: &mut Transaction<'_, Postgres>,
    input: &CreateChallengePrivateAssetInput,
    validation_timeout_minutes: i32,
) -> Result<()> {
    let row = sqlx::query(
        r#"
SELECT status, creator_human_id, active_validation_record_id::text AS active_validation_record_id
FROM challenge_review_records
WHERE id = $1::uuid
FOR UPDATE
"#,
    )
    .bind(input.review_record_id.as_str())
    .fetch_optional(&mut **tx)
    .await?;
    let Some(mut row) = row else {
        return Err(ServiceError::NotFound);
    };
    let creator_human_id = human_id_from_row(&row, "creator_human_id")?;
    if creator_human_id != input.uploader_human_id {
        return Err(ServiceError::NotFound);
    }
    if row
        .try_get::<Option<String>, _>("active_validation_record_id")?
        .is_some()
    {
        clear_stale_active_validation_tx(
            tx,
            input.review_record_id.as_str(),
            validation_timeout_minutes,
        )
        .await?;
        // The cleanup may have modified the record, so refresh every column that the
        // remaining checks depend on instead of trusting the earlier snapshot.
        row = sqlx::query(
            "SELECT status, active_validation_record_id::text AS active_validation_record_id FROM challenge_review_records WHERE id = $1::uuid",
        )
        .bind(input.review_record_id.as_str())
        .fetch_one(&mut **tx)
        .await?;
        if row
            .try_get::<Option<String>, _>("active_validation_record_id")?
            .is_some()
        {
            return Err(ServiceError::Conflict);
        }
    }
    let status = review_record_status_from_row(&row, "status")?;
    if !matches!(
        status,
        ChallengeReviewRecordStatus::PendingReview | ChallengeReviewRecordStatus::Validated
    ) {
        return Err(ServiceError::Conflict);
    }
    Ok(())
}
/// Fails every `pending` private asset of a review record whose `created_at` is older
/// than the timeout, and bumps the review record's `updated_at` when any asset was failed.
///
/// `timeout_minutes` is clamped to a minimum of one minute before being used. Failed
/// assets get the message `private asset pending lease expired`.
async fn auto_fail_stale_pending_private_assets_tx(
    tx: &mut Transaction<'_, Postgres>,
    review_record_id: &str,
    timeout_minutes: i32,
) -> Result<()> {
    const EXPIRE_PENDING_SQL: &str = r#"
WITH failed AS (
UPDATE challenge_private_assets
SET status = 'failed',
failed_at = NOW(),
failure_message = 'private asset pending lease expired'
WHERE review_record_id = $1::uuid
AND status = 'pending'
AND created_at < NOW() - INTERVAL '1 minute' * $2
RETURNING review_record_id
)
UPDATE challenge_review_records d
SET updated_at = NOW()
WHERE d.id IN (SELECT review_record_id FROM failed)
"#;

    // Never use a lease shorter than one minute, even if configured as zero or negative.
    let lease_minutes = timeout_minutes.max(1);

    sqlx::query(EXPIRE_PENDING_SQL)
        .bind(review_record_id)
        .bind(lease_minutes)
        .execute(&mut **tx)
        .await?;
    Ok(())
}
/// Returns the combined `size_bytes` of a review record's `pending` and `active` private
/// assets, or `0` when there are none.
///
/// Runs inside the caller's transaction.
async fn sum_private_asset_bytes_for_review_record_tx(
    tx: &mut Transaction<'_, Postgres>,
    review_record_id: &str,
) -> Result<i64> {
    const SUM_BYTES_SQL: &str = r#"
SELECT COALESCE(SUM(size_bytes), 0)::BIGINT
FROM challenge_private_assets
WHERE review_record_id = $1::uuid
AND status IN ('pending', 'active')
"#;

    let total_bytes: i64 = sqlx::query_scalar(SUM_BYTES_SQL)
        .bind(review_record_id)
        .fetch_one(&mut **tx)
        .await?;
    Ok(total_bytes)
}