agentics_persistence/db/
evaluations.rs1use sqlx::PgPool;
2
3use agentics_domain::models::evaluation::{
4 EvaluationStatus, MetricValue, PublicCaseResult, RunMetricResult, ScoreSummary, ScoringMode,
5 SolutionSubmissionStatus,
6};
7use agentics_domain::models::ids::{EvaluationId, EvaluationJobId, SolutionSubmissionId};
8use agentics_domain::models::names::TargetName;
9use agentics_domain::storage::StorageKey;
10use agentics_error::{Result, ServiceError};
11
12use super::leaderboard::{
13 update_official_metrics_for_solution_submission_tx,
14 upsert_leaderboard_entry_for_solution_submission_tx,
15};
16
17#[derive(Debug, Clone)]
19pub struct MarkEvaluationStartedInput {
20 pub evaluation_id: EvaluationId,
21 pub solution_submission_id: SolutionSubmissionId,
22 pub job_id: EvaluationJobId,
23 pub worker_id: String,
24 pub claim_attempt_count: i32,
25 pub target: TargetName,
26 pub eval_type: ScoringMode,
27}
28
29pub async fn mark_evaluation_started(
31 pool: &PgPool,
32 input: &MarkEvaluationStartedInput,
33) -> Result<bool> {
34 let eval_type_str = input.eval_type.as_str();
35
36 let result = sqlx::query(
37 r#"
38 INSERT INTO evaluations (id, solution_submission_id, job_id, target, eval_type, status, started_at)
39 SELECT $1::uuid, j.solution_submission_id, j.id, j.target, j.eval_type, 'running', NOW()
40 FROM evaluation_jobs j
41 WHERE j.id = $3::uuid
42 AND j.solution_submission_id = $2::uuid
43 AND j.worker_id = $4
44 AND j.attempt_count = $5
45 AND j.status = 'running'
46 AND j.target = $6
47 AND j.eval_type = $7
48 ON CONFLICT (job_id) DO NOTHING
49 "#,
50 )
51 .bind(input.evaluation_id.as_str())
52 .bind(input.solution_submission_id.as_str())
53 .bind(input.job_id.as_str())
54 .bind(&input.worker_id)
55 .bind(input.claim_attempt_count)
56 .bind(input.target.as_str())
57 .bind(eval_type_str)
58 .execute(pool)
59 .await?;
60
61 Ok(result.rows_affected() == 1)
62}
63
64#[derive(Debug, Clone)]
66pub struct PersistedEvaluationResult {
67 pub solution_submission_id: SolutionSubmissionId,
68 pub job_id: EvaluationJobId,
69 pub worker_id: String,
70 pub claim_attempt_count: i32,
71 pub target: TargetName,
72 pub eval_type: ScoringMode,
73 pub status: EvaluationStatus,
74 pub aggregate_metrics: Vec<MetricValue>,
75 pub run_metrics: Vec<RunMetricResult>,
76 pub public_results: Vec<PublicCaseResult>,
77 pub validation_summary: Option<ScoreSummary>,
78 pub official_summary: Option<ScoreSummary>,
79 pub runner_log_storage_key: Option<StorageKey>,
80 pub last_error: Option<String>,
81}
82
83pub async fn mark_evaluation_finished(
85 pool: &PgPool,
86 result: &PersistedEvaluationResult,
87) -> Result<bool> {
88 let mut tx = pool.begin().await?;
89
90 let public_results_json = serde_json::to_value(&result.public_results)
91 .map_err(|e| ServiceError::Internal(e.to_string()))?;
92 let validation_summary_json = serde_json::to_value(&result.validation_summary)
93 .map_err(|e| ServiceError::Internal(e.to_string()))?;
94 let official_json = serde_json::to_value(&result.official_summary)
95 .map_err(|e| ServiceError::Internal(e.to_string()))?;
96 let aggregate_metrics_json = serde_json::to_value(&result.aggregate_metrics)
97 .map_err(|e| ServiceError::Internal(e.to_string()))?;
98 let run_metrics_json = serde_json::to_value(&result.run_metrics)
99 .map_err(|e| ServiceError::Internal(e.to_string()))?;
100 let status_str = if result.status == EvaluationStatus::Completed {
101 EvaluationStatus::Completed.as_str()
102 } else {
103 EvaluationStatus::Failed.as_str()
104 };
105
106 let job_update = sqlx::query(
107 r#"
108 UPDATE evaluation_jobs
109 SET status = $2, finished_at = NOW(), last_error = $3
110 WHERE id = $1::uuid
111 AND status = 'running'
112 AND worker_id = $4
113 AND attempt_count = $5
114 "#,
115 )
116 .bind(result.job_id.as_str())
117 .bind(status_str)
118 .bind(&result.last_error)
119 .bind(&result.worker_id)
120 .bind(result.claim_attempt_count)
121 .execute(&mut *tx)
122 .await?;
123
124 if job_update.rows_affected() == 0 {
125 tx.commit().await?;
126 return Ok(false);
127 }
128
129 let evaluation_update = sqlx::query(
130 r#"
131 UPDATE evaluations
132 SET status = $2,
133 aggregate_metrics_json = $3, run_metrics_json = $4,
134 public_results_json = $5, validation_summary_json = $6,
135 official_summary_json = $7, runner_log_storage_key = $8, finished_at = NOW()
136 WHERE job_id = $1::uuid
137 AND status = 'running'
138 "#,
139 )
140 .bind(result.job_id.as_str())
141 .bind(status_str)
142 .bind(&aggregate_metrics_json)
143 .bind(&run_metrics_json)
144 .bind(&public_results_json)
145 .bind(&validation_summary_json)
146 .bind(&official_json)
147 .bind(
148 result
149 .runner_log_storage_key
150 .as_ref()
151 .map(StorageKey::as_str),
152 )
153 .execute(&mut *tx)
154 .await?;
155
156 if evaluation_update.rows_affected() != 1 {
157 return Err(ServiceError::Conflict);
158 }
159
160 let has_previous_official_result =
161 has_completed_official_evaluation_tx(&mut tx, result).await?;
162
163 match result.eval_type {
164 ScoringMode::Validation => {
165 if has_previous_official_result {
166 tx.commit().await?;
167 return Ok(true);
168 }
169 let sub_status = if result.status == EvaluationStatus::Completed {
170 SolutionSubmissionStatus::Completed.as_str()
171 } else {
172 SolutionSubmissionStatus::Failed.as_str()
173 };
174 sqlx::query(
175 "UPDATE solution_submissions SET status = $2, visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
176 )
177 .bind(result.solution_submission_id.as_str())
178 .bind(sub_status)
179 .execute(&mut *tx)
180 .await?;
181 }
182 ScoringMode::Official => {
183 let visible =
184 result.status == EvaluationStatus::Completed || has_previous_official_result;
185 let sub_status = if visible {
186 SolutionSubmissionStatus::Completed.as_str()
187 } else {
188 SolutionSubmissionStatus::Failed.as_str()
189 };
190 sqlx::query(
191 "UPDATE solution_submissions SET status = $2, visible_after_eval = $3, updated_at = NOW() WHERE id = $1::uuid"
192 )
193 .bind(result.solution_submission_id.as_str())
194 .bind(sub_status)
195 .bind(visible)
196 .execute(&mut *tx)
197 .await?;
198
199 if result.status == EvaluationStatus::Completed {
200 let became_best = upsert_leaderboard_entry_for_solution_submission_tx(
201 &mut tx,
202 &result.solution_submission_id,
203 &result.target,
204 &result.public_results,
205 &result.aggregate_metrics,
206 )
207 .await?;
208 if became_best {
209 update_official_metrics_for_solution_submission_tx(
210 &mut tx,
211 &result.solution_submission_id,
212 &result.target,
213 &result.aggregate_metrics,
214 )
215 .await?;
216 }
217 }
218 }
219 }
220
221 tx.commit().await?;
222 Ok(true)
223}
224
225async fn has_completed_official_evaluation_tx(
228 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
229 result: &PersistedEvaluationResult,
230) -> Result<bool> {
231 let exists = sqlx::query_scalar::<_, bool>(
232 r#"
233 SELECT EXISTS (
234 SELECT 1
235 FROM evaluations
236 WHERE solution_submission_id = $1::uuid
237 AND target = $2
238 AND eval_type = 'official'
239 AND status = 'completed'
240 AND job_id <> $3::uuid
241 )
242 "#,
243 )
244 .bind(result.solution_submission_id.as_str())
245 .bind(result.target.as_str())
246 .bind(result.job_id.as_str())
247 .fetch_one(&mut **tx)
248 .await?;
249
250 Ok(exists)
251}