use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use awaken_eval::{EvalRun, EvalRunExecutionMode, LlmExecutorJudge, expand_cells, mint_run_id};
use awaken_eval::{Expectation, Fixture, MockResponse};
use awaken_ext_observability::trace_store::TraceStoreSink;
use awaken_server_contract::agent_spec_patch::AgentSpecPatch;
use axum::Json;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use crate::app::EvalRoutesState;
use crate::error::ApiError;
use crate::services::eval_cell::{
DEFAULT_MAX_TOTAL_TOKENS, LiveCellOptions, ResolvedCell, run_live_eval_cells,
validate_judge_required_for_expectation,
};
use crate::services::eval_common::resolve_live_executor;
/// Dataset id the online eval handler stamps on its synthetic fixture, on the
/// run-started event, and on the resulting `EvalRun`.
pub(crate) const ADHOC_DATASET_ID: &str = "_adhoc";
/// Value, in seconds, that `OnlineEvalRequest::max_walltime_secs` takes when
/// the request omits the field (returned by `default_walltime`).
const DEFAULT_MAX_WALLTIME_SECS: u64 = 60;
/// JSON request body accepted by [`start_online_eval`].
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct OnlineEvalRequest {
/// User input copied into the ad-hoc fixture's `user_input`.
pub user_input: String,
/// Model ids to evaluate. The handler rejects an empty list, runs
/// `validate_unique_models` over it, and resolves a live executor for the
/// model id of every cell produced by `expand_cells`.
pub models: Vec<String>,
/// Optional agent id. When present, the handler resolves it to an agent spec
/// and passes that spec as `agent_base` in the live run options.
#[serde(default)]
pub agent_id: Option<String>,
/// Optional patch forwarded as `agent_overrides` in the live run options.
#[serde(default)]
pub agent_overrides: Option<AgentSpecPatch>,
/// Expectations placed on the ad-hoc fixture; `Expectation::default()` is
/// used when the field is omitted.
#[serde(default)]
pub expectations: Option<Expectation>,
/// Whether the finished run is written to the eval run store.
/// Defaults to `true` (see `default_persist`).
#[serde(default = "default_persist")]
pub persist: bool,
/// Wall-time budget in seconds, forwarded to the live run options.
/// Defaults to `DEFAULT_MAX_WALLTIME_SECS`; the handler rejects `0`.
#[serde(default = "default_walltime")]
pub max_walltime_secs: u64,
/// Token budget forwarded to the live run options as `Some(..)`.
/// Defaults to `DEFAULT_MAX_TOTAL_TOKENS`.
#[serde(default = "default_token_budget")]
pub max_total_tokens: u32,
/// Sample count; treated as 1 when omitted. The handler rejects `0` and any
/// value above `limits.max_samples_per_cell`.
#[serde(default)]
pub samples: Option<u32>,
/// Optional judge configuration. When present, its `model_id` is resolved to
/// a live executor and its `rubric` and `revise_max_retries` are forwarded
/// into the judge context; `revise_max_retries` is also checked against
/// `limits.max_judge_revisions`.
#[serde(default)]
pub judge: Option<crate::services::eval_run_service::JudgeRequest>,
}
/// Serde default for `OnlineEvalRequest::persist`: a request that omits the
/// field gets `true`.
fn default_persist() -> bool {
    true
}
/// Serde default for `OnlineEvalRequest::max_walltime_secs`; yields
/// `DEFAULT_MAX_WALLTIME_SECS`.
fn default_walltime() -> u64 {
    DEFAULT_MAX_WALLTIME_SECS
}
/// Serde default for `OnlineEvalRequest::max_total_tokens`; yields
/// `DEFAULT_MAX_TOTAL_TOKENS` from the eval cell service.
fn default_token_budget() -> u32 {
    DEFAULT_MAX_TOTAL_TOKENS
}
/// JSON response body returned by [`start_online_eval`].
#[derive(Debug, Serialize)]
pub struct OnlineEvalResponse {
/// The run assembled by the handler from the live cell results.
pub run: EvalRun,
/// `true` when the request had `persist` set and the run was written to the
/// eval run store; `false` when persistence was not requested.
pub persisted: bool,
}
/// Handles an ad-hoc ("online") eval request: a single user input is run live
/// against every requested model and the resulting [`EvalRun`] is returned in
/// the response body.
///
/// The handler proceeds in this order:
/// 1. Checks admin auth against the request headers.
/// 2. Validates the request (non-empty unique models, non-zero walltime,
///    sample count, total unit cap, judge requirements and revision cap).
/// 3. Resolves the optional agent spec, a live executor per cell, and the
///    optional judge executor.
/// 4. Records the run-started event, runs all cells, optionally persists the
///    run, records the run-completed event, and responds with JSON.
///
/// # Errors
///
/// Returns [`ApiError::BadRequest`] when `models` is empty, when
/// `max_walltime_secs` or `samples` is `0`, or when `samples`, the total unit
/// count (cells × samples), or the judge's `revise_max_retries` exceeds its
/// configured limit. Errors from admin auth, model-uniqueness and judge
/// validation, agent/model/judge resolution, cell execution, and the run store
/// are propagated unchanged.
///
/// # Panics
///
/// Panics if a cell returned by `expand_cells` carries no `model_id`.
#[tracing::instrument(skip_all, fields(models = ?body.models))]
pub async fn start_online_eval(
    State(state): State<EvalRoutesState>,
    headers: HeaderMap,
    Json(body): Json<OnlineEvalRequest>,
) -> Result<Response, ApiError> {
    crate::config_routes::ensure_admin_auth(&state.admin, &headers)?;
    let limits = state.limits.clone();

    // ---- Request validation ------------------------------------------------
    if body.models.is_empty() {
        return Err(ApiError::BadRequest(
            "models must contain at least one model id".into(),
        ));
    }
    crate::services::eval_cell::validate_unique_models(&body.models)?;
    if body.max_walltime_secs == 0 {
        return Err(ApiError::BadRequest(
            "max_walltime_secs must be >= 1 (0 would time out every cell immediately)".into(),
        ));
    }
    let cells = expand_cells(&body.models);
    if body.samples == Some(0) {
        return Err(ApiError::BadRequest(
            "samples must be >= 1 (omit the field for a single sample)".into(),
        ));
    }
    // An explicit 0 was rejected above, so the only fallback left is "omitted".
    let samples = body.samples.unwrap_or(1);
    if samples > limits.max_samples_per_cell {
        return Err(ApiError::BadRequest(format!(
            "samples={samples} exceeds cap {}",
            limits.max_samples_per_cell
        )));
    }
    // Saturate instead of multiplying unchecked: a wrapped product in a release
    // build could otherwise slip under the cap checked right below.
    let total_units = cells
        .len()
        .saturating_mul(usize::try_from(samples).unwrap_or(usize::MAX));
    if total_units > limits.max_cells_per_sync_online {
        return Err(ApiError::BadRequest(format!(
            "{total_units} units (cells × samples) exceed sync online cap {}; \
             split or persist as a dataset and use /v1/eval/runs",
            limits.max_cells_per_sync_online,
        )));
    }
    // `body` is owned and `expectations` is not read again, so move it out
    // rather than cloning.
    let expectations = body.expectations.unwrap_or_default();
    validate_judge_required_for_expectation(
        &expectations,
        "online expectations",
        true,
        body.judge.is_some(),
        body.judge
            .as_ref()
            .and_then(|judge| judge.rubric.as_deref()),
    )?;
    if let Some(jr) = body.judge.as_ref()
        && let Some(n) = jr.revise_max_retries
        && n > limits.max_judge_revisions
    {
        return Err(ApiError::BadRequest(format!(
            "revise_max_retries={n} exceeds cap {}",
            limits.max_judge_revisions
        )));
    }

    // ---- Resolution of agent, per-cell executors, and judge ----------------
    let agent_base = match &body.agent_id {
        Some(id) => Some(crate::services::eval_common::resolve_agent_spec(&state, id).await?),
        None => None,
    };
    let mut resolved: Vec<ResolvedCell> = Vec::with_capacity(cells.len());
    for cell in cells {
        let model_id = cell
            .model_id
            .as_deref()
            .expect("expand_cells always sets model_id when models is non-empty");
        let live = resolve_live_executor(&state, model_id).await?;
        resolved.push(ResolvedCell {
            cell,
            executor: live.executor,
            upstream_model: live.upstream_model,
            spec: live.spec,
        });
    }
    let judge = if let Some(jr) = body.judge.as_ref() {
        let judge_exec = resolve_live_executor(&state, &jr.model_id).await?;
        Some(crate::services::eval_cell::JudgeContext {
            judge: LlmExecutorJudge::new(judge_exec.executor, judge_exec.upstream_model),
            rubric: jr.rubric.clone(),
            revise_max_retries: jr.revise_max_retries,
        })
    } else {
        None
    };

    // ---- Synthetic single-fixture dataset -----------------------------------
    let fixture = Fixture {
        id: ADHOC_DATASET_ID.into(),
        description: None,
        // Moved, not cloned: the request's input is not needed after this point.
        user_input: body.user_input,
        provider_script: vec![],
        provider_script_error: None,
        source_run_id: None,
        source_model_id: None,
        allow_unused_provider_script: false,
        mock_response: MockResponse::default(),
        expect: expectations,
        continued_turns: vec![],
    };

    // ---- Execution -----------------------------------------------------------
    let eval_run_id = mint_run_id();
    let started_at_secs = epoch_secs_now();
    // NOTE(review): every `?` below this point returns without calling
    // `record_eval_run_completed`, so a failed run leaves a started event with
    // no matching completion. Confirm event consumers tolerate that.
    crate::services::eval_events::record_eval_run_started(
        &state,
        crate::services::eval_events::EvalRunStartedEvent {
            eval_run_id: eval_run_id.clone(),
            dataset_id: ADHOC_DATASET_ID.to_string(),
            dataset_revision: 0,
            mode: "online_live_matrix",
            planned_item_count: total_units,
            started_at_secs,
        },
    )
    .await;
    // The trace store is optional; when present it is handed to the cells both
    // directly and wrapped as a metrics sink.
    let trace_store = state.trace.as_ref().map(|trace| trace.trace_store.clone());
    let trace_sink = trace_store.as_ref().map(|store| {
        Arc::new(TraceStoreSink::new(store.clone()))
            as Arc<dyn awaken_ext_observability::MetricsSink>
    });
    let items = run_live_eval_cells(
        std::slice::from_ref(&fixture),
        &resolved,
        LiveCellOptions {
            samples,
            max_concurrent: limits.max_concurrent_online_cells,
            max_walltime_secs: body.max_walltime_secs,
            agent_base,
            // Moved, not cloned: last use of the request's overrides.
            agent_overrides: body.agent_overrides,
            judge,
            max_total_tokens: Some(body.max_total_tokens),
            trace_sink,
            trace_store,
            task_context: "online cell",
        },
    )
    .await?;
    let run = EvalRun {
        id: eval_run_id,
        dataset_id: ADHOC_DATASET_ID.into(),
        dataset_revision: 0,
        execution_mode: EvalRunExecutionMode::Live,
        items,
        started_at_secs,
        ended_at_secs: epoch_secs_now(),
    };

    // ---- Persistence and completion -----------------------------------------
    let persisted = if body.persist {
        // NOTE(review): `write` is called without `.await`, i.e. synchronously
        // on this async task. If the store performs blocking I/O, consider
        // offloading it. Confirm against the store implementation.
        let store = state.eval.eval_run_store.clone();
        store
            .write(&run)
            .map_err(super::eval_run_service::map_eval_run_store_error)?;
        true
    } else {
        false
    };
    crate::services::eval_events::record_eval_run_completed(
        &state,
        &run,
        "online_live_matrix",
        persisted,
    )
    .await;
    Ok(Json(OnlineEvalResponse { run, persisted }).into_response())
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn epoch_secs_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}