//! agentics-persistence 0.3.0
//!
//! Database persistence layer for the Agentics challenge platform.
use sqlx::{PgPool, Postgres, Row, Transaction};

use super::rows::{
    ChallengePrivateAssetRecord, review_record_status_from_row, row_to_private_asset_record,
};
use super::{
    CreateChallengeReviewRecordAuditEventInput, clear_stale_active_validation_tx,
    create_challenge_review_audit_event_tx, lock_quota_scope,
};
use crate::db::ids::human_id_from_row;
use agentics_domain::models::challenge_creation::ChallengeReviewRecordStatus;
use agentics_domain::models::ids::{ChallengePrivateAssetId, ChallengeReviewAuditEventId, HumanId};
use agentics_domain::storage::StorageKey;
use agentics_error::{Result, ServiceError};

use super::CreateChallengePrivateAssetInput;

/// Reserve a pending private benchmark asset row before storage writes begin.
///
/// All work happens inside a single transaction that is committed only when
/// every step succeeds; on any early return the transaction is dropped
/// without being committed. The steps run in this order:
///
/// 1. Take the per-review-record private-asset quota lock (`lock_quota_scope`).
/// 2. Fail `pending` assets of this review record that are older than
///    `pending_timeout_minutes`, so they stop counting against the quota.
/// 3. Lock the review record row and confirm the uploader may attach assets.
/// 4. Verify that the bytes already reserved (`pending` + `active`) plus
///    `input.size_bytes` do not exceed `max_bytes_per_review_record`.
/// 5. Insert the new asset row with status `pending`.
/// 6. Touch the review record; a `validated` record is moved back to
///    `pending_review` and its validation/approval fields are cleared.
///
/// # Errors
///
/// * `ServiceError::Internal` when `max_bytes_per_review_record` does not fit
///   into an `i64`.
/// * `ServiceError::BadRequest` when `input.size_bytes` is negative or the new
///   total overflows `i64`.
/// * `ServiceError::TooManyRequests` when the quota would be exceeded.
/// * Any error returned by the helper steps or by the database.
pub async fn reserve_challenge_private_asset(
    pool: &PgPool,
    input: &CreateChallengePrivateAssetInput,
    max_bytes_per_review_record: u64,
    validation_timeout_minutes: i32,
    pending_timeout_minutes: i32,
) -> Result<ChallengePrivateAssetRecord> {
    let max_bytes_per_review_record = i64::try_from(max_bytes_per_review_record).map_err(|_| {
        ServiceError::Internal("private asset quota limit exceeds supported range".to_string())
    })?;
    // The size is a signed integer: a negative value would lower the summed
    // usage and let later uploads slip past the quota, so reject it before
    // taking any locks.
    if input.size_bytes < 0 {
        return Err(ServiceError::BadRequest(
            "private asset size must not be negative".to_string(),
        ));
    }
    let mut tx = pool.begin().await?;
    let scope = format!(
        "challenge-review-record:{}:private-assets",
        input.review_record_id
    );
    // Serialize concurrent reservations for the same review record so the
    // sum-then-insert quota check below cannot race.
    lock_quota_scope(&mut tx, &scope).await?;
    auto_fail_stale_pending_private_assets_tx(
        &mut tx,
        input.review_record_id.as_str(),
        pending_timeout_minutes,
    )
    .await?;
    ensure_private_asset_upload_allowed_tx(&mut tx, input, validation_timeout_minutes).await?;

    let existing_bytes =
        sum_private_asset_bytes_for_review_record_tx(&mut tx, input.review_record_id.as_str())
            .await?;
    let next_total = existing_bytes
        .checked_add(input.size_bytes)
        .ok_or_else(|| ServiceError::BadRequest("private asset size overflow".to_string()))?;
    if next_total > max_bytes_per_review_record {
        return Err(ServiceError::TooManyRequests(format!(
            "private asset quota exceeded for review record `{}`: {} of {} bytes would be used",
            input.review_record_id, next_total, max_bytes_per_review_record
        )));
    }

    let row = sqlx::query(
        r#"
        INSERT INTO challenge_private_assets (
            id,
            review_record_id,
            asset_name,
            kind,
            required,
            size_bytes,
            sha256,
            storage_key,
            temporary_storage_key,
            status,
            uploader_human_id
        )
        VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, 'pending', $10::uuid)
        RETURNING *
        "#,
    )
    .bind(input.asset_row_id.as_str())
    .bind(input.review_record_id.as_str())
    .bind(input.asset_name.as_str())
    .bind(input.kind.as_str())
    .bind(input.required)
    .bind(input.size_bytes)
    .bind(input.sha256.to_string())
    .bind(input.storage_key.as_str())
    .bind(input.temporary_storage_key.as_str())
    .bind(input.uploader_human_id.as_str())
    .fetch_one(&mut *tx)
    .await?;

    // Every CASE below tests the same `status = 'validated'` condition: a
    // validated record goes back to `pending_review` with its validation and
    // approval fields cleared, while a `pending_review` record only gets its
    // `updated_at` bumped.
    sqlx::query(
        r#"
        UPDATE challenge_review_records
        SET status = CASE WHEN status = 'validated' THEN 'pending_review' ELSE status END,
            validation_message = CASE WHEN status = 'validated' THEN NULL ELSE validation_message END,
            validation_repository_path = CASE WHEN status = 'validated' THEN NULL ELSE validation_repository_path END,
            validation_bundle_sha256 = CASE WHEN status = 'validated' THEN NULL ELSE validation_bundle_sha256 END,
            approved_bundle_sha256 = CASE WHEN status = 'validated' THEN NULL ELSE approved_bundle_sha256 END,
            updated_at = NOW()
        WHERE id = $1::uuid
          AND status IN ('pending_review', 'validated')
        "#,
    )
    .bind(input.review_record_id.as_str())
    .execute(&mut *tx)
    .await?;

    // Decode before committing so a malformed row never leaves a committed
    // reservation behind.
    let response = row_to_private_asset_record(row)?;
    tx.commit().await?;
    Ok(response)
}

/// Promote a pending private asset to `active` once its object has been promoted.
///
/// Opens a dedicated transaction, delegates to
/// `activate_challenge_private_asset_tx`, and commits. The activated record is
/// returned; any failure propagates before the commit is reached.
pub async fn activate_challenge_private_asset(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
) -> Result<ChallengePrivateAssetRecord> {
    let mut txn = pool.begin().await?;
    let activated = activate_challenge_private_asset_tx(&mut txn, asset_row_id).await?;
    txn.commit().await?;
    Ok(activated)
}

/// Activate a pending private asset and record the matching audit event in
/// the same transaction.
///
/// The audit event uses the action `private_asset_uploaded`, is attributed to
/// `actor_human_id`, and carries the asset's name, kind, size and SHA-256 as
/// metadata. Nothing is committed unless both the activation and the audit
/// insert succeed.
pub async fn activate_challenge_private_asset_with_audit(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
    audit_event_id: ChallengeReviewAuditEventId,
    actor_human_id: &HumanId,
) -> Result<ChallengePrivateAssetRecord> {
    let mut txn = pool.begin().await?;
    let activated = activate_challenge_private_asset_tx(&mut txn, asset_row_id).await?;

    // Describe the asset that was just activated for the audit trail.
    let metadata = serde_json::json!({
        "asset_name": &activated.asset_name,
        "kind": activated.kind,
        "size_bytes": activated.size_bytes,
        "sha256": &activated.sha256
    });
    let audit_event = CreateChallengeReviewRecordAuditEventInput {
        event_id: audit_event_id,
        review_record_id: activated.review_record_id.clone(),
        actor_human_id: Some(actor_human_id.clone()),
        actor_admin_service_token_id: None,
        actor_display: None,
        action: "private_asset_uploaded".to_owned(),
        message: "private benchmark asset uploaded".to_owned(),
        metadata,
    };
    create_challenge_review_audit_event_tx(&mut txn, &audit_event).await?;

    txn.commit().await?;
    Ok(activated)
}

/// Flip a `pending` private asset to `active` within the caller's transaction.
///
/// The update clears `temporary_storage_key`, `failed_at` and
/// `failure_message`, and stamps `activated_at`. When no row with this id is
/// currently `pending`, `ServiceError::Conflict` is returned. On success the
/// owning review record's `updated_at` is bumped and the updated asset row is
/// decoded and returned.
async fn activate_challenge_private_asset_tx(
    tx: &mut Transaction<'_, Postgres>,
    asset_row_id: &ChallengePrivateAssetId,
) -> Result<ChallengePrivateAssetRecord> {
    let asset_id = asset_row_id.as_str();

    // Only a row that is still `pending` may be activated; anything else
    // (unknown id, already active, already failed) is reported as a conflict.
    let activated_row = sqlx::query(
        r#"
        UPDATE challenge_private_assets
        SET status = 'active',
            temporary_storage_key = NULL,
            activated_at = NOW(),
            failed_at = NULL,
            failure_message = NULL
        WHERE id = $1::uuid
          AND status = 'pending'
        RETURNING *
    "#,
    )
    .bind(asset_id)
    .fetch_optional(&mut **tx)
    .await?
    .ok_or(ServiceError::Conflict)?;

    // Touch the parent review record so its modification time reflects the
    // asset change.
    sqlx::query(
        r#"
        UPDATE challenge_review_records d
        SET updated_at = NOW()
        FROM challenge_private_assets a
        WHERE a.id = $1::uuid
          AND d.id = a.review_record_id
    "#,
    )
    .bind(asset_id)
    .execute(&mut **tx)
    .await?;

    row_to_private_asset_record(activated_row)
}

/// Record that a pending private asset failed during storage write or promotion.
///
/// A single statement sets the asset to `failed` with `message` as its
/// `failure_message` (only while it is still `pending`) and bumps
/// `updated_at` on the owning review record. If the asset is not `pending`
/// the statement affects no rows and the call still returns `Ok(())`.
pub async fn fail_challenge_private_asset(
    pool: &PgPool,
    asset_row_id: &ChallengePrivateAssetId,
    message: &str,
) -> Result<()> {
    let mark_failed = sqlx::query(
        r#"
        WITH failed AS (
            UPDATE challenge_private_assets
            SET status = 'failed',
                failed_at = NOW(),
                failure_message = $2
            WHERE id = $1::uuid
              AND status = 'pending'
            RETURNING review_record_id
        )
        UPDATE challenge_review_records d
        SET updated_at = NOW()
        WHERE d.id IN (SELECT review_record_id FROM failed)
        "#,
    )
    .bind(asset_row_id.as_str())
    .bind(message);

    mark_failed.execute(pool).await?;
    Ok(())
}

/// Report whether any `active` private asset row points at `storage_key`.
///
/// Returns `true` when at least one row in `challenge_private_assets` has this
/// storage key and status `active`, and `false` otherwise.
pub async fn private_asset_storage_key_has_active_reference(
    pool: &PgPool,
    storage_key: &StorageKey,
) -> Result<bool> {
    let is_referenced: bool = sqlx::query_scalar(
        r#"
        SELECT EXISTS (
            SELECT 1
            FROM challenge_private_assets
            WHERE storage_key = $1
              AND status = 'active'
        )
        "#,
    )
    .bind(storage_key.as_str())
    .fetch_one(pool)
    .await?;

    Ok(is_referenced)
}

/// Lock a review_record row and confirm private assets may still be attached.
///
/// The row is selected `FOR UPDATE`, so it stays locked for the rest of the
/// caller's transaction. Checks are applied in this order:
///
/// 1. The review record must exist.
/// 2. `input.uploader_human_id` must equal the record's creator.
/// 3. If an active validation is recorded, stale ones are cleared through
///    `clear_stale_active_validation_tx`; a validation that is still active
///    afterwards blocks the upload.
/// 4. The record status must be `PendingReview` or `Validated`.
///
/// # Errors
///
/// * `ServiceError::NotFound` when the record does not exist or the uploader
///   is not its creator (the same error is used for both cases).
/// * `ServiceError::Conflict` when a validation is still active or the status
///   does not allow uploads.
/// * Any database or row-decoding error.
async fn ensure_private_asset_upload_allowed_tx(
    tx: &mut Transaction<'_, Postgres>,
    input: &CreateChallengePrivateAssetInput,
    validation_timeout_minutes: i32,
) -> Result<()> {
    let row = sqlx::query(
        r#"
        SELECT status, creator_human_id, active_validation_record_id::text AS active_validation_record_id
        FROM challenge_review_records
        WHERE id = $1::uuid
        FOR UPDATE
        "#,
    )
    .bind(input.review_record_id.as_str())
    .fetch_optional(&mut **tx)
    .await?;
    let Some(row) = row else {
        return Err(ServiceError::NotFound);
    };

    let creator_human_id = human_id_from_row(&row, "creator_human_id")?;
    if creator_human_id != input.uploader_human_id {
        return Err(ServiceError::NotFound);
    }

    let had_active_validation = row
        .try_get::<Option<String>, _>("active_validation_record_id")?
        .is_some();
    let row = if had_active_validation {
        clear_stale_active_validation_tx(
            tx,
            input.review_record_id.as_str(),
            validation_timeout_minutes,
        )
        .await?;
        // The helper may have modified the review record (its body is not
        // visible here), so the first snapshot can be stale. Re-read the
        // status together with the validation marker so the checks below
        // decide on current data.
        let refreshed = sqlx::query(
            r#"
            SELECT status, active_validation_record_id::text AS active_validation_record_id
            FROM challenge_review_records
            WHERE id = $1::uuid
            "#,
        )
        .bind(input.review_record_id.as_str())
        .fetch_one(&mut **tx)
        .await?;
        if refreshed
            .try_get::<Option<String>, _>("active_validation_record_id")?
            .is_some()
        {
            return Err(ServiceError::Conflict);
        }
        refreshed
    } else {
        row
    };

    let status = review_record_status_from_row(&row, "status")?;
    if !matches!(
        status,
        ChallengeReviewRecordStatus::PendingReview | ChallengeReviewRecordStatus::Validated
    ) {
        return Err(ServiceError::Conflict);
    }

    Ok(())
}

/// Expire pending private assets whose reservation has outlived its lease.
///
/// Every `pending` asset of `review_record_id` created more than
/// `timeout_minutes` ago (clamped to at least one minute) is marked `failed`
/// with the message `private asset pending lease expired`. When any asset is
/// failed, the review record's `updated_at` is bumped in the same statement.
async fn auto_fail_stale_pending_private_assets_tx(
    tx: &mut Transaction<'_, Postgres>,
    review_record_id: &str,
    timeout_minutes: i32,
) -> Result<()> {
    // Never use a zero or negative lease: fall back to one minute.
    let lease_minutes = i32::max(timeout_minutes, 1);

    let expire_stale = sqlx::query(
        r#"
        WITH failed AS (
            UPDATE challenge_private_assets
            SET status = 'failed',
                failed_at = NOW(),
                failure_message = 'private asset pending lease expired'
            WHERE review_record_id = $1::uuid
              AND status = 'pending'
              AND created_at < NOW() - INTERVAL '1 minute' * $2
            RETURNING review_record_id
        )
        UPDATE challenge_review_records d
        SET updated_at = NOW()
        WHERE d.id IN (SELECT review_record_id FROM failed)
        "#,
    )
    .bind(review_record_id)
    .bind(lease_minutes);

    expire_stale.execute(&mut **tx).await?;
    Ok(())
}

/// Total the bytes currently reserved by a review record's private assets.
///
/// Sums `size_bytes` over the record's assets in status `pending` or
/// `active`; failed assets are not counted. Returns `0` when there are no
/// such assets.
async fn sum_private_asset_bytes_for_review_record_tx(
    tx: &mut Transaction<'_, Postgres>,
    review_record_id: &str,
) -> Result<i64> {
    let reserved_bytes: i64 = sqlx::query_scalar(
        r#"
        SELECT COALESCE(SUM(size_bytes), 0)::BIGINT
        FROM challenge_private_assets
        WHERE review_record_id = $1::uuid
          AND status IN ('pending', 'active')
        "#,
    )
    .bind(review_record_id)
    .fetch_one(&mut **tx)
    .await?;

    Ok(reserved_bytes)
}