Skip to main content

agentics_persistence/db/
solution_submissions.rs

1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use sqlx::{PgPool, Postgres, Row, Transaction};
4
5use agentics_domain::models::challenge::{ChallengeBundleSpec, TargetAccelerator};
6use agentics_domain::models::evaluation::{
7    EvaluationDto, EvaluationJobPayload, EvaluationJobStatus, EvaluationStatus, MetricValue,
8    PublicCaseResult, RunMetricResult, ScoringMode, SolutionArtifactMetadata,
9    SolutionSubmissionStatus,
10};
11use agentics_domain::models::hashes::Sha256Digest;
12use agentics_domain::models::ids::{AgentId, EvaluationId, EvaluationJobId, SolutionSubmissionId};
13use agentics_domain::models::names::{ChallengeName, TargetName};
14use agentics_domain::storage::StorageKey;
15use agentics_error::{Result, ServiceError};
16
17use super::evaluation_policy::{
18    ensure_challenge_supports_eval_type_tx, ensure_validation_uses_public_bundle,
19    lock_active_challenge_for_admission_tx,
20};
21use super::ids::{
22    agent_id_from_row, challenge_name_from_row, optional_solution_submission_id_from_row,
23    optional_uuid_string_from_row, solution_submission_id_from_row, target_from_row,
24};
25use super::json::decode_optional_json;
26
/// Input for creating a solution submission and its initial evaluation job.
///
/// Consumed by `create_solution_submission_with_job`, which inserts one
/// `solution_submissions` row and one `evaluation_jobs` row in the same transaction.
#[derive(Debug, Clone)]
pub struct CreateSolutionSubmissionInput {
    /// Identifier stored as the new submission row's `id`.
    pub solution_submission_id: SolutionSubmissionId,
    /// Identifier stored as the initial evaluation job's `id`.
    pub job_id: EvaluationJobId,
    /// Submitting agent; also part of the quota lock scope and quota counts.
    pub agent_id: AgentId,
    /// Challenge that is looked up and locked for admission.
    pub challenge_name: ChallengeName,
    /// Challenge target the submission is evaluated against.
    pub target: TargetName,
    /// Storage key of the submitted artifact; persisted and copied into the job payload.
    pub artifact_key: StorageKey,
    /// Artifact byte sizes, file count, and SHA-256 persisted alongside the submission.
    pub artifact_metadata: SolutionArtifactMetadata,
    /// Stored verbatim in the `note` column.
    pub note: String,
    /// Evaluation mode of the initial job; official jobs are inserted with priority 10,
    /// all others with priority 0.
    pub eval_type: ScoringMode,
    /// Stored verbatim in the `explanation` column.
    pub explanation: String,
    /// Optional parent submission; checked against agent, challenge, and target before insert.
    pub parent_solution_submission_id: Option<SolutionSubmissionId>,
    /// Stored verbatim in the `credit_text` column.
    pub credit_text: String,
    /// Quota limits enforced inside the creation transaction.
    pub quota_admission: SolutionSubmissionQuotaAdmission,
}
44
/// Admin solution-submission list row before DTO projection.
///
/// Produced by `list_admin_solution_submissions`; one record per submission, joined with
/// its challenge, its agent, and the most recently created job/evaluation rows.
#[derive(Debug, Clone)]
pub struct AdminSolutionSubmissionListItemRecord {
    /// Submission primary key.
    pub id: SolutionSubmissionId,
    /// Challenge the submission targets.
    pub challenge_name: ChallengeName,
    /// `title` of the joined challenge row.
    pub challenge_title: String,
    /// Challenge target recorded on the submission.
    pub target: TargetName,
    /// Agent that owns the submission.
    pub agent_id: AgentId,
    /// `display_name` of the joined agent row.
    pub agent_display_name: String,
    /// Submission status column parsed into its enum form.
    pub status: SolutionSubmissionStatus,
    /// Value of the submission's `note` column.
    pub note: String,
    /// Value of the submission's `visible_after_eval` flag.
    pub visible_after_eval: bool,
    /// Id of the most recently created evaluation job, if the submission has any.
    pub latest_job_id: Option<EvaluationJobId>,
    /// Status of that most recent job.
    pub latest_job_status: Option<EvaluationJobStatus>,
    /// Evaluation mode of that most recent job.
    pub latest_job_eval_type: Option<ScoringMode>,
    /// Status of the most recently created `validation` evaluation, if any.
    pub validation_status: Option<EvaluationStatus>,
    /// Status of the most recently created `official` evaluation, if any.
    pub official_status: Option<EvaluationStatus>,
    /// Submission creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Submission last-update timestamp; primary sort key of the admin list.
    pub updated_at: DateTime<Utc>,
}
65
/// Public solution-submission list row before DTO projection/redaction.
///
/// Produced by `list_public_solution_submissions_for_challenge`, which only selects
/// submissions whose `visible_after_eval` flag is set.
#[derive(Debug, Clone)]
pub struct PublicSolutionSubmissionListItemRecord {
    /// Submission primary key.
    pub id: SolutionSubmissionId,
    /// Challenge the submission targets.
    pub challenge_name: ChallengeName,
    /// Challenge target recorded on the submission.
    pub target: TargetName,
    /// `title` of the joined challenge row.
    pub challenge_title: String,
    /// Agent that owns the submission.
    pub agent_id: AgentId,
    /// `display_name` of the joined agent row.
    pub agent_display_name: String,
    /// Submission status column parsed into its enum form.
    pub status: SolutionSubmissionStatus,
    /// Value of the submission's `note` column.
    pub note: String,
    /// Value of the submission's `explanation` column.
    pub explanation: String,
    /// Parent submission this one derives from, if any.
    pub parent_solution_submission_id: Option<SolutionSubmissionId>,
    /// Value of the submission's `credit_text` column.
    pub credit_text: String,
    /// Aggregate metrics of the most recently created completed official evaluation for the
    /// submission's target; empty when no such evaluation exists.
    pub official_metrics: Vec<MetricValue>,
    /// Submission creation timestamp; sort key of the public list (newest first).
    pub created_at: DateTime<Utc>,
    /// Submission last-update timestamp.
    pub updated_at: DateTime<Utc>,
}
84
/// Aggregate public observer counters before transport projection.
///
/// NOTE(review): the query that fills this record is not visible from this part of the
/// file; the per-field notes below are inferred from the field names only — confirm
/// against the producing query.
#[derive(Debug, Clone, Copy)]
pub struct PublicObserverStatsRecord {
    /// Presumably the number of challenges counted for public display.
    pub challenge_count: u64,
    /// Presumably the number of agents counted for public display.
    pub agent_count: u64,
    /// Presumably the number of completed submissions that are publicly visible.
    pub public_completed_submission_count: u64,
    /// Presumably the number of submission attempts overall, visible or not.
    pub total_solution_attempt_count: u64,
}
93
/// Authoritative quota limits applied inside the submission/job transaction.
///
/// Read by `enforce_quota_admission` while it holds the quota scope locks.
#[derive(Debug, Clone, Copy)]
pub struct SolutionSubmissionQuotaAdmission {
    /// Length, in seconds, of the look-back window used when counting recent submissions.
    pub window_seconds: i64,
    /// Admission is rejected once the count of submissions in the window for the same
    /// agent, challenge, target, and evaluation mode reaches this value.
    pub per_agent_challenge_limit: i64,
    /// Optional cap on the all-time count for the same scope; `None` skips the check.
    pub challenge_lifetime_limit: Option<i64>,
    /// Optional cap on official evaluation jobs that are staged, queued, or running;
    /// `None` skips the check.
    pub max_active_official_jobs: Option<i64>,
}
102
/// Solution submission row with optional joined evaluation and job metadata.
///
/// Returned both by `create_solution_submission_with_job` (joined fields are `None`
/// because nothing has been evaluated yet) and by the by-id fetch functions.
#[derive(Debug, Clone)]
pub struct SolutionSubmissionRecord {
    /// Submission primary key.
    pub id: SolutionSubmissionId,
    /// Challenge the submission targets.
    pub challenge_name: ChallengeName,
    /// Challenge target recorded on the submission.
    pub target: TargetName,
    /// Agent that owns the submission.
    pub agent_id: AgentId,
    /// `display_name` of the joined agent row; `None` on a freshly created record.
    pub agent_display_name: Option<String>,
    /// `title` of the joined challenge row; `None` on a freshly created record.
    pub challenge_title: Option<String>,
    /// Challenge spec deserialized from the challenge's stored `spec_json`.
    pub challenge_spec: ChallengeBundleSpec,
    /// Storage key of the submitted artifact.
    pub artifact_key: StorageKey,
    /// Artifact size/count/digest metadata decoded from the row, when available.
    pub artifact_metadata: Option<SolutionArtifactMetadata>,
    /// Value of the submission's `note` column.
    pub note: String,
    /// Raw `status` column value (not parsed into `SolutionSubmissionStatus`).
    pub status: String,
    /// Value of the submission's `explanation` column.
    pub explanation: String,
    /// Parent submission this one derives from, if any.
    pub parent_solution_submission_id: Option<SolutionSubmissionId>,
    /// Value of the submission's `credit_text` column.
    pub credit_text: String,
    /// Value of the submission's `visible_after_eval` flag.
    pub visible_after_eval: bool,
    /// Submission creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Submission last-update timestamp.
    pub updated_at: DateTime<Utc>,
    /// Id of the most recently created evaluation job (the initial job right after creation).
    pub evaluation_job_id: Option<EvaluationJobId>,
    /// Raw status of that job (`"staged"` right after creation).
    pub evaluation_job_status: Option<String>,
    /// The official evaluation when one was selected, otherwise the validation evaluation.
    pub evaluation: Option<EvaluationDto>,
    /// Most recently created validation evaluation for the submission's target, if any.
    pub validation_evaluation: Option<EvaluationDto>,
    /// Most recently created official evaluation for the submission's target, if any
    /// (restricted to completed evaluations by the public fetch).
    pub official_evaluation: Option<EvaluationDto>,
}
129
130/// Create a solution submission and queue its first evaluation atomically.
131pub async fn create_solution_submission_with_job(
132    pool: &PgPool,
133    input: &CreateSolutionSubmissionInput,
134) -> Result<SolutionSubmissionRecord> {
135    let mut tx = pool.begin().await?;
136    let challenge = lock_active_challenge_for_admission_tx(&mut tx, &input.challenge_name).await?;
137    let spec: ChallengeBundleSpec = serde_json::from_value(challenge.spec_json.clone())
138        .map_err(|e| ServiceError::Internal(e.to_string()))?;
139    ensure_challenge_supports_eval_type_tx(
140        &mut tx,
141        &challenge.challenge_name,
142        &spec,
143        &input.target,
144        input.eval_type,
145        &input.agent_id,
146    )
147    .await?;
148    ensure_validation_uses_public_bundle(
149        input.eval_type,
150        &spec,
151        &challenge.bundle_key,
152        &challenge.public_bundle_key,
153    )?;
154    enforce_quota_admission(&mut tx, input).await?;
155    ensure_parent_solution_submission_matches_scope_tx(
156        &mut tx,
157        input.parent_solution_submission_id.as_ref(),
158        &input.agent_id,
159        &challenge.challenge_name,
160        &input.target,
161    )
162    .await?;
163
164    let row = sqlx::query(
165        r#"
166        INSERT INTO solution_submissions (
167            id, challenge_name, target, agent_id, artifact_key, note,
168            artifact_zip_bytes, artifact_uncompressed_bytes, artifact_file_count, artifact_sha256,
169            status, explanation, parent_solution_submission_id, credit_text, visible_after_eval
170        )
171        VALUES ($1::uuid, $2, $3, $4::uuid, $5, $6, $7, $8, $9, $10, 'pending', $11, $12::uuid, $13, FALSE)
172        RETURNING
173            id, challenge_name, target, agent_id, artifact_key, note,
174            artifact_zip_bytes, artifact_uncompressed_bytes, artifact_file_count, artifact_sha256,
175            status, explanation, parent_solution_submission_id, credit_text, visible_after_eval,
176            created_at, updated_at
177        "#,
178    )
179    .bind(input.solution_submission_id.as_str())
180    .bind(challenge.challenge_name.as_str())
181    .bind(input.target.as_str())
182    .bind(input.agent_id.as_str())
183    .bind(input.artifact_key.as_str())
184    .bind(&input.note)
185    .bind(u64_to_i64(
186        input.artifact_metadata.artifact_zip_bytes,
187        "artifact_zip_bytes",
188    )?)
189    .bind(u64_to_i64(
190        input.artifact_metadata.artifact_uncompressed_bytes,
191        "artifact_uncompressed_bytes",
192    )?)
193    .bind(u64_to_i64(
194        input.artifact_metadata.artifact_file_count,
195        "artifact_file_count",
196    )?)
197    .bind(input.artifact_metadata.artifact_sha256.to_string())
198    .bind(&input.explanation)
199    .bind(
200        input
201            .parent_solution_submission_id
202            .as_ref()
203            .map(SolutionSubmissionId::as_str),
204    )
205    .bind(&input.credit_text)
206    .fetch_one(&mut *tx)
207    .await?;
208
209    let payload = serde_json::to_value(EvaluationJobPayload {
210        artifact_key: input.artifact_key.clone(),
211        bundle_key: challenge.bundle_key.clone(),
212        public_bundle_key: challenge.public_bundle_key.clone(),
213        challenge_name: challenge.challenge_name.clone(),
214        target: input.target.clone(),
215    })
216    .map_err(|e| ServiceError::Internal(e.to_string()))?;
217
218    let priority = if input.eval_type == ScoringMode::Official {
219        10
220    } else {
221        0
222    };
223    let required_accelerator = required_accelerator_for_target(&spec, &input.target)?;
224
225    sqlx::query(
226        r#"
227        INSERT INTO evaluation_jobs (
228            id, solution_submission_id, challenge_name, target, required_accelerator, eval_type, status, priority, payload_json, scheduled_at
229        )
230        VALUES (
231            $1::uuid, $2::uuid, $3, $4, $5, $6, 'staged', $7, $8,
232            NOW()
233        )
234        "#,
235    )
236    .bind(input.job_id.as_str())
237    .bind(input.solution_submission_id.as_str())
238    .bind(challenge.challenge_name.as_str())
239    .bind(input.target.as_str())
240    .bind(required_accelerator.as_str())
241    .bind(input.eval_type.as_str())
242    .bind(priority)
243    .bind(&payload)
244    .execute(&mut *tx)
245    .await?;
246
247    tx.commit().await?;
248
249    Ok(SolutionSubmissionRecord {
250        id: solution_submission_id_from_row(&row, "id")?,
251        challenge_name: challenge.challenge_name,
252        target: target_from_row(&row, "target")?,
253        agent_id: agent_id_from_row(&row, "agent_id")?,
254        agent_display_name: None,
255        challenge_title: None,
256        challenge_spec: spec,
257        artifact_key: storage_key_from_row(&row, "artifact_key")?,
258        artifact_metadata: artifact_metadata_from_row(&row)?,
259        note: row.try_get("note")?,
260        status: row.try_get("status")?,
261        explanation: row.try_get("explanation")?,
262        parent_solution_submission_id: optional_solution_submission_id_from_row(
263            &row,
264            "parent_solution_submission_id",
265        )?,
266        credit_text: row.try_get("credit_text")?,
267        visible_after_eval: row.try_get("visible_after_eval")?,
268        created_at: row.try_get("created_at")?,
269        updated_at: row.try_get("updated_at")?,
270        evaluation_job_id: Some(input.job_id.clone()),
271        evaluation_job_status: Some("staged".to_string()),
272        evaluation: None,
273        validation_evaluation: None,
274        official_evaluation: None,
275    })
276}
277
278/// Verify that an optional parent submission belongs to the same agent and ranking scope.
279pub async fn ensure_parent_solution_submission_matches_scope(
280    pool: &PgPool,
281    parent_solution_submission_id: Option<&SolutionSubmissionId>,
282    agent_id: &AgentId,
283    challenge_name: &ChallengeName,
284    target: &TargetName,
285) -> Result<()> {
286    let mut tx = pool.begin().await?;
287    ensure_parent_solution_submission_matches_scope_tx(
288        &mut tx,
289        parent_solution_submission_id,
290        agent_id,
291        challenge_name,
292        target,
293    )
294    .await?;
295    tx.commit().await?;
296    Ok(())
297}
298
299/// Return the accelerator requirement declared by the selected challenge target.
300fn required_accelerator_for_target(
301    spec: &ChallengeBundleSpec,
302    target: &TargetName,
303) -> Result<TargetAccelerator> {
304    let target_spec = spec.target(target).ok_or_else(|| {
305        ServiceError::Internal(format!(
306            "challenge `{}` does not declare target `{target}` after admission validation",
307            spec.challenge_name
308        ))
309    })?;
310    Ok(target_spec.accelerator)
311}
312
313/// Enforce parent-submission lineage invariants inside a submission transaction.
314async fn ensure_parent_solution_submission_matches_scope_tx<'a>(
315    tx: &mut Transaction<'a, Postgres>,
316    parent_solution_submission_id: Option<&SolutionSubmissionId>,
317    agent_id: &AgentId,
318    challenge_name: &ChallengeName,
319    target: &TargetName,
320) -> Result<()> {
321    let Some(parent_solution_submission_id) = parent_solution_submission_id else {
322        return Ok(());
323    };
324
325    let row = sqlx::query(
326        r#"
327        SELECT agent_id, challenge_name, target, status, visible_after_eval
328        FROM solution_submissions
329        WHERE id = $1::uuid
330        LIMIT 1
331        "#,
332    )
333    .bind(parent_solution_submission_id.as_str())
334    .fetch_optional(&mut **tx)
335    .await?;
336    let Some(row) = row else {
337        return Err(ServiceError::BadRequest(
338            "parent_solution_submission_id does not reference an existing solution submission"
339                .to_string(),
340        ));
341    };
342
343    let parent_agent_id = agent_id_from_row(&row, "agent_id")?;
344    let parent_challenge_name = challenge_name_from_row(&row, "challenge_name")?;
345    let parent_target = target_from_row(&row, "target")?;
346    let parent_status: String = row.try_get("status")?;
347    let parent_visible: bool = row.try_get("visible_after_eval")?;
348
349    if &parent_agent_id != agent_id
350        || &parent_challenge_name != challenge_name
351        || &parent_target != target
352    {
353        return Err(ServiceError::BadRequest(
354            "parent_solution_submission_id must belong to the same agent, challenge_name, and target"
355                .to_string(),
356        ));
357    }
358    if parent_status != SolutionSubmissionStatus::Completed.as_str() || !parent_visible {
359        return Err(ServiceError::BadRequest(
360            "parent_solution_submission_id must reference a completed visible solution submission"
361                .to_string(),
362        ));
363    }
364
365    Ok(())
366}
367
368/// Delete a solution submission and its dependent jobs/evaluations.
369pub async fn delete_solution_submission(
370    pool: &PgPool,
371    solution_submission_id: &SolutionSubmissionId,
372) -> Result<()> {
373    sqlx::query("DELETE FROM solution_submissions WHERE id = $1::uuid")
374        .bind(solution_submission_id.as_str())
375        .execute(pool)
376        .await?;
377    Ok(())
378}
379
380/// Handles enforce quota admission for this module.
381async fn enforce_quota_admission(
382    tx: &mut Transaction<'_, Postgres>,
383    input: &CreateSolutionSubmissionInput,
384) -> Result<()> {
385    let mut scopes = vec![format!(
386        "agent:{}:challenge:{}:target:{}:mode:{}:daily",
387        input.agent_id,
388        input.challenge_name,
389        input.target,
390        input.eval_type.as_str()
391    )];
392    if input.eval_type == ScoringMode::Official {
393        scopes.push("global:official-active".to_string());
394    }
395    if input.quota_admission.challenge_lifetime_limit.is_some() {
396        scopes.push(format!(
397            "agent:{}:challenge:{}:target:{}:mode:{}:lifetime",
398            input.agent_id,
399            input.challenge_name,
400            input.target,
401            input.eval_type.as_str()
402        ));
403    }
404    scopes.sort();
405
406    for scope in scopes {
407        lock_quota_scope(tx, &scope).await?;
408    }
409
410    let used = count_recent_runs_for_agent_challenge_tx(
411        tx,
412        &input.agent_id,
413        &input.challenge_name,
414        &input.target,
415        input.eval_type,
416        input.quota_admission.window_seconds,
417    )
418    .await?;
419    let limit = input.quota_admission.per_agent_challenge_limit;
420    if used >= limit {
421        return Err(ServiceError::TooManyRequests(format!(
422            "{} quota exceeded for challenge `{}`: {} of {} runs used in the last 24 hours",
423            input.eval_type.as_str(),
424            input.challenge_name,
425            used,
426            limit
427        )));
428    }
429
430    if let Some(limit) = input.quota_admission.challenge_lifetime_limit {
431        let used = count_lifetime_runs_for_agent_challenge_tx(
432            tx,
433            &input.agent_id,
434            &input.challenge_name,
435            &input.target,
436            input.eval_type,
437        )
438        .await?;
439        if used >= limit {
440            return Err(ServiceError::TooManyRequests(format!(
441                "{} challenge limit exceeded for challenge `{}`: {} of {} lifetime runs used",
442                input.eval_type.as_str(),
443                input.challenge_name,
444                used,
445                limit
446            )));
447        }
448    }
449
450    if let Some(max_active) = input.quota_admission.max_active_official_jobs {
451        let active = count_active_evaluation_jobs_tx(tx, ScoringMode::Official).await?;
452        if active >= max_active {
453            return Err(ServiceError::TooManyRequests(format!(
454                "official evaluation queue is full: {active} of {max_active} official jobs are staged, queued, or running"
455            )));
456        }
457    }
458
459    Ok(())
460}
461
462/// Handles lock quota scope for this module.
463async fn lock_quota_scope(tx: &mut Transaction<'_, Postgres>, scope: &str) -> Result<()> {
464    sqlx::query(
465        r#"
466        INSERT INTO quota_admission_locks (scope)
467        VALUES ($1)
468        ON CONFLICT (scope) DO NOTHING
469        "#,
470    )
471    .bind(scope)
472    .execute(&mut **tx)
473    .await?;
474
475    sqlx::query(
476        r#"
477        SELECT scope
478        FROM quota_admission_locks
479        WHERE scope = $1
480        FOR UPDATE
481        "#,
482    )
483    .bind(scope)
484    .fetch_one(&mut **tx)
485    .await?;
486
487    Ok(())
488}
489
490/// Handles count recent runs for agent challenge tx for this module.
491async fn count_recent_runs_for_agent_challenge_tx(
492    tx: &mut Transaction<'_, Postgres>,
493    agent_id: &AgentId,
494    challenge_name: &ChallengeName,
495    target: &TargetName,
496    eval_type: ScoringMode,
497    window_seconds: i64,
498) -> Result<i64> {
499    let count = sqlx::query_scalar::<_, i64>(
500        r#"
501        SELECT COUNT(*)::BIGINT
502        FROM solution_submissions s
503        JOIN LATERAL (
504            SELECT eval_type
505            FROM evaluation_jobs
506            WHERE solution_submission_id = s.id
507            ORDER BY created_at ASC, id ASC
508            LIMIT 1
509        ) first_job ON TRUE
510        WHERE s.agent_id = $1::uuid
511          AND s.challenge_name = $2
512          AND s.target = $3
513          AND first_job.eval_type = $4
514          AND s.created_at >= NOW() - ($5::DOUBLE PRECISION * INTERVAL '1 second')
515        "#,
516    )
517    .bind(agent_id.as_str())
518    .bind(challenge_name.as_str())
519    .bind(target.as_str())
520    .bind(eval_type.as_str())
521    .bind(window_seconds)
522    .fetch_one(&mut **tx)
523    .await?;
524
525    Ok(count)
526}
527
528/// Handles count lifetime runs for agent challenge tx for this module.
529async fn count_lifetime_runs_for_agent_challenge_tx(
530    tx: &mut Transaction<'_, Postgres>,
531    agent_id: &AgentId,
532    challenge_name: &ChallengeName,
533    target: &TargetName,
534    eval_type: ScoringMode,
535) -> Result<i64> {
536    let count = sqlx::query_scalar::<_, i64>(
537        r#"
538        SELECT COUNT(*)::BIGINT
539        FROM solution_submissions s
540        JOIN LATERAL (
541            SELECT eval_type
542            FROM evaluation_jobs
543            WHERE solution_submission_id = s.id
544            ORDER BY created_at ASC, id ASC
545            LIMIT 1
546        ) first_job ON TRUE
547        WHERE s.agent_id = $1::uuid
548          AND s.challenge_name = $2
549          AND s.target = $3
550          AND first_job.eval_type = $4
551        "#,
552    )
553    .bind(agent_id.as_str())
554    .bind(challenge_name.as_str())
555    .bind(target.as_str())
556    .bind(eval_type.as_str())
557    .fetch_one(&mut **tx)
558    .await?;
559
560    Ok(count)
561}
562
563/// Handles count active evaluation jobs tx for this module.
564async fn count_active_evaluation_jobs_tx(
565    tx: &mut Transaction<'_, Postgres>,
566    eval_type: ScoringMode,
567) -> Result<i64> {
568    let count = sqlx::query_scalar::<_, i64>(
569        r#"
570        SELECT COUNT(*)::BIGINT
571        FROM evaluation_jobs
572        WHERE eval_type = $1
573          AND status IN ('staged', 'queued', 'running')
574        "#,
575    )
576    .bind(eval_type.as_str())
577    .fetch_one(&mut **tx)
578    .await?;
579
580    Ok(count)
581}
582
583/// Fetch one solution submission with latest job state and validation/official evaluations.
584pub async fn get_solution_submission_by_id(
585    pool: &PgPool,
586    solution_submission_id: &SolutionSubmissionId,
587) -> Result<Option<SolutionSubmissionRecord>> {
588    get_solution_submission_by_id_inner(pool, solution_submission_id, false).await
589}
590
591/// Fetch one public result-of-record submission with the latest completed official evaluation.
592pub async fn get_public_solution_submission_by_id(
593    pool: &PgPool,
594    solution_submission_id: &SolutionSubmissionId,
595) -> Result<Option<SolutionSubmissionRecord>> {
596    get_solution_submission_by_id_inner(pool, solution_submission_id, true).await
597}
598
599/// Fetch one solution submission while optionally restricting official evaluations to completed
600/// public result-of-record rows.
601async fn get_solution_submission_by_id_inner(
602    pool: &PgPool,
603    solution_submission_id: &SolutionSubmissionId,
604    completed_official_only: bool,
605) -> Result<Option<SolutionSubmissionRecord>> {
606    let row = sqlx::query(
607        r#"
608        SELECT
609            s.id, s.challenge_name, s.target, s.agent_id,
610            p.title AS challenge_title, p.spec_json AS challenge_spec_json,
611            a.display_name AS agent_display_name,
612            s.artifact_key, s.note, s.status, s.explanation,
613            s.artifact_zip_bytes, s.artifact_uncompressed_bytes, s.artifact_file_count, s.artifact_sha256,
614            s.parent_solution_submission_id, s.credit_text, s.visible_after_eval,
615            s.created_at, s.updated_at,
616            j.id AS latest_job_id, j.status AS latest_job_status,
617            pe.id AS validation_eval_id,
618            pe.target AS validation_eval_target,
619            pe.status AS validation_eval_status,
620            pe.eval_type AS validation_eval_eval_type,
621            pe.aggregate_metrics_json AS validation_eval_aggregate_metrics,
622            pe.run_metrics_json AS validation_eval_run_metrics,
623            pe.public_results_json AS validation_eval_public_results,
624            pe.validation_summary_json AS validation_eval_validation_summary,
625            pe.official_summary_json AS validation_eval_official_summary,
626            pe.runner_log_storage_key AS validation_eval_runner_log_storage_key,
627            pe.started_at AS validation_eval_started_at,
628            pe.finished_at AS validation_eval_finished_at,
629            oe.id AS official_eval_id,
630            oe.target AS official_eval_target,
631            oe.status AS official_eval_status,
632            oe.eval_type AS official_eval_eval_type,
633            oe.aggregate_metrics_json AS official_eval_aggregate_metrics,
634            oe.run_metrics_json AS official_eval_run_metrics,
635            oe.public_results_json AS official_eval_public_results,
636            oe.validation_summary_json AS official_eval_validation_summary,
637            oe.official_summary_json AS official_eval_official_summary,
638            oe.runner_log_storage_key AS official_eval_runner_log_storage_key,
639            oe.started_at AS official_eval_started_at,
640            oe.finished_at AS official_eval_finished_at
641        FROM solution_submissions s
642        JOIN agents a ON a.id = s.agent_id
643        JOIN challenges p ON p.challenge_name = s.challenge_name
644        LEFT JOIN LATERAL (
645            SELECT id, status FROM evaluation_jobs WHERE solution_submission_id = s.id ORDER BY created_at DESC LIMIT 1
646        ) j ON TRUE
647        LEFT JOIN LATERAL (
648            SELECT id, target, status, eval_type, aggregate_metrics_json, run_metrics_json, public_results_json, validation_summary_json, official_summary_json, runner_log_storage_key, started_at, finished_at
649            FROM evaluations WHERE solution_submission_id = s.id AND eval_type = 'validation' AND target = s.target ORDER BY created_at DESC LIMIT 1
650        ) pe ON TRUE
651        LEFT JOIN LATERAL (
652            SELECT id, target, status, eval_type, aggregate_metrics_json, run_metrics_json, public_results_json, validation_summary_json, official_summary_json, runner_log_storage_key, started_at, finished_at
653            FROM evaluations
654            WHERE solution_submission_id = s.id
655              AND eval_type = 'official'
656              AND target = s.target
657              AND (NOT $2::boolean OR status = 'completed')
658            ORDER BY created_at DESC
659            LIMIT 1
660        ) oe ON TRUE
661        WHERE s.id = $1::uuid
662        LIMIT 1
663        "#
664    )
665        .bind(solution_submission_id.as_str())
666        .bind(completed_official_only)
667        .fetch_optional(pool)
668        .await?;
669
670    let Some(r) = row else {
671        return Ok(None);
672    };
673
674    let validation_eval = parse_eval_from_row(&r, "validation_eval")?;
675    let official_eval = parse_eval_from_row(&r, "official_eval")?;
676    let challenge_spec_json: Value = r.try_get("challenge_spec_json")?;
677    let challenge_spec = serde_json::from_value::<ChallengeBundleSpec>(challenge_spec_json)
678        .map_err(|e| ServiceError::Internal(format!("stored challenge spec is invalid: {e}")))?;
679
680    Ok(Some(SolutionSubmissionRecord {
681        id: solution_submission_id_from_row(&r, "id")?,
682        challenge_name: challenge_name_from_row(&r, "challenge_name")?,
683        target: target_from_row(&r, "target")?,
684        agent_id: agent_id_from_row(&r, "agent_id")?,
685        agent_display_name: r.try_get::<Option<String>, _>("agent_display_name")?,
686        challenge_title: r.try_get::<Option<String>, _>("challenge_title")?,
687        challenge_spec,
688        artifact_key: storage_key_from_row(&r, "artifact_key")?,
689        artifact_metadata: artifact_metadata_from_row(&r)?,
690        note: r.try_get("note")?,
691        status: r.try_get("status")?,
692        explanation: r.try_get("explanation")?,
693        parent_solution_submission_id: optional_solution_submission_id_from_row(
694            &r,
695            "parent_solution_submission_id",
696        )?,
697        credit_text: r.try_get("credit_text")?,
698        visible_after_eval: r.try_get("visible_after_eval")?,
699        created_at: r.try_get("created_at")?,
700        updated_at: r.try_get("updated_at")?,
701        evaluation_job_id: optional_evaluation_job_id_from_row(&r, "latest_job_id")?,
702        evaluation_job_status: r.try_get::<Option<String>, _>("latest_job_status")?,
703        evaluation: official_eval.clone().or_else(|| validation_eval.clone()),
704        validation_evaluation: validation_eval,
705        official_evaluation: official_eval,
706    }))
707}
708
709/// List recent solution submissions for admin operations.
710pub async fn list_admin_solution_submissions(
711    pool: &PgPool,
712    limit: i64,
713) -> Result<Vec<AdminSolutionSubmissionListItemRecord>> {
714    let rows = sqlx::query(
715        r#"
716        SELECT
717            s.id,
718            s.challenge_name,
719            s.target,
720            p.title AS challenge_title,
721            s.agent_id,
722            a.display_name AS agent_display_name,
723            s.note,
724            s.status,
725            s.visible_after_eval,
726            s.created_at,
727            s.updated_at,
728            j.id AS latest_job_id,
729            j.status AS latest_job_status,
730            j.eval_type AS latest_job_eval_type,
731            ve.status AS validation_status,
732            oe.status AS official_status
733        FROM solution_submissions s
734        JOIN challenges p ON p.challenge_name = s.challenge_name
735        JOIN agents a ON a.id = s.agent_id
736        LEFT JOIN LATERAL (
737            SELECT id, status, eval_type
738            FROM evaluation_jobs
739            WHERE solution_submission_id = s.id
740            ORDER BY created_at DESC
741            LIMIT 1
742        ) j ON TRUE
743        LEFT JOIN LATERAL (
744            SELECT status
745            FROM evaluations
746            WHERE solution_submission_id = s.id AND eval_type = 'validation'
747            ORDER BY created_at DESC
748            LIMIT 1
749        ) ve ON TRUE
750        LEFT JOIN LATERAL (
751            SELECT status
752            FROM evaluations
753            WHERE solution_submission_id = s.id AND eval_type = 'official'
754            ORDER BY created_at DESC
755            LIMIT 1
756        ) oe ON TRUE
757        ORDER BY s.updated_at DESC, s.created_at DESC
758        LIMIT $1
759        "#,
760    )
761    .bind(limit)
762    .fetch_all(pool)
763    .await?;
764
765    rows.into_iter()
766        .map(|r| {
767            Ok(AdminSolutionSubmissionListItemRecord {
768                id: solution_submission_id_from_row(&r, "id")?,
769                challenge_name: challenge_name_from_row(&r, "challenge_name")?,
770                challenge_title: r.try_get("challenge_title")?,
771                target: target_from_row(&r, "target")?,
772                agent_id: agent_id_from_row(&r, "agent_id")?,
773                agent_display_name: r.try_get("agent_display_name")?,
774                note: r.try_get("note")?,
775                status: solution_submission_status_from_row(&r, "status")?,
776                visible_after_eval: r.try_get("visible_after_eval")?,
777                latest_job_id: optional_evaluation_job_id_from_row(&r, "latest_job_id")?,
778                latest_job_status: optional_evaluation_job_status_from_row(
779                    &r,
780                    "latest_job_status",
781                )?,
782                latest_job_eval_type: optional_scoring_mode_from_row(&r, "latest_job_eval_type")?,
783                validation_status: optional_evaluation_status_from_row(&r, "validation_status")?,
784                official_status: optional_evaluation_status_from_row(&r, "official_status")?,
785                created_at: r.try_get("created_at")?,
786                updated_at: r.try_get("updated_at")?,
787            })
788        })
789        .collect::<Result<Vec<_>>>()
790}
791
792/// List solution submissions for a challenge after an official evaluation makes them visible.
793pub async fn list_public_solution_submissions_for_challenge(
794    pool: &PgPool,
795    challenge_name: &ChallengeName,
796    target: &TargetName,
797    limit: i64,
798) -> Result<Vec<PublicSolutionSubmissionListItemRecord>> {
799    let rows = sqlx::query(
800        r#"
801        SELECT
802            s.id, s.challenge_name, s.target, p.title AS challenge_title,
803            s.agent_id, a.display_name AS agent_display_name, s.status, s.note, s.explanation,
804            s.parent_solution_submission_id, s.credit_text, s.created_at, s.updated_at,
805            COALESCE(oe.aggregate_metrics_json, '[]'::jsonb) AS official_metrics
806        FROM solution_submissions s
807        JOIN agents a ON a.id = s.agent_id
808        JOIN challenges p ON p.challenge_name = s.challenge_name
809        LEFT JOIN LATERAL (
810            SELECT aggregate_metrics_json, official_summary_json
811            FROM evaluations
812            WHERE solution_submission_id = s.id AND eval_type = 'official' AND status = 'completed' AND target = s.target
813            ORDER BY created_at DESC LIMIT 1
814        ) oe ON TRUE
815        WHERE p.challenge_name = $1
816          AND s.visible_after_eval = TRUE
817          AND s.target = $2
818        ORDER BY s.created_at DESC
819        LIMIT $3
820        "#,
821    )
822    .bind(challenge_name.as_str())
823    .bind(target.as_str())
824    .bind(limit)
825    .fetch_all(pool)
826    .await?;
827
828    rows.into_iter()
829        .map(|r| {
830            let official_metrics: Vec<MetricValue> = decode_optional_json(
831                r.try_get::<Option<Value>, _>("official_metrics")?,
832                "solution submission official metrics",
833            )?
834            .unwrap_or_default();
835            Ok(PublicSolutionSubmissionListItemRecord {
836                id: solution_submission_id_from_row(&r, "id")?,
837                challenge_name: challenge_name_from_row(&r, "challenge_name")?,
838                target: target_from_row(&r, "target")?,
839                challenge_title: r.try_get("challenge_title")?,
840                agent_id: agent_id_from_row(&r, "agent_id")?,
841                agent_display_name: r.try_get("agent_display_name")?,
842                status: solution_submission_status_from_row(&r, "status")?,
843                note: r.try_get("note")?,
844                explanation: r.try_get("explanation")?,
845                parent_solution_submission_id: optional_solution_submission_id_from_row(
846                    &r,
847                    "parent_solution_submission_id",
848                )?,
849                credit_text: r.try_get("credit_text")?,
850                official_metrics,
851                created_at: r.try_get("created_at")?,
852                updated_at: r.try_get("updated_at")?,
853            })
854        })
855        .collect::<Result<Vec<_>>>()
856}
857
858/// Count visible solution submissions for a challenge and target.
859pub async fn count_public_solution_submissions_for_challenge(
860    pool: &PgPool,
861    challenge_name: &ChallengeName,
862    target: &TargetName,
863) -> Result<i64> {
864    let count = sqlx::query_scalar::<_, i64>(
865        r#"
866        SELECT COUNT(*)::bigint
867        FROM solution_submissions s
868        WHERE s.challenge_name = $1
869          AND s.visible_after_eval = TRUE
870          AND s.target = $2
871        "#,
872    )
873    .bind(challenge_name.as_str())
874    .bind(target.as_str())
875    .fetch_one(pool)
876    .await?;
877
878    Ok(count)
879}
880
881/// Count aggregate currently public observer stats.
882pub async fn public_observer_stats(pool: &PgPool) -> Result<PublicObserverStatsRecord> {
883    let row = sqlx::query_as::<_, (i64, i64, i64, i64)>(
884        r#"
885        WITH public_challenges AS (
886            SELECT challenge_name, spec_json
887            FROM challenges
888            WHERE status = 'active'
889              AND spec_json IS NOT NULL
890        ),
891        public_submissions AS (
892            SELECT s.agent_id
893            FROM solution_submissions s
894            JOIN public_challenges c ON c.challenge_name = s.challenge_name
895            WHERE s.visible_after_eval = TRUE
896              AND s.status = 'completed'
897              AND (
898                c.spec_json #>> '{visibility,result_detail}' = 'submitter_live_public_live'
899                OR (
900                    c.spec_json #>> '{visibility,result_detail}' = 'submitter_live_public_after_close'
901                    AND (c.spec_json ->> 'closes_at')::timestamptz <= NOW()
902                )
903              )
904        ),
905        public_challenge_attempts AS (
906            SELECT s.id
907            FROM solution_submissions s
908            JOIN public_challenges c ON c.challenge_name = s.challenge_name
909        )
910        SELECT
911            (SELECT COUNT(*)::bigint FROM public_challenges) AS challenge_count,
912            (SELECT COUNT(DISTINCT agent_id)::bigint FROM public_submissions) AS agent_count,
913            (SELECT COUNT(*)::bigint FROM public_submissions) AS public_completed_submission_count,
914            (SELECT COUNT(*)::bigint FROM public_challenge_attempts) AS total_solution_attempt_count
915        "#,
916    )
917    .fetch_one(pool)
918    .await?;
919
920    Ok(PublicObserverStatsRecord {
921        challenge_count: count_to_u64("challenge_count", row.0)?,
922        agent_count: count_to_u64("agent_count", row.1)?,
923        public_completed_submission_count: count_to_u64(
924            "public_completed_submission_count",
925            row.2,
926        )?,
927        total_solution_attempt_count: count_to_u64("total_solution_attempt_count", row.3)?,
928    })
929}
930
931fn count_to_u64(label: &str, value: i64) -> Result<u64> {
932    u64::try_from(value).map_err(|_| ServiceError::Internal(format!("{label} count was negative")))
933}
934
935/// Reads parse eval from a database row and validates its domain shape.
936fn parse_eval_from_row(row: &sqlx::postgres::PgRow, prefix: &str) -> Result<Option<EvaluationDto>> {
937    let id_col = format!("{}_id", prefix);
938    let id = optional_evaluation_id_from_row(row, id_col.as_str())?;
939    let id = match id {
940        Some(i) => i,
941        _ => return Ok(None),
942    };
943    let status_str: String = row.try_get(format!("{}_status", prefix).as_str())?;
944    let target_col = format!("{}_target", prefix);
945    let target = target_from_row(row, target_col.as_str())?;
946    let eval_type_str: String = row.try_get(format!("{}_eval_type", prefix).as_str())?;
947    let aggregate_json: Option<Value> =
948        row.try_get(format!("{}_aggregate_metrics", prefix).as_str())?;
949    let run_metrics_json: Option<Value> =
950        row.try_get(format!("{}_run_metrics", prefix).as_str())?;
951    let public_results_json: Option<Value> =
952        row.try_get(format!("{}_public_results", prefix).as_str())?;
953    let validation_summary_json: Option<Value> =
954        row.try_get(format!("{}_validation_summary", prefix).as_str())?;
955    let official_json: Option<Value> =
956        row.try_get(format!("{}_official_summary", prefix).as_str())?;
957    let runner_log_storage_key =
958        optional_storage_key_from_row(row, format!("{prefix}_runner_log_storage_key").as_str())?;
959    let started_at: Option<DateTime<Utc>> =
960        row.try_get(format!("{}_started_at", prefix).as_str())?;
961    let finished_at: Option<DateTime<Utc>> =
962        row.try_get(format!("{}_finished_at", prefix).as_str())?;
963
964    let status = EvaluationStatus::from_storage_value(&status_str).ok_or_else(|| {
965        ServiceError::Internal(format!("unexpected evaluation status `{status_str}`"))
966    })?;
967    let eval_type = ScoringMode::from_storage_value(&eval_type_str).ok_or_else(|| {
968        ServiceError::Internal(format!("unexpected evaluation type `{eval_type_str}`"))
969    })?;
970    let public_results: Vec<PublicCaseResult> =
971        decode_optional_json(public_results_json, &format!("{prefix} public results"))?
972            .unwrap_or_default();
973    let aggregate_metrics: Vec<MetricValue> =
974        decode_optional_json(aggregate_json, &format!("{prefix} aggregate metrics"))?
975            .unwrap_or_default();
976    let run_metrics: Vec<RunMetricResult> =
977        decode_optional_json(run_metrics_json, &format!("{prefix} run metrics"))?
978            .unwrap_or_default();
979    let validation_summary = decode_optional_json(
980        validation_summary_json,
981        &format!("{prefix} validation summary"),
982    )?;
983    let official_summary =
984        decode_optional_json(official_json, &format!("{prefix} official summary"))?;
985
986    Ok(Some(EvaluationDto {
987        id,
988        target,
989        status,
990        eval_type,
991        aggregate_metrics,
992        run_metrics,
993        public_results,
994        validation_summary,
995        official_summary,
996        runner_log_storage_key,
997        started_at: started_at.map(|d| d.to_rfc3339()),
998        finished_at: finished_at.map(|d| d.to_rfc3339()),
999    }))
1000}
1001
1002/// Reads a solution-submission status and validates its persisted value.
1003fn solution_submission_status_from_row(
1004    row: &sqlx::postgres::PgRow,
1005    column: &str,
1006) -> Result<SolutionSubmissionStatus> {
1007    let value: String = row.try_get(column)?;
1008    SolutionSubmissionStatus::from_storage_value(&value).ok_or_else(|| {
1009        ServiceError::Internal(format!("unexpected solution submission status `{value}`"))
1010    })
1011}
1012
1013/// Reads an optional evaluation job status and validates its persisted value.
1014fn optional_evaluation_job_status_from_row(
1015    row: &sqlx::postgres::PgRow,
1016    column: &str,
1017) -> Result<Option<EvaluationJobStatus>> {
1018    let value: Option<String> = row.try_get(column)?;
1019    value
1020        .map(|value| {
1021            EvaluationJobStatus::from_storage_value(&value).ok_or_else(|| {
1022                ServiceError::Internal(format!("unexpected evaluation job status `{value}`"))
1023            })
1024        })
1025        .transpose()
1026}
1027
1028/// Reads an optional evaluation result status and validates its persisted value.
1029fn optional_evaluation_status_from_row(
1030    row: &sqlx::postgres::PgRow,
1031    column: &str,
1032) -> Result<Option<EvaluationStatus>> {
1033    let value: Option<String> = row.try_get(column)?;
1034    value
1035        .map(|value| {
1036            EvaluationStatus::from_storage_value(&value).ok_or_else(|| {
1037                ServiceError::Internal(format!("unexpected evaluation status `{value}`"))
1038            })
1039        })
1040        .transpose()
1041}
1042
1043/// Reads an optional scoring mode and validates its persisted value.
1044fn optional_scoring_mode_from_row(
1045    row: &sqlx::postgres::PgRow,
1046    column: &str,
1047) -> Result<Option<ScoringMode>> {
1048    let value: Option<String> = row.try_get(column)?;
1049    value
1050        .map(|value| {
1051            ScoringMode::from_storage_value(&value).ok_or_else(|| {
1052                ServiceError::Internal(format!("unexpected evaluation type `{value}`"))
1053            })
1054        })
1055        .transpose()
1056}
1057
1058/// Reads storage key from a database row and validates its domain shape.
1059fn storage_key_from_row(row: &sqlx::postgres::PgRow, column: &str) -> Result<StorageKey> {
1060    let value: String = row.try_get(column)?;
1061    StorageKey::try_new(&value).map_err(|e| {
1062        ServiceError::Internal(format!("stored invalid storage key in `{column}`: {e}"))
1063    })
1064}
1065
1066/// Reads optional solution artifact metadata from a database row.
1067fn artifact_metadata_from_row(
1068    row: &sqlx::postgres::PgRow,
1069) -> Result<Option<SolutionArtifactMetadata>> {
1070    let artifact_zip_bytes = optional_u64_from_row(row, "artifact_zip_bytes")?;
1071    let artifact_uncompressed_bytes = optional_u64_from_row(row, "artifact_uncompressed_bytes")?;
1072    let artifact_file_count = optional_u64_from_row(row, "artifact_file_count")?;
1073    let artifact_sha256: Option<String> = row.try_get("artifact_sha256")?;
1074    match (
1075        artifact_zip_bytes,
1076        artifact_uncompressed_bytes,
1077        artifact_file_count,
1078        artifact_sha256,
1079    ) {
1080        (None, None, None, None) => Ok(None),
1081        (
1082            Some(artifact_zip_bytes),
1083            Some(artifact_uncompressed_bytes),
1084            Some(artifact_file_count),
1085            Some(artifact_sha256),
1086        ) => {
1087            let artifact_sha256 = Sha256Digest::try_new(&artifact_sha256).map_err(|e| {
1088                ServiceError::Internal(format!("stored invalid artifact_sha256: {e}"))
1089            })?;
1090            Ok(Some(SolutionArtifactMetadata {
1091                artifact_zip_bytes,
1092                artifact_uncompressed_bytes,
1093                artifact_file_count,
1094                artifact_sha256,
1095            }))
1096        }
1097        _ => Err(ServiceError::Internal(
1098            "stored partial solution artifact metadata".to_string(),
1099        )),
1100    }
1101}
1102
1103/// Reads an optional non-negative BIGINT as `u64`.
1104fn optional_u64_from_row(row: &sqlx::postgres::PgRow, column: &str) -> Result<Option<u64>> {
1105    let value: Option<i64> = row.try_get(column)?;
1106    value
1107        .map(|value| {
1108            u64::try_from(value)
1109                .map_err(|_| ServiceError::Internal(format!("stored negative value in `{column}`")))
1110        })
1111        .transpose()
1112}
1113
1114/// Converts a bounded `u64` to PostgreSQL BIGINT.
1115fn u64_to_i64(value: u64, field: &str) -> Result<i64> {
1116    i64::try_from(value)
1117        .map_err(|_| ServiceError::Validation(format!("{field} exceeds supported range")))
1118}
1119
1120/// Reads optional storage key from a database row and validates its domain shape.
1121fn optional_storage_key_from_row(
1122    row: &sqlx::postgres::PgRow,
1123    column: &str,
1124) -> Result<Option<StorageKey>> {
1125    row.try_get::<Option<String>, _>(column)?
1126        .map(StorageKey::try_new)
1127        .transpose()
1128        .map_err(|e| {
1129            ServiceError::Internal(format!("stored invalid storage key in `{column}`: {e}"))
1130        })
1131}
1132
1133/// Reads optional evaluation job id from a database row and validates its domain shape.
1134fn optional_evaluation_job_id_from_row(
1135    row: &sqlx::postgres::PgRow,
1136    column: &str,
1137) -> Result<Option<EvaluationJobId>> {
1138    optional_uuid_string_from_row(row, column)?
1139        .map(EvaluationJobId::try_new)
1140        .transpose()
1141        .map_err(|e| {
1142            ServiceError::Internal(format!(
1143                "stored invalid evaluation job id in column `{column}`: {e}"
1144            ))
1145        })
1146}
1147
1148/// Reads optional evaluation id from a database row and validates its domain shape.
1149fn optional_evaluation_id_from_row(
1150    row: &sqlx::postgres::PgRow,
1151    column: &str,
1152) -> Result<Option<EvaluationId>> {
1153    optional_uuid_string_from_row(row, column)?
1154        .map(EvaluationId::try_new)
1155        .transpose()
1156        .map_err(|e| {
1157            ServiceError::Internal(format!(
1158                "stored invalid evaluation id in column `{column}`: {e}"
1159            ))
1160        })
1161}