use sqlx::PgPool;
use agentics_domain::models::evaluation::{
EvaluationStatus, MetricValue, PublicCaseResult, RunMetricResult, ScoreSummary, ScoringMode,
SolutionSubmissionStatus,
};
use agentics_domain::models::ids::{EvaluationId, EvaluationJobId, SolutionSubmissionId};
use agentics_domain::models::names::TargetName;
use agentics_domain::storage::StorageKey;
use agentics_error::{Result, ServiceError};
use super::leaderboard::{
update_official_metrics_for_solution_submission_tx,
upsert_leaderboard_entry_for_solution_submission_tx,
};
/// Parameters for [`mark_evaluation_started`].
///
/// Every field except `evaluation_id` is compared against the `evaluation_jobs` row
/// identified by `job_id`; the evaluation row is only created when they all match.
#[derive(Clone, Debug)]
pub struct MarkEvaluationStartedInput {
    /// Identifier assigned to the newly inserted `evaluations` row.
    pub evaluation_id: EvaluationId,
    /// Must equal the job's `solution_submission_id`.
    pub solution_submission_id: SolutionSubmissionId,
    /// The evaluation job being started.
    pub job_id: EvaluationJobId,
    /// Must equal the job's `worker_id` (the worker holding the claim).
    pub worker_id: String,
    /// Must equal the job's `attempt_count` observed when the claim was taken.
    pub claim_attempt_count: i32,
    /// Must equal the job's `target`.
    pub target: TargetName,
    /// Must equal the job's `eval_type`.
    pub eval_type: ScoringMode,
}
pub async fn mark_evaluation_started(
pool: &PgPool,
input: &MarkEvaluationStartedInput,
) -> Result<bool> {
let eval_type_str = input.eval_type.as_str();
let result = sqlx::query(
r#"
INSERT INTO evaluations (id, solution_submission_id, job_id, target, eval_type, status, started_at)
SELECT $1::uuid, j.solution_submission_id, j.id, j.target, j.eval_type, 'running', NOW()
FROM evaluation_jobs j
WHERE j.id = $3::uuid
AND j.solution_submission_id = $2::uuid
AND j.worker_id = $4
AND j.attempt_count = $5
AND j.status = 'running'
AND j.target = $6
AND j.eval_type = $7
ON CONFLICT (job_id) DO NOTHING
"#,
)
.bind(input.evaluation_id.as_str())
.bind(input.solution_submission_id.as_str())
.bind(input.job_id.as_str())
.bind(&input.worker_id)
.bind(input.claim_attempt_count)
.bind(input.target.as_str())
.bind(eval_type_str)
.execute(pool)
.await?;
Ok(result.rows_affected() == 1)
}
/// Terminal outcome of an evaluation job, as reported by the worker that ran it.
///
/// Consumed by [`mark_evaluation_finished`]; the claim fields (`job_id`, `worker_id`,
/// `claim_attempt_count`) identify which claim of the job is being finalized.
#[derive(Clone, Debug)]
pub struct PersistedEvaluationResult {
    /// Submission the evaluated job belongs to.
    pub solution_submission_id: SolutionSubmissionId,
    /// Job being finalized.
    pub job_id: EvaluationJobId,
    /// Worker that holds the job's claim.
    pub worker_id: String,
    /// Job attempt count observed when the claim was taken.
    pub claim_attempt_count: i32,
    /// Target the job evaluated.
    pub target: TargetName,
    /// Whether this was a validation or an official run.
    pub eval_type: ScoringMode,
    /// `Completed` is persisted as completed; any other value is persisted as failed.
    pub status: EvaluationStatus,
    /// Stored as `evaluations.aggregate_metrics_json`.
    pub aggregate_metrics: Vec<MetricValue>,
    /// Stored as `evaluations.run_metrics_json`.
    pub run_metrics: Vec<RunMetricResult>,
    /// Stored as `evaluations.public_results_json`.
    pub public_results: Vec<PublicCaseResult>,
    /// Stored as `evaluations.validation_summary_json` (JSON `null` when absent).
    pub validation_summary: Option<ScoreSummary>,
    /// Stored as `evaluations.official_summary_json` (JSON `null` when absent).
    pub official_summary: Option<ScoreSummary>,
    /// Stored as `evaluations.runner_log_storage_key` (SQL `NULL` when absent).
    pub runner_log_storage_key: Option<StorageKey>,
    /// Stored as `evaluation_jobs.last_error`.
    pub last_error: Option<String>,
}
pub async fn mark_evaluation_finished(
pool: &PgPool,
result: &PersistedEvaluationResult,
) -> Result<bool> {
let mut tx = pool.begin().await?;
let public_results_json = serde_json::to_value(&result.public_results)
.map_err(|e| ServiceError::Internal(e.to_string()))?;
let validation_summary_json = serde_json::to_value(&result.validation_summary)
.map_err(|e| ServiceError::Internal(e.to_string()))?;
let official_json = serde_json::to_value(&result.official_summary)
.map_err(|e| ServiceError::Internal(e.to_string()))?;
let aggregate_metrics_json = serde_json::to_value(&result.aggregate_metrics)
.map_err(|e| ServiceError::Internal(e.to_string()))?;
let run_metrics_json = serde_json::to_value(&result.run_metrics)
.map_err(|e| ServiceError::Internal(e.to_string()))?;
let status_str = if result.status == EvaluationStatus::Completed {
EvaluationStatus::Completed.as_str()
} else {
EvaluationStatus::Failed.as_str()
};
let job_update = sqlx::query(
r#"
UPDATE evaluation_jobs
SET status = $2, finished_at = NOW(), last_error = $3
WHERE id = $1::uuid
AND status = 'running'
AND worker_id = $4
AND attempt_count = $5
"#,
)
.bind(result.job_id.as_str())
.bind(status_str)
.bind(&result.last_error)
.bind(&result.worker_id)
.bind(result.claim_attempt_count)
.execute(&mut *tx)
.await?;
if job_update.rows_affected() == 0 {
tx.commit().await?;
return Ok(false);
}
let evaluation_update = sqlx::query(
r#"
UPDATE evaluations
SET status = $2,
aggregate_metrics_json = $3, run_metrics_json = $4,
public_results_json = $5, validation_summary_json = $6,
official_summary_json = $7, runner_log_storage_key = $8, finished_at = NOW()
WHERE job_id = $1::uuid
AND status = 'running'
"#,
)
.bind(result.job_id.as_str())
.bind(status_str)
.bind(&aggregate_metrics_json)
.bind(&run_metrics_json)
.bind(&public_results_json)
.bind(&validation_summary_json)
.bind(&official_json)
.bind(
result
.runner_log_storage_key
.as_ref()
.map(StorageKey::as_str),
)
.execute(&mut *tx)
.await?;
if evaluation_update.rows_affected() != 1 {
return Err(ServiceError::Conflict);
}
let has_previous_official_result =
has_completed_official_evaluation_tx(&mut tx, result).await?;
match result.eval_type {
ScoringMode::Validation => {
if has_previous_official_result {
tx.commit().await?;
return Ok(true);
}
let sub_status = if result.status == EvaluationStatus::Completed {
SolutionSubmissionStatus::Completed.as_str()
} else {
SolutionSubmissionStatus::Failed.as_str()
};
sqlx::query(
"UPDATE solution_submissions SET status = $2, visible_after_eval = FALSE, updated_at = NOW() WHERE id = $1::uuid"
)
.bind(result.solution_submission_id.as_str())
.bind(sub_status)
.execute(&mut *tx)
.await?;
}
ScoringMode::Official => {
let visible =
result.status == EvaluationStatus::Completed || has_previous_official_result;
let sub_status = if visible {
SolutionSubmissionStatus::Completed.as_str()
} else {
SolutionSubmissionStatus::Failed.as_str()
};
sqlx::query(
"UPDATE solution_submissions SET status = $2, visible_after_eval = $3, updated_at = NOW() WHERE id = $1::uuid"
)
.bind(result.solution_submission_id.as_str())
.bind(sub_status)
.bind(visible)
.execute(&mut *tx)
.await?;
if result.status == EvaluationStatus::Completed {
let became_best = upsert_leaderboard_entry_for_solution_submission_tx(
&mut tx,
&result.solution_submission_id,
&result.target,
&result.public_results,
&result.aggregate_metrics,
)
.await?;
if became_best {
update_official_metrics_for_solution_submission_tx(
&mut tx,
&result.solution_submission_id,
&result.target,
&result.aggregate_metrics,
)
.await?;
}
}
}
}
tx.commit().await?;
Ok(true)
}
/// Reports whether the submission already has a *different* completed official evaluation.
///
/// Checks `evaluations` for a row with the same `solution_submission_id` and `target`
/// as `result`, `eval_type = 'official'` and `status = 'completed'`. The row belonging
/// to `result.job_id` is excluded. The query executes on the caller's transaction `tx`.
///
/// # Errors
///
/// Propagates any database error raised while running the query.
async fn has_completed_official_evaluation_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    result: &PersistedEvaluationResult,
) -> Result<bool> {
    const OTHER_COMPLETED_OFFICIAL_EXISTS: &str = r#"
SELECT EXISTS (
SELECT 1
FROM evaluations
WHERE solution_submission_id = $1::uuid
AND target = $2
AND eval_type = 'official'
AND status = 'completed'
AND job_id <> $3::uuid
)
"#;

    let submission_id = result.solution_submission_id.as_str();
    let target_name = result.target.as_str();
    let excluded_job_id = result.job_id.as_str();

    let found: bool = sqlx::query_scalar::<_, bool>(OTHER_COMPLETED_OFFICIAL_EXISTS)
        .bind(submission_id)
        .bind(target_name)
        .bind(excluded_job_id)
        .fetch_one(&mut **tx)
        .await?;

    Ok(found)
}