Skip to main content

agentics_persistence/db/
evaluations.rs

1use sqlx::PgPool;
2
3use agentics_domain::models::evaluation::{
4    EvaluationStatus, MetricValue, PublicCaseResult, RunMetricResult, ScoreSummary, ScoringMode,
5    SolutionSubmissionStatus,
6};
7use agentics_domain::models::ids::{EvaluationId, EvaluationJobId, SolutionSubmissionId};
8use agentics_domain::models::names::TargetName;
9use agentics_domain::storage::StorageKey;
10use agentics_error::{Result, ServiceError};
11
12use super::leaderboard::{
13    update_official_metrics_for_solution_submission_tx,
14    upsert_leaderboard_entry_for_solution_submission_tx,
15};
16
17/// Input for creating the evaluation row associated with a claimed job.
18#[derive(Debug, Clone)]
19pub struct MarkEvaluationStartedInput {
20    pub evaluation_id: EvaluationId,
21    pub solution_submission_id: SolutionSubmissionId,
22    pub job_id: EvaluationJobId,
23    pub worker_id: String,
24    pub claim_attempt_count: i32,
25    pub target: TargetName,
26    pub eval_type: ScoringMode,
27}
28
29/// Mark a job's evaluation as running.
30pub async fn mark_evaluation_started(
31    pool: &PgPool,
32    input: &MarkEvaluationStartedInput,
33) -> Result<bool> {
34    let eval_type_str = input.eval_type.as_str();
35
36    let result = sqlx::query(
37        r#"
38        INSERT INTO evaluations (id, solution_submission_id, job_id, target, eval_type, status, started_at)
39        SELECT $1::uuid, j.solution_submission_id, j.id, j.target, j.eval_type, 'running', NOW()
40        FROM evaluation_jobs j
41        WHERE j.id = $3::uuid
42          AND j.solution_submission_id = $2::uuid
43          AND j.worker_id = $4
44          AND j.attempt_count = $5
45          AND j.status = 'running'
46          AND j.target = $6
47          AND j.eval_type = $7
48        ON CONFLICT (job_id) DO NOTHING
49        "#,
50    )
51    .bind(input.evaluation_id.as_str())
52    .bind(input.solution_submission_id.as_str())
53    .bind(input.job_id.as_str())
54    .bind(&input.worker_id)
55    .bind(input.claim_attempt_count)
56    .bind(input.target.as_str())
57    .bind(eval_type_str)
58    .execute(pool)
59    .await?;
60
61    Ok(result.rows_affected() == 1)
62}
63
64/// Validated runner result prepared for persistence.
65#[derive(Debug, Clone)]
66pub struct PersistedEvaluationResult {
67    pub solution_submission_id: SolutionSubmissionId,
68    pub job_id: EvaluationJobId,
69    pub worker_id: String,
70    pub claim_attempt_count: i32,
71    pub target: TargetName,
72    pub eval_type: ScoringMode,
73    pub status: EvaluationStatus,
74    pub aggregate_metrics: Vec<MetricValue>,
75    pub run_metrics: Vec<RunMetricResult>,
76    pub public_results: Vec<PublicCaseResult>,
77    pub validation_summary: Option<ScoreSummary>,
78    pub official_summary: Option<ScoreSummary>,
79    pub runner_log_storage_key: Option<StorageKey>,
80    pub last_error: Option<String>,
81}
82
83/// Persist a finished evaluation and update dependent solution submission and leaderboard state.
84pub async fn mark_evaluation_finished(
85    pool: &PgPool,
86    result: &PersistedEvaluationResult,
87) -> Result<bool> {
88    let mut tx = pool.begin().await?;
89
90    let public_results_json = serde_json::to_value(&result.public_results)
91        .map_err(|e| ServiceError::Internal(e.to_string()))?;
92    let validation_summary_json = serde_json::to_value(&result.validation_summary)
93        .map_err(|e| ServiceError::Internal(e.to_string()))?;
94    let official_json = serde_json::to_value(&result.official_summary)
95        .map_err(|e| ServiceError::Internal(e.to_string()))?;
96    let aggregate_metrics_json = serde_json::to_value(&result.aggregate_metrics)
97        .map_err(|e| ServiceError::Internal(e.to_string()))?;
98    let run_metrics_json = serde_json::to_value(&result.run_metrics)
99        .map_err(|e| ServiceError::Internal(e.to_string()))?;
100    let status_str = if result.status == EvaluationStatus::Completed {
101        EvaluationStatus::Completed.as_str()
102    } else {
103        EvaluationStatus::Failed.as_str()
104    };
105
106    let job_update = sqlx::query(
107        r#"
108        UPDATE evaluation_jobs
109        SET status = $2, finished_at = NOW(), last_error = $3
110        WHERE id = $1::uuid
111          AND status = 'running'
112          AND worker_id = $4
113          AND attempt_count = $5
114        "#,
115    )
116    .bind(result.job_id.as_str())
117    .bind(status_str)
118    .bind(&result.last_error)
119    .bind(&result.worker_id)
120    .bind(result.claim_attempt_count)
121    .execute(&mut *tx)
122    .await?;
123
124    if job_update.rows_affected() == 0 {
125        tx.commit().await?;
126        return Ok(false);
127    }
128
129    let evaluation_update = sqlx::query(
130        r#"
131        UPDATE evaluations
132        SET status = $2,
133            aggregate_metrics_json = $3, run_metrics_json = $4,
134            public_results_json = $5, validation_summary_json = $6,
135            official_summary_json = $7, runner_log_storage_key = $8, finished_at = NOW()
136        WHERE job_id = $1::uuid
137          AND status = 'running'
138        "#,
139    )
140    .bind(result.job_id.as_str())
141    .bind(status_str)
142    .bind(&aggregate_metrics_json)
143    .bind(&run_metrics_json)
144    .bind(&public_results_json)
145    .bind(&validation_summary_json)
146    .bind(&official_json)
147    .bind(
148        result
149            .runner_log_storage_key
150            .as_ref()
151            .map(StorageKey::as_str),
152    )
153    .execute(&mut *tx)
154    .await?;
155
156    if evaluation_update.rows_affected() != 1 {
157        return Err(ServiceError::Conflict);
158    }
159
160    let has_previous_official_result =
161        has_completed_official_evaluation_tx(&mut tx, result).await?;
162
163    match result.eval_type {
164        ScoringMode::Validation => {
165            if has_previous_official_result {
166                tx.commit().await?;
167                return Ok(true);
168            }
169            let sub_status = if result.status == EvaluationStatus::Completed {
170                SolutionSubmissionStatus::Completed.as_str()
171            } else {
172                SolutionSubmissionStatus::Failed.as_str()
173            };
174            sqlx::query(
175                "UPDATE solution_submissions SET status = $2, visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
176            )
177            .bind(result.solution_submission_id.as_str())
178            .bind(sub_status)
179            .execute(&mut *tx)
180            .await?;
181        }
182        ScoringMode::Official => {
183            let visible =
184                result.status == EvaluationStatus::Completed || has_previous_official_result;
185            let sub_status = if visible {
186                SolutionSubmissionStatus::Completed.as_str()
187            } else {
188                SolutionSubmissionStatus::Failed.as_str()
189            };
190            sqlx::query(
191                "UPDATE solution_submissions SET status = $2, visible_after_eval = $3, updated_at = NOW() WHERE id = $1::uuid"
192            )
193            .bind(result.solution_submission_id.as_str())
194            .bind(sub_status)
195            .bind(visible)
196            .execute(&mut *tx)
197            .await?;
198
199            if result.status == EvaluationStatus::Completed {
200                let became_best = upsert_leaderboard_entry_for_solution_submission_tx(
201                    &mut tx,
202                    &result.solution_submission_id,
203                    &result.target,
204                    &result.public_results,
205                    &result.aggregate_metrics,
206                )
207                .await?;
208                if became_best {
209                    update_official_metrics_for_solution_submission_tx(
210                        &mut tx,
211                        &result.solution_submission_id,
212                        &result.target,
213                        &result.aggregate_metrics,
214                    )
215                    .await?;
216                }
217            }
218        }
219    }
220
221    tx.commit().await?;
222    Ok(true)
223}
224
225/// Return whether this submission already has a completed official evaluation other than the
226/// evaluation currently being finalized.
227async fn has_completed_official_evaluation_tx(
228    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
229    result: &PersistedEvaluationResult,
230) -> Result<bool> {
231    let exists = sqlx::query_scalar::<_, bool>(
232        r#"
233        SELECT EXISTS (
234            SELECT 1
235            FROM evaluations
236            WHERE solution_submission_id = $1::uuid
237              AND target = $2
238              AND eval_type = 'official'
239              AND status = 'completed'
240              AND job_id <> $3::uuid
241        )
242        "#,
243    )
244    .bind(result.solution_submission_id.as_str())
245    .bind(result.target.as_str())
246    .bind(result.job_id.as_str())
247    .fetch_one(&mut **tx)
248    .await?;
249
250    Ok(exists)
251}