use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use awaken_eval::{
EvalRun, EvalRunExecutionMode, EvalRunItem, Expectation, Failure, Fixture, LlmExecutorJudge,
MatrixCell, ReplayOutcome, ReplayReport, RuntimeReplayer, replay_all, score, score_with_judge,
};
use awaken_ext_observability::MetricsSink;
use awaken_ext_observability::trace_store::TraceStore;
use awaken_server_contract::agent_spec_patch::AgentSpecPatch;
use awaken_server_contract::contract::executor::LlmExecutor;
use awaken_server_contract::registry_spec::{AgentSpec, ModelSpec};
use crate::error::ApiError;
pub(crate) const DEFAULT_MAX_TOTAL_TOKENS: u32 = 10_000;
#[derive(Clone)]
pub(crate) struct JudgeContext {
pub judge: LlmExecutorJudge,
pub rubric: Option<String>,
pub revise_max_retries: Option<u32>,
}
pub(crate) type ReviseTuple = (Arc<dyn awaken_eval::judge::Judge>, Option<String>, f32, u32);
#[derive(Clone)]
pub(crate) struct ResolvedCell {
pub cell: MatrixCell,
pub executor: Arc<dyn LlmExecutor>,
pub upstream_model: String,
pub spec: ModelSpec,
}
pub(crate) struct LiveCellOptions {
pub samples: u32,
pub max_concurrent: usize,
pub max_walltime_secs: u64,
pub agent_base: Option<AgentSpec>,
pub agent_overrides: Option<AgentSpecPatch>,
pub judge: Option<JudgeContext>,
pub max_total_tokens: Option<u32>,
pub trace_sink: Option<Arc<dyn MetricsSink>>,
pub trace_store: Option<Arc<dyn TraceStore>>,
pub task_context: &'static str,
}
pub(crate) fn revise_tuple_for(
judge: Option<&JudgeContext>,
expect: &Expectation,
) -> Option<ReviseTuple> {
match (judge, expect.min_judge_score) {
(
Some(JudgeContext {
judge: j,
rubric,
revise_max_retries: Some(retries),
}),
Some(threshold),
) if *retries > 0 => Some((
Arc::new(j.clone()) as Arc<dyn awaken_eval::judge::Judge>,
rubric.clone(),
threshold,
*retries,
)),
_ => None,
}
}
pub(crate) fn validate_baseline_sample_count(
baseline: &EvalRun,
new_run_samples: Option<u32>,
) -> Result<(), ApiError> {
let Some(new_samples) = new_run_samples else {
return Ok(());
};
let Some(baseline_samples) = live_run_sample_count(baseline)? else {
return Ok(());
};
if baseline_samples != new_samples {
return Err(ApiError::BadRequest(format!(
"cannot diff live runs with different sample counts: baseline {} has {}, new run has {}",
baseline.id, baseline_samples, new_samples
)));
}
Ok(())
}
pub(crate) fn live_run_sample_count(run: &EvalRun) -> Result<Option<u32>, ApiError> {
if run.execution_mode != EvalRunExecutionMode::Live {
return Ok(None);
}
let mut groups: BTreeMap<(String, MatrixCell), BTreeSet<u32>> = BTreeMap::new();
for item in &run.items {
groups
.entry((
item.fixture_id.clone(),
item.cell.clone().unwrap_or_default(),
))
.or_default()
.insert(item.sample_index.unwrap_or(0));
}
let mut sample_count = None;
for ((fixture_id, cell), indexes) in groups {
let actual: Vec<u32> = indexes.into_iter().collect();
let expected: Vec<u32> = (0..actual.len() as u32).collect();
if actual != expected {
return Err(ApiError::BadRequest(format!(
"run {} has non-contiguous samples for fixture {} cell {:?}: {:?}",
run.id, fixture_id, cell, actual
)));
}
let n = actual.len() as u32;
if let Some(previous) = sample_count
&& previous != n
{
return Err(ApiError::BadRequest(format!(
"run {} has inconsistent sample counts across live cells",
run.id
)));
}
sample_count = Some(n);
}
Ok(sample_count)
}
pub(crate) fn apply_cell_decorators(
mut replayer: RuntimeReplayer,
overrides: Option<AgentSpecPatch>,
trace_sink: Option<Arc<dyn awaken_ext_observability::MetricsSink>>,
revise: Option<ReviseTuple>,
) -> RuntimeReplayer {
if let Some(p) = overrides {
replayer = replayer.with_agent_overrides(p);
}
if let Some(sink) = trace_sink {
replayer = replayer.with_tee_sink(sink);
}
if let Some((j, rubric, threshold, retries)) = revise {
replayer = replayer.with_revise_on_judge_fail(j, rubric, threshold, retries);
}
replayer
}
pub(crate) fn validate_unique_models(models: &[String]) -> Result<(), ApiError> {
use std::collections::HashSet;
let mut seen: HashSet<&str> = HashSet::with_capacity(models.len());
for m in models {
if !seen.insert(m.as_str()) {
return Err(ApiError::BadRequest(format!(
"duplicate model id in models axis: {m}"
)));
}
}
Ok(())
}
pub(crate) fn validate_judge_required_for_expectation(
expect: &Expectation,
label: &str,
live_mode: bool,
has_judge: bool,
judge_rubric: Option<&str>,
) -> Result<(), ApiError> {
if expect.min_judge_score.is_none() {
return Ok(());
}
awaken_eval::validate_min_judge_score(expect, label).map_err(ApiError::BadRequest)?;
if !live_mode {
return Err(ApiError::BadRequest(format!(
"{label} sets expect.min_judge_score; LLM grading requires mode=\"live\" with `judge`"
)));
}
if !has_judge {
return Err(ApiError::BadRequest(format!(
"{label} sets expect.min_judge_score; provide `judge` so the LLM grading criterion is actually evaluated"
)));
}
match judge_rubric {
Some(rubric) if !rubric.trim().is_empty() => {}
_ => {
return Err(ApiError::BadRequest(format!(
"{label} sets expect.min_judge_score; provide non-empty `judge.rubric` so the LLM grading criterion has an explicit rubric"
)));
}
}
Ok(())
}
pub(crate) fn cost_usd_for(report: &ReplayReport, spec: &ModelSpec) -> Option<f64> {
if report.total_input_tokens == 0 && report.total_output_tokens == 0 {
return None;
}
spec.compute_cost_usd(report.total_input_tokens, report.total_output_tokens)
}
pub(crate) fn persisted_trace_run_id(
trace_store: Option<&dyn TraceStore>,
outcome: &ReplayOutcome,
) -> Option<String> {
let run_id = outcome.trace_run_id()?;
let Some(store) = trace_store else {
tracing::warn!(
run_id = %run_id,
"dropping eval trace_run_id because no TraceStore is configured"
);
return None;
};
match store.read(run_id) {
Ok(events) if !events.is_empty() => Some(run_id.to_string()),
Ok(_) => {
tracing::warn!(
run_id = %run_id,
"dropping eval trace_run_id because TraceStore returned no events"
);
None
}
Err(err) => {
tracing::warn!(
run_id = %run_id,
error = %err,
"dropping eval trace_run_id because TraceStore read failed"
);
None
}
}
}
pub(crate) async fn run_live_eval_cells(
fixtures: &[Fixture],
resolved_cells: &[ResolvedCell],
options: LiveCellOptions,
) -> Result<Vec<EvalRunItem>, ApiError> {
let LiveCellOptions {
samples,
max_concurrent,
max_walltime_secs,
agent_base,
agent_overrides,
judge,
max_total_tokens,
trace_sink,
trace_store,
task_context,
} = options;
let walltime = std::time::Duration::from_secs(max_walltime_secs);
let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent));
let mut handles = Vec::with_capacity(fixtures.len() * resolved_cells.len() * samples as usize);
let emit_sample_index = samples > 1;
for fixture in fixtures {
for resolved in resolved_cells {
for sample in 0..samples {
let fixture = fixture.clone();
let fixture_id = fixture.id.clone();
let cell = resolved.cell.clone();
let executor = resolved.executor.clone();
let upstream_model = resolved.upstream_model.clone();
let spec = resolved.spec.clone();
let overrides = agent_overrides.clone();
let base = agent_base.clone();
let trace_sink = trace_sink.clone();
let judge_for_task = judge.clone();
let revise_for_task = revise_tuple_for(judge.as_ref(), &fixture.expect);
let permit = semaphore.clone().acquire_owned().await.expect("semaphore");
handles.push(tokio::spawn(async move {
let _permit = permit;
let mut builder = RuntimeReplayer::new()
.with_live_executor(executor, upstream_model)
.with_content_capture(
awaken_ext_observability::ContentCapture::Enabled,
);
if let Some(max) = max_total_tokens {
builder = builder.with_max_total_tokens(max);
}
if let Some(b) = base {
builder = builder.with_agent_base(b);
}
let replayer =
apply_cell_decorators(builder, overrides, trace_sink, revise_for_task);
let deadline = tokio::time::Instant::now() + walltime;
let walltime_secs = walltime.as_secs();
let outcome = match tokio::time::timeout_at(deadline, async {
let outcomes =
replay_all(&replayer, std::slice::from_ref(&fixture)).await;
outcomes
.into_iter()
.next()
.expect("one fixture → one outcome")
})
.await
{
Ok(o) => o,
Err(_) => {
let (o, f) = cell_timeout_outcome(
fixture_id,
walltime_secs,
&fixture.expect,
);
return Ok::<_, ApiError>((fixture.id, cell, sample, o, f, spec));
}
};
let (outcome, failures) = match tokio::time::timeout_at(
deadline,
score_outcome(&outcome, &fixture, judge_for_task.as_ref()),
)
.await
{
Ok(Ok((failures, judge_result))) => {
let mut outcome = outcome;
if let Some(jr) = judge_result {
outcome.judge_score = Some(jr.score);
outcome.judge_reasoning = jr.reasoning;
}
(outcome, failures)
}
Ok(Err(err)) => cell_error_outcome(
outcome,
format!("scoring failed: {err}"),
&fixture.expect,
),
Err(_) => cell_error_outcome(
outcome,
format!(
"scoring timed out after {walltime_secs}s wall-clock (replay completed)"
),
&fixture.expect,
),
};
Ok::<_, ApiError>((fixture.id, cell, sample, outcome, failures, spec))
}));
}
}
}
let mut items: Vec<EvalRunItem> = Vec::with_capacity(handles.len());
for handle in handles {
let task_result = handle
.await
.map_err(|err| ApiError::Internal(format!("{task_context} task panicked: {err}")))?;
let (fixture_id, cell, sample, outcome, failures, spec) = task_result?;
let mut report = ReplayReport::from_outcome(&outcome, failures);
report.cost_usd = cost_usd_for(&report, &spec);
items.push(EvalRunItem {
fixture_id,
cell: Some(cell),
report,
trace_run_id: persisted_trace_run_id(trace_store.as_deref(), &outcome),
sample_index: if emit_sample_index {
Some(sample)
} else {
None
},
});
}
Ok(items)
}
pub(crate) fn cell_timeout_outcome(
fixture_id: String,
walltime_secs: u64,
expect: &Expectation,
) -> (ReplayOutcome, Vec<Failure>) {
let outcome = ReplayOutcome::timeout_failure(fixture_id, walltime_secs);
let failures = score(&outcome, expect);
(outcome, failures)
}
pub(crate) fn cell_error_outcome(
mut outcome: ReplayOutcome,
message: String,
expect: &Expectation,
) -> (ReplayOutcome, Vec<Failure>) {
let had_existing = outcome.runtime_failure.is_some();
if !had_existing {
outcome.runtime_failure = Some(awaken_eval::ReplayRuntimeFailure::RuntimeError {
message: message.clone(),
});
}
let mut failures = score(&outcome, expect);
if had_existing {
failures.push(Failure::ReplayRuntimeFailure {
failure: awaken_eval::ReplayRuntimeFailure::RuntimeError { message },
});
}
(outcome, failures)
}
pub(crate) async fn score_outcome(
outcome: &awaken_eval::ReplayOutcome,
fixture: &Fixture,
judge: Option<&JudgeContext>,
) -> Result<(Vec<awaken_eval::Failure>, Option<awaken_eval::JudgeResult>), ApiError> {
match (judge, fixture.expect.min_judge_score) {
(
Some(JudgeContext {
judge: j, rubric, ..
}),
Some(_),
) => {
let (failures, judge_result) = score_with_judge(
outcome,
&fixture.expect,
&fixture.judge_prompt(),
rubric.as_deref(),
j,
)
.await
.map_err(|err| ApiError::Internal(format!("judge invocation failed: {err}")))?;
Ok((failures, judge_result))
}
(None, Some(_)) => Err(ApiError::BadRequest(
"expect.min_judge_score requires `judge`; refusing to ignore the LLM grading criterion"
.into(),
)),
_ => Ok((score(outcome, &fixture.expect), None)),
}
}
#[cfg(test)]
#[path = "eval_cell_test.rs"]
mod tests;