1use serde_json::Value;
2use sqlx::{PgPool, Postgres, Row, Transaction};
3
4use agentics_config::WorkerAccelerators;
5use agentics_domain::models::challenge::{ChallengeBundleSpec, TargetAccelerator};
6use agentics_domain::models::evaluation::{
7 EvaluationJobPayload, ScoringMode, SolutionArtifactMetadata,
8};
9use agentics_domain::models::hashes::Sha256Digest;
10use agentics_domain::models::ids::{EvaluationJobId, SolutionSubmissionId};
11use agentics_domain::models::names::{ChallengeName, TargetName};
12use agentics_error::{Result, ServiceError};
13
14use super::evaluation_policy::{
15 ensure_challenge_supports_eval_type_tx, ensure_validation_uses_public_bundle,
16};
17use super::ids::{
18 agent_id_from_row, challenge_name_from_row, evaluation_job_id_from_row,
19 solution_submission_id_from_row, target_from_row,
20};
21use super::leaderboard::repair_leaderboard_entry_for_solution_submission_tx;
22
/// An evaluation job as handed back to callers of this module.
///
/// Produced either from a freshly claimed `evaluation_jobs` row joined with its
/// `solution_submissions` row ([`claim_next_evaluation_job`]) or from the values
/// just inserted by [`queue_evaluation_job`].
#[derive(Debug, Clone)]
pub struct EvaluationJobRecord {
    /// Primary key of the `evaluation_jobs` row.
    pub id: EvaluationJobId,
    /// Submission this job evaluates.
    pub solution_submission_id: SolutionSubmissionId,
    /// Challenge the submission belongs to.
    pub challenge_name: ChallengeName,
    /// Challenge target the submission is evaluated against.
    pub target: TargetName,
    /// Accelerator a worker must support in order to claim this job.
    pub required_accelerator: TargetAccelerator,
    /// Scoring mode, stored in the `eval_type` column.
    pub eval_type: ScoringMode,
    /// Raw job status string as stored (e.g. `"queued"`, `"running"`).
    pub status: String,
    /// Claim attempt counter: incremented when a worker claims the job and
    /// decremented again when the job is requeued for capacity; `0` when freshly queued.
    pub attempt_count: i32,
    /// Deserialized contents of the `payload_json` column.
    pub payload: EvaluationJobPayload,
    /// Artifact size/hash metadata recorded on the submission, if any.
    pub artifact_metadata: Option<SolutionArtifactMetadata>,
}
37
/// Snapshot of who currently holds an evaluation job, as read by [`get_runner_job_claim`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunnerJobClaimRecord {
    /// Raw job status string as stored.
    pub status: String,
    /// Worker that claimed the job; `None` when unclaimed (e.g. after a capacity requeue).
    pub worker_id: Option<String>,
    /// Claim attempt counter; identifies which claim of the job this is.
    pub attempt_count: i32,
    /// `true` when `claimed_at` is set and falls within the staleness window
    /// passed to [`get_runner_job_claim`].
    pub claim_is_fresh: bool,
}
46
47pub async fn claim_next_evaluation_job(
52 pool: &PgPool,
53 worker_id: &str,
54 worker_accelerators: WorkerAccelerators,
55) -> Result<Option<EvaluationJobRecord>> {
56 let mut tx = pool.begin().await?;
57
58 let row = sqlx::query(
59 r#"
60 WITH next_job AS (
61 SELECT id, solution_submission_id
62 FROM evaluation_jobs
63 WHERE status = 'queued'
64 AND scheduled_at <= NOW()
65 AND attempt_count < max_attempts
66 AND (required_accelerator = 'none' OR ($2 AND required_accelerator = 'gpu'))
67 ORDER BY priority DESC, scheduled_at ASC
68 FOR UPDATE SKIP LOCKED
69 LIMIT 1
70 )
71 UPDATE evaluation_jobs j
72 SET status = 'running', claimed_at = NOW(), worker_id = $1, attempt_count = j.attempt_count + 1
73 FROM next_job
74 JOIN solution_submissions s ON s.id = next_job.solution_submission_id
75 WHERE j.id = next_job.id
76 RETURNING
77 j.id, j.solution_submission_id, j.challenge_name, j.target, j.required_accelerator,
78 j.eval_type, j.status, j.attempt_count, j.payload_json,
79 s.artifact_zip_bytes, s.artifact_uncompressed_bytes, s.artifact_file_count, s.artifact_sha256
80 "#
81 )
82 .bind(worker_id)
83 .bind(worker_accelerators.supports(TargetAccelerator::Gpu))
84 .fetch_optional(&mut *tx)
85 .await?;
86
87 let Some(r) = row else {
88 tx.commit().await?;
89 return Ok(None);
90 };
91
92 let eval_type_raw: String = r.try_get("eval_type")?;
93 let eval_type = ScoringMode::from_storage_value(&eval_type_raw).ok_or_else(|| {
94 ServiceError::Internal(format!("unexpected evaluation job type `{eval_type_raw}`"))
95 })?;
96 let solution_submission_id = solution_submission_id_from_row(&r, "solution_submission_id")?;
97
98 sqlx::query(
99 r#"
100 UPDATE solution_submissions
101 SET status = 'running', updated_at = NOW()
102 WHERE id = $1::uuid
103 AND visible_after_eval = FALSE
104 "#,
105 )
106 .bind(solution_submission_id.as_str())
107 .execute(&mut *tx)
108 .await?;
109
110 let payload: EvaluationJobPayload = serde_json::from_value(r.try_get("payload_json")?)
111 .map_err(|e| ServiceError::Internal(e.to_string()))?;
112
113 tx.commit().await?;
114
115 Ok(Some(EvaluationJobRecord {
116 id: evaluation_job_id_from_row(&r, "id")?,
117 solution_submission_id,
118 challenge_name: payload.challenge_name.clone(),
119 target: target_from_row(&r, "target")?,
120 required_accelerator: required_accelerator_from_row(&r, "required_accelerator")?,
121 eval_type,
122 status: r.try_get("status")?,
123 attempt_count: r.try_get("attempt_count")?,
124 payload,
125 artifact_metadata: artifact_metadata_from_row(&r)?,
126 }))
127}
128
129pub async fn get_runner_job_claim(
131 pool: &PgPool,
132 job_id: &EvaluationJobId,
133 stale_minutes: i32,
134) -> Result<Option<RunnerJobClaimRecord>> {
135 let row: Option<(String, Option<String>, i32, bool)> = sqlx::query_as(
136 r#"
137 SELECT
138 status,
139 worker_id,
140 attempt_count,
141 claimed_at IS NOT NULL
142 AND claimed_at >= NOW() - INTERVAL '1 minute' * $2 AS claim_is_fresh
143 FROM evaluation_jobs
144 WHERE id = $1::uuid
145 "#,
146 )
147 .bind(job_id.as_str())
148 .bind(stale_minutes.max(1))
149 .fetch_optional(pool)
150 .await?;
151
152 Ok(row.map(
153 |(status, worker_id, attempt_count, claim_is_fresh)| RunnerJobClaimRecord {
154 status,
155 worker_id,
156 attempt_count,
157 claim_is_fresh,
158 },
159 ))
160}
161
162pub async fn refresh_evaluation_job_claim(
164 pool: &PgPool,
165 job_id: &EvaluationJobId,
166 worker_id: &str,
167 attempt_count: i32,
168) -> Result<bool> {
169 let result = sqlx::query(
170 r#"
171 UPDATE evaluation_jobs
172 SET claimed_at = NOW()
173 WHERE id = $1::uuid
174 AND worker_id = $2
175 AND attempt_count = $3
176 AND status = 'running'
177 "#,
178 )
179 .bind(job_id.as_str())
180 .bind(worker_id)
181 .bind(attempt_count)
182 .execute(pool)
183 .await?;
184
185 Ok(result.rows_affected() > 0)
186}
187
188pub async fn requeue_running_evaluation_job_for_capacity(
193 pool: &PgPool,
194 job_id: &EvaluationJobId,
195 worker_id: &str,
196 attempt_count: i32,
197 last_error: &str,
198) -> Result<bool> {
199 let mut tx = pool.begin().await?;
200 let Some(solution_submission_id) =
201 requeue_claimed_job_for_capacity(&mut tx, job_id, worker_id, attempt_count, last_error)
202 .await?
203 else {
204 tx.commit().await?;
205 return Ok(false);
206 };
207
208 delete_running_evaluation_for_job(&mut tx, job_id).await?;
209 repair_submission_after_capacity_requeue(&mut tx, &solution_submission_id).await?;
210 tx.commit().await?;
211 Ok(true)
212}
213
214async fn requeue_claimed_job_for_capacity(
215 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
216 job_id: &EvaluationJobId,
217 worker_id: &str,
218 attempt_count: i32,
219 last_error: &str,
220) -> Result<Option<SolutionSubmissionId>> {
221 let row = sqlx::query(
222 r#"
223 UPDATE evaluation_jobs
224 SET status = 'queued',
225 worker_id = NULL,
226 claimed_at = NULL,
227 scheduled_at = NOW() + INTERVAL '5 seconds',
228 attempt_count = GREATEST(attempt_count - 1, 0),
229 last_error = $4
230 WHERE id = $1::uuid
231 AND status = 'running'
232 AND worker_id = $2
233 AND attempt_count = $3
234 RETURNING solution_submission_id
235 "#,
236 )
237 .bind(job_id.as_str())
238 .bind(worker_id)
239 .bind(attempt_count)
240 .bind(last_error)
241 .fetch_optional(&mut **tx)
242 .await?;
243
244 row.map(|row| solution_submission_id_from_row(&row, "solution_submission_id"))
245 .transpose()
246}
247
248async fn delete_running_evaluation_for_job(
249 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
250 job_id: &EvaluationJobId,
251) -> Result<()> {
252 sqlx::query("DELETE FROM evaluations WHERE job_id = $1::uuid AND status = 'running'")
253 .bind(job_id.as_str())
254 .execute(&mut **tx)
255 .await?;
256 Ok(())
257}
258
259async fn repair_submission_after_capacity_requeue(
260 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
261 solution_submission_id: &SolutionSubmissionId,
262) -> Result<()> {
263 let visible_after_eval = sqlx::query_scalar::<_, bool>(
264 "SELECT visible_after_eval FROM solution_submissions WHERE id = $1::uuid FOR UPDATE",
265 )
266 .bind(solution_submission_id.as_str())
267 .fetch_one(&mut **tx)
268 .await?;
269 if !visible_after_eval {
270 sqlx::query(
271 "UPDATE solution_submissions SET status = 'queued', visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
272 )
273 .bind(solution_submission_id.as_str())
274 .execute(&mut **tx)
275 .await?;
276 }
277 Ok(())
278}
279
280pub async fn mark_evaluation_job_ready(pool: &PgPool, job_id: &EvaluationJobId) -> Result<()> {
282 let mut tx = pool.begin().await?;
283 let row = sqlx::query(
284 r#"
285 UPDATE evaluation_jobs
286 SET status = 'queued', scheduled_at = NOW()
287 WHERE id = $1::uuid
288 AND status = 'staged'
289 RETURNING solution_submission_id
290 "#,
291 )
292 .bind(job_id.as_str())
293 .fetch_optional(&mut *tx)
294 .await?;
295
296 let Some(row) = row else {
297 return Err(ServiceError::Internal(format!(
298 "staged evaluation job `{job_id}` is not staged"
299 )));
300 };
301 let solution_submission_id = solution_submission_id_from_row(&row, "solution_submission_id")?;
302
303 sqlx::query(
304 r#"
305 UPDATE solution_submissions
306 SET status = 'queued', updated_at = NOW()
307 WHERE id = $1::uuid
308 AND status = 'pending'
309 "#,
310 )
311 .bind(solution_submission_id.as_str())
312 .execute(&mut *tx)
313 .await?;
314
315 tx.commit().await?;
316 Ok(())
317}
318
/// Parameters for [`queue_evaluation_job`].
#[derive(Debug, Clone)]
pub struct QueueEvaluationJobInput {
    /// Id to assign to the new `evaluation_jobs` row.
    pub job_id: EvaluationJobId,
    /// Submission to evaluate.
    pub solution_submission_id: SolutionSubmissionId,
    /// Scoring mode of the new job.
    pub eval_type: ScoringMode,
    /// When `Some(n)` and `eval_type` is official, admission is refused with
    /// `TooManyRequests` once `n` official jobs are staged, queued, or running.
    /// `None` disables the check.
    pub max_active_official_jobs: Option<i64>,
}
327
328pub async fn queue_evaluation_job(
334 pool: &PgPool,
335 input: &QueueEvaluationJobInput,
336) -> Result<EvaluationJobRecord> {
337 let mut tx = pool.begin().await?;
338
339 let row = sqlx::query(
340 r#"
341 SELECT s.id, s.challenge_name, s.target, s.agent_id::text AS agent_id, s.artifact_key, s.visible_after_eval,
342 s.artifact_zip_bytes, s.artifact_uncompressed_bytes, s.artifact_file_count, s.artifact_sha256,
343 p.bundle_key, p.public_bundle_key, p.spec_json
344 FROM solution_submissions s
345 JOIN challenges p ON p.challenge_name = s.challenge_name
346 WHERE s.id = $1::uuid
347 AND p.status = 'active'
348 AND p.spec_json IS NOT NULL
349 LIMIT 1
350 FOR UPDATE OF s, p
351 "#,
352 )
353 .bind(input.solution_submission_id.as_str())
354 .fetch_one(&mut *tx)
355 .await
356 .map_err(|_| ServiceError::NotFound)?;
357 let was_visible: bool = row.try_get("visible_after_eval")?;
358
359 let spec_json: Value = row.try_get("spec_json")?;
360 let spec: ChallengeBundleSpec =
361 serde_json::from_value(spec_json).map_err(|e| ServiceError::Internal(e.to_string()))?;
362
363 let target = target_from_row(&row, "target")?;
364 let challenge_name = challenge_name_from_row(&row, "challenge_name")?;
365 ensure_challenge_supports_eval_type_tx(
366 &mut tx,
367 &challenge_name,
368 &spec,
369 &target,
370 input.eval_type,
371 &agent_id_from_row(&row, "agent_id")?,
372 )
373 .await?;
374 let bundle_key = storage_key_from_row(&row, "bundle_key")?;
375 let public_bundle_key = storage_key_from_row(&row, "public_bundle_key")?;
376 ensure_validation_uses_public_bundle(input.eval_type, &spec, &bundle_key, &public_bundle_key)?;
377 ensure_no_active_job_for_submission_tx(&mut tx, &input.solution_submission_id).await?;
378
379 let payload = serde_json::to_value(EvaluationJobPayload {
380 artifact_key: storage_key_from_row(&row, "artifact_key")?,
381 bundle_key,
382 public_bundle_key,
383 challenge_name: challenge_name.clone(),
384 target: target.clone(),
385 })
386 .map_err(|e| ServiceError::Internal(e.to_string()))?;
387
388 let eval_type_str = input.eval_type.as_str();
389 let required_accelerator = required_accelerator_for_target(&spec, &target)?;
390 let priority = if input.eval_type == ScoringMode::Official {
391 if let Some(max_active) = input.max_active_official_jobs {
392 lock_quota_scope(&mut tx, "global:official-active").await?;
393 let active = count_active_evaluation_jobs_tx(&mut tx, ScoringMode::Official).await?;
394 if active >= max_active {
395 return Err(ServiceError::TooManyRequests(format!(
396 "official evaluation queue is full: {active} of {max_active} official jobs are staged, queued, or running"
397 )));
398 }
399 }
400 10
401 } else {
402 0
403 };
404
405 sqlx::query(
406 r#"
407 INSERT INTO evaluation_jobs (id, solution_submission_id, challenge_name, target, required_accelerator, eval_type, status, priority, payload_json)
408 VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, 'queued', $7, $8)
409 "#
410 )
411 .bind(input.job_id.as_str())
412 .bind(input.solution_submission_id.as_str())
413 .bind(challenge_name.as_str())
414 .bind(target.as_str())
415 .bind(required_accelerator.as_str())
416 .bind(eval_type_str)
417 .bind(priority)
418 .bind(&payload)
419 .execute(&mut *tx)
420 .await
421 .map_err(map_active_job_conflict)?;
422
423 if input.eval_type == ScoringMode::Official && was_visible {
424 sqlx::query("UPDATE solution_submissions SET updated_at = NOW() WHERE id = $1::uuid")
425 .bind(input.solution_submission_id.as_str())
426 .execute(&mut *tx)
427 .await?;
428 } else {
429 sqlx::query(
430 "UPDATE solution_submissions SET status = 'queued', visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
431 )
432 .bind(input.solution_submission_id.as_str())
433 .execute(&mut *tx)
434 .await?;
435 repair_leaderboard_entry_for_solution_submission_tx(&mut tx, &input.solution_submission_id)
436 .await?;
437 }
438
439 tx.commit().await?;
440
441 Ok(EvaluationJobRecord {
442 id: input.job_id.clone(),
443 solution_submission_id: solution_submission_id_from_row(&row, "id")?,
444 challenge_name,
445 target,
446 required_accelerator,
447 eval_type: input.eval_type,
448 status: "queued".to_string(),
449 attempt_count: 0,
450 payload: serde_json::from_value(payload)
451 .map_err(|e| ServiceError::Internal(e.to_string()))?,
452 artifact_metadata: artifact_metadata_from_row(&row)?,
453 })
454}
455
456fn storage_key_from_row(
458 row: &sqlx::postgres::PgRow,
459 column: &str,
460) -> Result<agentics_domain::storage::StorageKey> {
461 let value: String = row.try_get(column)?;
462 agentics_domain::storage::StorageKey::try_new(&value).map_err(|e| {
463 ServiceError::Internal(format!("stored invalid storage key in `{column}`: {e}"))
464 })
465}
466
467fn required_accelerator_from_row(
469 row: &sqlx::postgres::PgRow,
470 column: &str,
471) -> Result<TargetAccelerator> {
472 let value: String = row.try_get(column)?;
473 TargetAccelerator::from_storage_value(&value).ok_or_else(|| {
474 ServiceError::Internal(format!(
475 "stored invalid required accelerator `{value}` in `{column}`"
476 ))
477 })
478}
479
480fn required_accelerator_for_target(
482 spec: &ChallengeBundleSpec,
483 target: &TargetName,
484) -> Result<TargetAccelerator> {
485 let target_spec = spec.target(target).ok_or_else(|| {
486 ServiceError::Internal(format!(
487 "challenge `{}` does not declare target `{target}` after admission validation",
488 spec.challenge_name
489 ))
490 })?;
491 Ok(target_spec.accelerator)
492}
493
494fn artifact_metadata_from_row(
496 row: &sqlx::postgres::PgRow,
497) -> Result<Option<SolutionArtifactMetadata>> {
498 let artifact_zip_bytes = optional_u64_from_row(row, "artifact_zip_bytes")?;
499 let artifact_uncompressed_bytes = optional_u64_from_row(row, "artifact_uncompressed_bytes")?;
500 let artifact_file_count = optional_u64_from_row(row, "artifact_file_count")?;
501 let artifact_sha256: Option<String> = row.try_get("artifact_sha256")?;
502 match (
503 artifact_zip_bytes,
504 artifact_uncompressed_bytes,
505 artifact_file_count,
506 artifact_sha256,
507 ) {
508 (None, None, None, None) => Ok(None),
509 (
510 Some(artifact_zip_bytes),
511 Some(artifact_uncompressed_bytes),
512 Some(artifact_file_count),
513 Some(artifact_sha256),
514 ) => {
515 let artifact_sha256 = Sha256Digest::try_new(&artifact_sha256).map_err(|e| {
516 ServiceError::Internal(format!("stored invalid artifact_sha256: {e}"))
517 })?;
518 Ok(Some(SolutionArtifactMetadata {
519 artifact_zip_bytes,
520 artifact_uncompressed_bytes,
521 artifact_file_count,
522 artifact_sha256,
523 }))
524 }
525 _ => Err(ServiceError::Internal(
526 "stored partial solution artifact metadata".to_string(),
527 )),
528 }
529}
530
531fn optional_u64_from_row(row: &sqlx::postgres::PgRow, column: &str) -> Result<Option<u64>> {
533 let value: Option<i64> = row.try_get(column)?;
534 value
535 .map(|value| {
536 u64::try_from(value)
537 .map_err(|_| ServiceError::Internal(format!("stored negative value in `{column}`")))
538 })
539 .transpose()
540}
541
542fn map_active_job_conflict(error: sqlx::Error) -> ServiceError {
544 match error {
545 sqlx::Error::Database(db_err)
546 if db_err.constraint().is_some_and(|constraint| {
547 constraint == "idx_evaluation_jobs_one_active_per_submission"
548 || constraint == "idx_evaluation_jobs_one_active_per_submission_mode"
549 }) =>
550 {
551 super::DbWorkflowError::AdmissionConflict(
552 "one active evaluation job already exists for this submission".to_string(),
553 )
554 .into()
555 }
556 other => super::DbWorkflowError::Sql(other).into(),
557 }
558}
559
560async fn lock_quota_scope(tx: &mut Transaction<'_, Postgres>, scope: &str) -> Result<()> {
562 sqlx::query(
563 r#"
564 INSERT INTO quota_admission_locks (scope)
565 VALUES ($1)
566 ON CONFLICT (scope) DO NOTHING
567 "#,
568 )
569 .bind(scope)
570 .execute(&mut **tx)
571 .await?;
572
573 sqlx::query(
574 r#"
575 SELECT scope
576 FROM quota_admission_locks
577 WHERE scope = $1
578 FOR UPDATE
579 "#,
580 )
581 .bind(scope)
582 .fetch_one(&mut **tx)
583 .await?;
584
585 Ok(())
586}
587
588async fn count_active_evaluation_jobs_tx(
590 tx: &mut Transaction<'_, Postgres>,
591 eval_type: ScoringMode,
592) -> Result<i64> {
593 let count = sqlx::query_scalar::<_, i64>(
594 r#"
595 SELECT COUNT(*)::BIGINT
596 FROM evaluation_jobs
597 WHERE eval_type = $1
598 AND status IN ('staged', 'queued', 'running')
599 "#,
600 )
601 .bind(eval_type.as_str())
602 .fetch_one(&mut **tx)
603 .await?;
604
605 Ok(count)
606}
607
608async fn ensure_no_active_job_for_submission_tx(
610 tx: &mut Transaction<'_, Postgres>,
611 solution_submission_id: &SolutionSubmissionId,
612) -> Result<()> {
613 let active = sqlx::query_scalar::<_, bool>(
614 r#"
615 SELECT EXISTS (
616 SELECT 1
617 FROM evaluation_jobs
618 WHERE solution_submission_id = $1::uuid
619 AND status IN ('staged', 'queued', 'running')
620 )
621 "#,
622 )
623 .bind(solution_submission_id.as_str())
624 .fetch_one(&mut **tx)
625 .await?;
626 if active {
627 return Err(ServiceError::Conflict);
628 }
629 Ok(())
630}
631
632pub async fn count_active_evaluation_jobs(pool: &PgPool, eval_type: ScoringMode) -> Result<i64> {
634 let count = sqlx::query_scalar::<_, i64>(
635 r#"
636 SELECT COUNT(*)::BIGINT
637 FROM evaluation_jobs
638 WHERE eval_type = $1
639 AND status IN ('staged', 'queued', 'running')
640 "#,
641 )
642 .bind(eval_type.as_str())
643 .fetch_one(pool)
644 .await?;
645
646 Ok(count)
647}