Skip to main content

agentics_persistence/db/
evaluation_jobs.rs

1use serde_json::Value;
2use sqlx::{PgPool, Postgres, Row, Transaction};
3
4use agentics_config::WorkerAccelerators;
5use agentics_domain::models::challenge::{ChallengeBundleSpec, TargetAccelerator};
6use agentics_domain::models::evaluation::{
7    EvaluationJobPayload, ScoringMode, SolutionArtifactMetadata,
8};
9use agentics_domain::models::hashes::Sha256Digest;
10use agentics_domain::models::ids::{EvaluationJobId, SolutionSubmissionId};
11use agentics_domain::models::names::{ChallengeName, TargetName};
12use agentics_error::{Result, ServiceError};
13
14use super::evaluation_policy::{
15    ensure_challenge_supports_eval_type_tx, ensure_validation_uses_public_bundle,
16};
17use super::ids::{
18    agent_id_from_row, challenge_name_from_row, evaluation_job_id_from_row,
19    solution_submission_id_from_row, target_from_row,
20};
21use super::leaderboard::repair_leaderboard_entry_for_solution_submission_tx;
22
/// Claimed or queued evaluation job with its parsed runner payload.
///
/// Produced by [`claim_next_evaluation_job`] when a worker claims a job and by
/// [`queue_evaluation_job`] when a job is queued.
#[derive(Debug, Clone)]
pub struct EvaluationJobRecord {
    /// Primary key of the `evaluation_jobs` row.
    pub id: EvaluationJobId,
    /// Solution submission this job evaluates.
    pub solution_submission_id: SolutionSubmissionId,
    /// Challenge the submission belongs to.
    pub challenge_name: ChallengeName,
    /// Challenge target the submission was built for.
    pub target: TargetName,
    /// Worker accelerator required to run the job (stored in `required_accelerator`).
    pub required_accelerator: TargetAccelerator,
    /// Evaluation mode of the job (stored in `eval_type`).
    pub eval_type: ScoringMode,
    /// Raw job status string as stored, e.g. `queued` or `running`.
    pub status: String,
    /// Claim counter: claiming increments it, capacity requeues give one back.
    pub attempt_count: i32,
    /// Runner payload decoded from `payload_json`.
    pub payload: EvaluationJobPayload,
    /// Artifact size/hash metadata of the submission, or `None` when no metadata is stored.
    pub artifact_metadata: Option<SolutionArtifactMetadata>,
}
37
/// Current durable claim state for runner container reconciliation.
///
/// Loaded by [`get_runner_job_claim`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunnerJobClaimRecord {
    /// Raw job status string as stored in `evaluation_jobs.status`.
    pub status: String,
    /// Worker currently recorded on the job, or `None` when the column is NULL.
    pub worker_id: Option<String>,
    /// Attempt counter of the job row; the refresh/requeue helpers in this module only act
    /// when the caller's attempt number matches it.
    pub attempt_count: i32,
    /// Whether `claimed_at` is set and lies within the stale window given to the loader.
    pub claim_is_fresh: bool,
}
46
47/// Claim the next queued job using `FOR UPDATE SKIP LOCKED`.
48///
49/// Claimed jobs move their solution submission into `running` so public visibility can be
50/// controlled consistently by the completion path for each evaluation mode.
51pub async fn claim_next_evaluation_job(
52    pool: &PgPool,
53    worker_id: &str,
54    worker_accelerators: WorkerAccelerators,
55) -> Result<Option<EvaluationJobRecord>> {
56    let mut tx = pool.begin().await?;
57
58    let row = sqlx::query(
59        r#"
60        WITH next_job AS (
61            SELECT id, solution_submission_id
62            FROM evaluation_jobs
63            WHERE status = 'queued'
64              AND scheduled_at <= NOW()
65              AND attempt_count < max_attempts
66              AND (required_accelerator = 'none' OR ($2 AND required_accelerator = 'gpu'))
67            ORDER BY priority DESC, scheduled_at ASC
68            FOR UPDATE SKIP LOCKED
69            LIMIT 1
70        )
71        UPDATE evaluation_jobs j
72        SET status = 'running', claimed_at = NOW(), worker_id = $1, attempt_count = j.attempt_count + 1
73        FROM next_job
74        JOIN solution_submissions s ON s.id = next_job.solution_submission_id
75        WHERE j.id = next_job.id
76        RETURNING
77            j.id, j.solution_submission_id, j.challenge_name, j.target, j.required_accelerator,
78            j.eval_type, j.status, j.attempt_count, j.payload_json,
79            s.artifact_zip_bytes, s.artifact_uncompressed_bytes, s.artifact_file_count, s.artifact_sha256
80        "#
81    )
82    .bind(worker_id)
83    .bind(worker_accelerators.supports(TargetAccelerator::Gpu))
84    .fetch_optional(&mut *tx)
85    .await?;
86
87    let Some(r) = row else {
88        tx.commit().await?;
89        return Ok(None);
90    };
91
92    let eval_type_raw: String = r.try_get("eval_type")?;
93    let eval_type = ScoringMode::from_storage_value(&eval_type_raw).ok_or_else(|| {
94        ServiceError::Internal(format!("unexpected evaluation job type `{eval_type_raw}`"))
95    })?;
96    let solution_submission_id = solution_submission_id_from_row(&r, "solution_submission_id")?;
97
98    sqlx::query(
99        r#"
100        UPDATE solution_submissions
101        SET status = 'running', updated_at = NOW()
102        WHERE id = $1::uuid
103          AND visible_after_eval = FALSE
104        "#,
105    )
106    .bind(solution_submission_id.as_str())
107    .execute(&mut *tx)
108    .await?;
109
110    let payload: EvaluationJobPayload = serde_json::from_value(r.try_get("payload_json")?)
111        .map_err(|e| ServiceError::Internal(e.to_string()))?;
112
113    tx.commit().await?;
114
115    Ok(Some(EvaluationJobRecord {
116        id: evaluation_job_id_from_row(&r, "id")?,
117        solution_submission_id,
118        challenge_name: payload.challenge_name.clone(),
119        target: target_from_row(&r, "target")?,
120        required_accelerator: required_accelerator_from_row(&r, "required_accelerator")?,
121        eval_type,
122        status: r.try_get("status")?,
123        attempt_count: r.try_get("attempt_count")?,
124        payload,
125        artifact_metadata: artifact_metadata_from_row(&r)?,
126    }))
127}
128
129/// Load the durable job claim corresponding to one runner container label set.
130pub async fn get_runner_job_claim(
131    pool: &PgPool,
132    job_id: &EvaluationJobId,
133    stale_minutes: i32,
134) -> Result<Option<RunnerJobClaimRecord>> {
135    let row: Option<(String, Option<String>, i32, bool)> = sqlx::query_as(
136        r#"
137        SELECT
138            status,
139            worker_id,
140            attempt_count,
141            claimed_at IS NOT NULL
142              AND claimed_at >= NOW() - INTERVAL '1 minute' * $2 AS claim_is_fresh
143        FROM evaluation_jobs
144        WHERE id = $1::uuid
145        "#,
146    )
147    .bind(job_id.as_str())
148    .bind(stale_minutes.max(1))
149    .fetch_optional(pool)
150    .await?;
151
152    Ok(row.map(
153        |(status, worker_id, attempt_count, claim_is_fresh)| RunnerJobClaimRecord {
154            status,
155            worker_id,
156            attempt_count,
157            claim_is_fresh,
158        },
159    ))
160}
161
162/// Refresh a running job lease owned by one worker.
163pub async fn refresh_evaluation_job_claim(
164    pool: &PgPool,
165    job_id: &EvaluationJobId,
166    worker_id: &str,
167    attempt_count: i32,
168) -> Result<bool> {
169    let result = sqlx::query(
170        r#"
171        UPDATE evaluation_jobs
172        SET claimed_at = NOW()
173        WHERE id = $1::uuid
174          AND worker_id = $2
175          AND attempt_count = $3
176          AND status = 'running'
177        "#,
178    )
179    .bind(job_id.as_str())
180    .bind(worker_id)
181    .bind(attempt_count)
182    .execute(pool)
183    .await?;
184
185    Ok(result.rows_affected() > 0)
186}
187
188/// Requeue a running job when platform capacity is temporarily unavailable.
189///
190/// Capacity requeues do not consume an evaluation attempt because participant
191/// code did not run to completion.
192pub async fn requeue_running_evaluation_job_for_capacity(
193    pool: &PgPool,
194    job_id: &EvaluationJobId,
195    worker_id: &str,
196    attempt_count: i32,
197    last_error: &str,
198) -> Result<bool> {
199    let mut tx = pool.begin().await?;
200    let Some(solution_submission_id) =
201        requeue_claimed_job_for_capacity(&mut tx, job_id, worker_id, attempt_count, last_error)
202            .await?
203    else {
204        tx.commit().await?;
205        return Ok(false);
206    };
207
208    delete_running_evaluation_for_job(&mut tx, job_id).await?;
209    repair_submission_after_capacity_requeue(&mut tx, &solution_submission_id).await?;
210    tx.commit().await?;
211    Ok(true)
212}
213
214async fn requeue_claimed_job_for_capacity(
215    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
216    job_id: &EvaluationJobId,
217    worker_id: &str,
218    attempt_count: i32,
219    last_error: &str,
220) -> Result<Option<SolutionSubmissionId>> {
221    let row = sqlx::query(
222        r#"
223        UPDATE evaluation_jobs
224        SET status = 'queued',
225            worker_id = NULL,
226            claimed_at = NULL,
227            scheduled_at = NOW() + INTERVAL '5 seconds',
228            attempt_count = GREATEST(attempt_count - 1, 0),
229            last_error = $4
230        WHERE id = $1::uuid
231          AND status = 'running'
232          AND worker_id = $2
233          AND attempt_count = $3
234        RETURNING solution_submission_id
235        "#,
236    )
237    .bind(job_id.as_str())
238    .bind(worker_id)
239    .bind(attempt_count)
240    .bind(last_error)
241    .fetch_optional(&mut **tx)
242    .await?;
243
244    row.map(|row| solution_submission_id_from_row(&row, "solution_submission_id"))
245        .transpose()
246}
247
248async fn delete_running_evaluation_for_job(
249    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
250    job_id: &EvaluationJobId,
251) -> Result<()> {
252    sqlx::query("DELETE FROM evaluations WHERE job_id = $1::uuid AND status = 'running'")
253        .bind(job_id.as_str())
254        .execute(&mut **tx)
255        .await?;
256    Ok(())
257}
258
259async fn repair_submission_after_capacity_requeue(
260    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
261    solution_submission_id: &SolutionSubmissionId,
262) -> Result<()> {
263    let visible_after_eval = sqlx::query_scalar::<_, bool>(
264        "SELECT visible_after_eval FROM solution_submissions WHERE id = $1::uuid FOR UPDATE",
265    )
266    .bind(solution_submission_id.as_str())
267    .fetch_one(&mut **tx)
268    .await?;
269    if !visible_after_eval {
270        sqlx::query(
271            "UPDATE solution_submissions SET status = 'queued', visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
272        )
273        .bind(solution_submission_id.as_str())
274        .execute(&mut **tx)
275        .await?;
276    }
277    Ok(())
278}
279
280/// Make a staged queued job eligible for worker claiming after its artifact is durable.
281pub async fn mark_evaluation_job_ready(pool: &PgPool, job_id: &EvaluationJobId) -> Result<()> {
282    let mut tx = pool.begin().await?;
283    let row = sqlx::query(
284        r#"
285        UPDATE evaluation_jobs
286        SET status = 'queued', scheduled_at = NOW()
287        WHERE id = $1::uuid
288          AND status = 'staged'
289        RETURNING solution_submission_id
290        "#,
291    )
292    .bind(job_id.as_str())
293    .fetch_optional(&mut *tx)
294    .await?;
295
296    let Some(row) = row else {
297        return Err(ServiceError::Internal(format!(
298            "staged evaluation job `{job_id}` is not staged"
299        )));
300    };
301    let solution_submission_id = solution_submission_id_from_row(&row, "solution_submission_id")?;
302
303    sqlx::query(
304        r#"
305        UPDATE solution_submissions
306        SET status = 'queued', updated_at = NOW()
307        WHERE id = $1::uuid
308          AND status = 'pending'
309        "#,
310    )
311    .bind(solution_submission_id.as_str())
312    .execute(&mut *tx)
313    .await?;
314
315    tx.commit().await?;
316    Ok(())
317}
318
/// Input for queueing a validation or official re-run via [`queue_evaluation_job`].
#[derive(Debug, Clone)]
pub struct QueueEvaluationJobInput {
    /// Id assigned to the new `evaluation_jobs` row.
    pub job_id: EvaluationJobId,
    /// Existing solution submission to evaluate.
    pub solution_submission_id: SolutionSubmissionId,
    /// Evaluation mode to run.
    pub eval_type: ScoringMode,
    /// Maximum number of staged/queued/running official jobs. `None` disables the cap, and
    /// the cap is only consulted for official jobs.
    pub max_active_official_jobs: Option<i64>,
}
327
328/// Queue an evaluation job for an existing solution submission.
329///
330/// Official jobs are rejected when the challenge does not enable private benchmark data.
331/// Queued official re-runs preserve an already visible official result until a newer
332/// official run succeeds.
333pub async fn queue_evaluation_job(
334    pool: &PgPool,
335    input: &QueueEvaluationJobInput,
336) -> Result<EvaluationJobRecord> {
337    let mut tx = pool.begin().await?;
338
339    let row = sqlx::query(
340        r#"
341        SELECT s.id, s.challenge_name, s.target, s.agent_id::text AS agent_id, s.artifact_key, s.visible_after_eval,
342               s.artifact_zip_bytes, s.artifact_uncompressed_bytes, s.artifact_file_count, s.artifact_sha256,
343               p.bundle_key, p.public_bundle_key, p.spec_json
344        FROM solution_submissions s
345        JOIN challenges p ON p.challenge_name = s.challenge_name
346        WHERE s.id = $1::uuid
347          AND p.status = 'active'
348          AND p.spec_json IS NOT NULL
349        LIMIT 1
350        FOR UPDATE OF s, p
351        "#,
352    )
353    .bind(input.solution_submission_id.as_str())
354    .fetch_one(&mut *tx)
355    .await
356    .map_err(|_| ServiceError::NotFound)?;
357    let was_visible: bool = row.try_get("visible_after_eval")?;
358
359    let spec_json: Value = row.try_get("spec_json")?;
360    let spec: ChallengeBundleSpec =
361        serde_json::from_value(spec_json).map_err(|e| ServiceError::Internal(e.to_string()))?;
362
363    let target = target_from_row(&row, "target")?;
364    let challenge_name = challenge_name_from_row(&row, "challenge_name")?;
365    ensure_challenge_supports_eval_type_tx(
366        &mut tx,
367        &challenge_name,
368        &spec,
369        &target,
370        input.eval_type,
371        &agent_id_from_row(&row, "agent_id")?,
372    )
373    .await?;
374    let bundle_key = storage_key_from_row(&row, "bundle_key")?;
375    let public_bundle_key = storage_key_from_row(&row, "public_bundle_key")?;
376    ensure_validation_uses_public_bundle(input.eval_type, &spec, &bundle_key, &public_bundle_key)?;
377    ensure_no_active_job_for_submission_tx(&mut tx, &input.solution_submission_id).await?;
378
379    let payload = serde_json::to_value(EvaluationJobPayload {
380        artifact_key: storage_key_from_row(&row, "artifact_key")?,
381        bundle_key,
382        public_bundle_key,
383        challenge_name: challenge_name.clone(),
384        target: target.clone(),
385    })
386    .map_err(|e| ServiceError::Internal(e.to_string()))?;
387
388    let eval_type_str = input.eval_type.as_str();
389    let required_accelerator = required_accelerator_for_target(&spec, &target)?;
390    let priority = if input.eval_type == ScoringMode::Official {
391        if let Some(max_active) = input.max_active_official_jobs {
392            lock_quota_scope(&mut tx, "global:official-active").await?;
393            let active = count_active_evaluation_jobs_tx(&mut tx, ScoringMode::Official).await?;
394            if active >= max_active {
395                return Err(ServiceError::TooManyRequests(format!(
396                    "official evaluation queue is full: {active} of {max_active} official jobs are staged, queued, or running"
397                )));
398            }
399        }
400        10
401    } else {
402        0
403    };
404
405    sqlx::query(
406        r#"
407        INSERT INTO evaluation_jobs (id, solution_submission_id, challenge_name, target, required_accelerator, eval_type, status, priority, payload_json)
408        VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, 'queued', $7, $8)
409        "#
410    )
411    .bind(input.job_id.as_str())
412    .bind(input.solution_submission_id.as_str())
413    .bind(challenge_name.as_str())
414    .bind(target.as_str())
415    .bind(required_accelerator.as_str())
416    .bind(eval_type_str)
417    .bind(priority)
418    .bind(&payload)
419    .execute(&mut *tx)
420    .await
421    .map_err(map_active_job_conflict)?;
422
423    if input.eval_type == ScoringMode::Official && was_visible {
424        sqlx::query("UPDATE solution_submissions SET updated_at = NOW() WHERE id = $1::uuid")
425            .bind(input.solution_submission_id.as_str())
426            .execute(&mut *tx)
427            .await?;
428    } else {
429        sqlx::query(
430            "UPDATE solution_submissions SET status = 'queued', visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
431        )
432        .bind(input.solution_submission_id.as_str())
433        .execute(&mut *tx)
434        .await?;
435        repair_leaderboard_entry_for_solution_submission_tx(&mut tx, &input.solution_submission_id)
436            .await?;
437    }
438
439    tx.commit().await?;
440
441    Ok(EvaluationJobRecord {
442        id: input.job_id.clone(),
443        solution_submission_id: solution_submission_id_from_row(&row, "id")?,
444        challenge_name,
445        target,
446        required_accelerator,
447        eval_type: input.eval_type,
448        status: "queued".to_string(),
449        attempt_count: 0,
450        payload: serde_json::from_value(payload)
451            .map_err(|e| ServiceError::Internal(e.to_string()))?,
452        artifact_metadata: artifact_metadata_from_row(&row)?,
453    })
454}
455
456/// Reads storage key from a database row and validates its domain shape.
457fn storage_key_from_row(
458    row: &sqlx::postgres::PgRow,
459    column: &str,
460) -> Result<agentics_domain::storage::StorageKey> {
461    let value: String = row.try_get(column)?;
462    agentics_domain::storage::StorageKey::try_new(&value).map_err(|e| {
463        ServiceError::Internal(format!("stored invalid storage key in `{column}`: {e}"))
464    })
465}
466
467/// Reads required worker accelerator from a database row.
468fn required_accelerator_from_row(
469    row: &sqlx::postgres::PgRow,
470    column: &str,
471) -> Result<TargetAccelerator> {
472    let value: String = row.try_get(column)?;
473    TargetAccelerator::from_storage_value(&value).ok_or_else(|| {
474        ServiceError::Internal(format!(
475            "stored invalid required accelerator `{value}` in `{column}`"
476        ))
477    })
478}
479
480/// Return the accelerator requirement declared by the selected challenge target.
481fn required_accelerator_for_target(
482    spec: &ChallengeBundleSpec,
483    target: &TargetName,
484) -> Result<TargetAccelerator> {
485    let target_spec = spec.target(target).ok_or_else(|| {
486        ServiceError::Internal(format!(
487            "challenge `{}` does not declare target `{target}` after admission validation",
488            spec.challenge_name
489        ))
490    })?;
491    Ok(target_spec.accelerator)
492}
493
494/// Reads optional solution artifact metadata from a database row.
495fn artifact_metadata_from_row(
496    row: &sqlx::postgres::PgRow,
497) -> Result<Option<SolutionArtifactMetadata>> {
498    let artifact_zip_bytes = optional_u64_from_row(row, "artifact_zip_bytes")?;
499    let artifact_uncompressed_bytes = optional_u64_from_row(row, "artifact_uncompressed_bytes")?;
500    let artifact_file_count = optional_u64_from_row(row, "artifact_file_count")?;
501    let artifact_sha256: Option<String> = row.try_get("artifact_sha256")?;
502    match (
503        artifact_zip_bytes,
504        artifact_uncompressed_bytes,
505        artifact_file_count,
506        artifact_sha256,
507    ) {
508        (None, None, None, None) => Ok(None),
509        (
510            Some(artifact_zip_bytes),
511            Some(artifact_uncompressed_bytes),
512            Some(artifact_file_count),
513            Some(artifact_sha256),
514        ) => {
515            let artifact_sha256 = Sha256Digest::try_new(&artifact_sha256).map_err(|e| {
516                ServiceError::Internal(format!("stored invalid artifact_sha256: {e}"))
517            })?;
518            Ok(Some(SolutionArtifactMetadata {
519                artifact_zip_bytes,
520                artifact_uncompressed_bytes,
521                artifact_file_count,
522                artifact_sha256,
523            }))
524        }
525        _ => Err(ServiceError::Internal(
526            "stored partial solution artifact metadata".to_string(),
527        )),
528    }
529}
530
531/// Reads an optional non-negative BIGINT as `u64`.
532fn optional_u64_from_row(row: &sqlx::postgres::PgRow, column: &str) -> Result<Option<u64>> {
533    let value: Option<i64> = row.try_get(column)?;
534    value
535        .map(|value| {
536            u64::try_from(value)
537                .map_err(|_| ServiceError::Internal(format!("stored negative value in `{column}`")))
538        })
539        .transpose()
540}
541
542/// Handles map active job conflict for this module.
543fn map_active_job_conflict(error: sqlx::Error) -> ServiceError {
544    match error {
545        sqlx::Error::Database(db_err)
546            if db_err.constraint().is_some_and(|constraint| {
547                constraint == "idx_evaluation_jobs_one_active_per_submission"
548                    || constraint == "idx_evaluation_jobs_one_active_per_submission_mode"
549            }) =>
550        {
551            super::DbWorkflowError::AdmissionConflict(
552                "one active evaluation job already exists for this submission".to_string(),
553            )
554            .into()
555        }
556        other => super::DbWorkflowError::Sql(other).into(),
557    }
558}
559
560/// Serialize active official-capacity admission through a database lock row.
561async fn lock_quota_scope(tx: &mut Transaction<'_, Postgres>, scope: &str) -> Result<()> {
562    sqlx::query(
563        r#"
564        INSERT INTO quota_admission_locks (scope)
565        VALUES ($1)
566        ON CONFLICT (scope) DO NOTHING
567        "#,
568    )
569    .bind(scope)
570    .execute(&mut **tx)
571    .await?;
572
573    sqlx::query(
574        r#"
575        SELECT scope
576        FROM quota_admission_locks
577        WHERE scope = $1
578        FOR UPDATE
579        "#,
580    )
581    .bind(scope)
582    .fetch_one(&mut **tx)
583    .await?;
584
585    Ok(())
586}
587
588/// Count active capacity reservations for one evaluation type inside a transaction.
589async fn count_active_evaluation_jobs_tx(
590    tx: &mut Transaction<'_, Postgres>,
591    eval_type: ScoringMode,
592) -> Result<i64> {
593    let count = sqlx::query_scalar::<_, i64>(
594        r#"
595        SELECT COUNT(*)::BIGINT
596        FROM evaluation_jobs
597        WHERE eval_type = $1
598          AND status IN ('staged', 'queued', 'running')
599        "#,
600    )
601    .bind(eval_type.as_str())
602    .fetch_one(&mut **tx)
603    .await?;
604
605    Ok(count)
606}
607
608/// Reject queueing when any evaluation mode already reserves this submission.
609async fn ensure_no_active_job_for_submission_tx(
610    tx: &mut Transaction<'_, Postgres>,
611    solution_submission_id: &SolutionSubmissionId,
612) -> Result<()> {
613    let active = sqlx::query_scalar::<_, bool>(
614        r#"
615        SELECT EXISTS (
616            SELECT 1
617            FROM evaluation_jobs
618            WHERE solution_submission_id = $1::uuid
619              AND status IN ('staged', 'queued', 'running')
620        )
621        "#,
622    )
623    .bind(solution_submission_id.as_str())
624    .fetch_one(&mut **tx)
625    .await?;
626    if active {
627        return Err(ServiceError::Conflict);
628    }
629    Ok(())
630}
631
632/// Count jobs that reserve active capacity for one evaluation type.
633pub async fn count_active_evaluation_jobs(pool: &PgPool, eval_type: ScoringMode) -> Result<i64> {
634    let count = sqlx::query_scalar::<_, i64>(
635        r#"
636        SELECT COUNT(*)::BIGINT
637        FROM evaluation_jobs
638        WHERE eval_type = $1
639          AND status IN ('staged', 'queued', 'running')
640        "#,
641    )
642    .bind(eval_type.as_str())
643    .fetch_one(pool)
644    .await?;
645
646    Ok(count)
647}